diff --git a/quickstats/_version.py b/quickstats/_version.py index f58bfc6a7da3511a5610999df62199383aafe0fe..ca046e97da0e73e600309c7c83f354cd68392f56 100644 --- a/quickstats/_version.py +++ b/quickstats/_version.py @@ -1 +1 @@ -__version__ = "0.7.0.0" +__version__ = "0.7.0.1" diff --git a/quickstats/components/processors/roo_processor.py b/quickstats/components/processors/roo_processor.py index 028847472f0402608ac6f38356ca5137f69c03ca..fdc9c17ba950301890ff23a8ffdb6bdf4355ba6d 100644 --- a/quickstats/components/processors/roo_processor.py +++ b/quickstats/components/processors/roo_processor.py @@ -11,7 +11,9 @@ from .roo_process_config import RooProcessConfig from quickstats import Timer, AbstractObject, PathManager from quickstats.interface.root import TFile, RDataFrame, RDataFrameBackend -from quickstats.utils.root_utils import declare_expression, close_all_root_files, set_cachedir +from quickstats.interface.xrootd import get_cachedir, set_cachedir, switch_cachedir +from quickstats.utils.root_utils import declare_expression, close_all_root_files +from quickstats.utils.path_utils import is_remote_path class RooProcessor(AbstractObject): @@ -47,11 +49,15 @@ class RooProcessor(AbstractObject): else: self.backend = RDataFrameBackend.parse(backend) self.backend_options = backend_options + self.set_remote_file_options(localize=False, + cachedir=get_cachedir()) self.load_buildin_functions() if multithread: ROOT.EnableImplicitMT() + elif ROOT.IsImplicitMTEnabled(): + ROOT.DisableImplicitMT() if config_source is not None: self.load_config(config_source) @@ -59,8 +65,16 @@ class RooProcessor(AbstractObject): def set_cache(self, cache:bool=True): self.cache = cache - def set_remote_cachedir(self, cachedir:Optional[str]=None): - self.set_cachedir(cachedir, forcecache=True) + def set_remote_file_options(self, localize:bool=False, + cache:bool=True, cachedir:Optional[str]="/tmp", + copy_options:Optional[Dict]=None): + remote_file_options = { + 'localize': localize, + 'cache': cache, + 'cachedir': cachedir, + 'copy_options': copy_options + } + self.remote_file_options = remote_file_options def load_buildin_functions(self): # bug of redefining module from ROOT @@ -155,25 +169,50 @@ class RooProcessor(AbstractObject): return None def list_files(self, filenames:List[str], resolve_cache:bool=True): - return TFile.list_files(filenames, resolve_cache=True) + cachedir = self.remote_file_options['cachedir'] + with switch_cachedir(cachedir): + files = TFile.list_files(filenames, resolve_cache=resolve_cache) + return files + def _fetch_remote_files(self, filenames:List[str]): + opts = self.remote_file_options + copy_options = opts.get('copy_options', None) + if copy_options is None: + copy_options = {} + TFile.fetch_remote_files(filenames, cache=opts['cache'], + cachedir=opts['cachedir'], + **copy_options) + def load_rdataframe(self, filenames:Union[List[str], str], treename:Optional[str]=None): + if treename is None: treename = self.default_treename + if treename is None: raise RuntimeError("treename is undefined") + filenames = self.list_files(filenames, resolve_cache=True) + if not filenames: self.stdout.info('No files to be processed. Skipped.') return None + + has_remote_file = any(is_remote_path(filename) for filename in filenames) + # copy remote files to local storage + if has_remote_file and self.remote_file_options['localize']: + remote_files = [filename for filename in filenames if is_remote_path(filename)] + self._fetch_remote_files(remote_files) + filenames = self.list_files(filenames, resolve_cache=True) + if len(filenames) == 1: self.stdout.info(f'Processing file "{filenames[0]}".') else: self.stdout.info("Professing files") for filename in filenames: self.stdout.info(f' "{filename}"', bare=True) + rdf = RDataFrame.from_files(filenames, treename=treename, backend=self.backend, backend_options=self.backend_options, diff --git a/quickstats/interface/__init__.py b/quickstats/interface/__init__.py index ca12ec81315b668e35a10f0e080f67202f778467..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/quickstats/interface/__init__.py +++ b/quickstats/interface/__init__.py @@ -1,2 +0,0 @@ -import quickstats.interface.cppyy -import quickstats.interface.root \ No newline at end of file diff --git a/quickstats/interface/root/TFile.py b/quickstats/interface/root/TFile.py index a66ef6716460264fe4e68a865aa02db3ed8ddc79..2647383354861a71a48f3e80ad117ecb09e6732b 100644 --- a/quickstats/interface/root/TFile.py +++ b/quickstats/interface/root/TFile.py @@ -6,8 +6,12 @@ import glob import numpy as np from quickstats import semistaticmethod -from quickstats.utils.path_utils import resolve_paths -from quickstats.utils.root_utils import get_cachedir, set_cachedir, is_corrupt +from quickstats.utils.path_utils import (resolve_paths, is_remote_path, remote_glob, + remote_isdir, remote_dirlist, dirlist, + local_file_exist, split_url) +from quickstats.utils.root_utils import is_corrupt +from quickstats.utils.common_utils import in_notebook +from quickstats.interface.xrootd import get_cachedir from .TObject import TObject class TFile(TObject): @@ -32,9 +36,23 @@ class TFile(TObject): @semistaticmethod def _requires_protocol(self, filename:str): return "://" in filename + + @semistaticmethod + def _filter_valid_filenames(self, filenames:List[str]): + filenames = [filename for filename in filenames if self._is_valid_filename(filename)] + return filenames + + @semistaticmethod + def _get_cache_path(self, path:str, cachedir:str="/tmp"): + host, filename = split_url(path) + filename = filename.lstrip('/.~') + cache_path = os.path.join(cachedir, filename) + return cache_path @semistaticmethod - def _resolve_cached_remote_paths(self, paths:List[str]): + def _resolve_cached_remote_paths(self, paths:List[str], + strict_format:Optional[bool]=True, + cached_only:bool=False): import ROOT cachedir = get_cachedir() if cachedir is None: @@ -49,42 +67,57 @@ class TFile(TObject): filename = url.GetFile().lstrip("/") cache_path = os.path.join(cachedir, filename) if os.path.exists(cache_path): - resolved_paths.append(cache_path) - else: + if os.path.isdir(cache_path): + cache_paths = dirlist(cache_path) + if strict_format: + cache_paths = self._filter_valid_filenames(cache_paths) + if not cache_paths: + if not cached_only: + resolved_paths.append(path) + continue + resolved_paths.extend(cache_paths) + else: + resolved_paths.append(cache_path) + elif not cached_only: resolved_paths.append(path) - return resolved_paths + return resolved_paths @semistaticmethod def list_files(self, paths:Union[List[str], str], strict_format:Optional[bool]=True, - resolve_cache:bool=False): + resolve_cache:bool=False, + expand_remote_files:bool=True): paths = resolve_paths(paths) filenames = [] # expand directories if necessary for path in paths: + if is_remote_path(path): + if local_file_exist(path): + host, path = split_url(path) + else: + if expand_remote_files and remote_isdir(path): + filenames.extend(remote_dirlist(path)) + else: + filenames.append(path) + continue if os.path.isdir(path): - filenames.extend(glob.glob(os.path.join(path, "*"))) + filenames.extend(dirlist(path)) else: filenames.append(path) if strict_format: - filenames = [filename for filename in filenames if self._is_valid_filename(filename)] + filenames = self._filter_valid_filenames(filenames) if not filenames: return [] if resolve_cache: - filenames = self._resolve_cached_remote_paths(filenames) + filenames = self._resolve_cached_remote_paths(filenames) import ROOT invalid_filenames = [] valid_filenames = [] for filename in filenames: - if self._requires_protocol(filename): - url = ROOT.TUrl(path) - local_filename = url.GetFile() - # file already accessible, no transfer protocol needed - if os.path.exists(local_filename): - filename = local_filename - else: - valid_filenames.append(filename) - continue + if is_remote_path(filename): + # delay the check of remote root file to when they are open + valid_filenames.append(filename) + continue try: rfile = ROOT.TFile(filename) if self.is_corrupt(rfile): @@ -140,24 +173,36 @@ class TFile(TObject): return tree @semistaticmethod - def fetch_remote_file(self, name:str, + def fetch_remote_files(self, paths:Union[str, List[str]], + cache:bool=True, cachedir:str="/tmp", - forcecache:bool=False): - if not self._requires_protocol(name): - self.stdout.warning(f"Not a remote file: {name}. Skipping.") - return None - import ROOT - set_cachedir(cachedir, forcecache=True) - ROOT.TFile.Open(name) - set_cachedir(cachedir, forcecache=False) - - @semistaticmethod - def fetch_remote_files(self, names:List[str], - cachedir:str="/tmp", - forcecache:bool=False): - for name in names: - self.fetch_remote_file(name, cachedir=cachedir, - forcecache=forcecache) + **kwargs): + if isinstance(paths, str): + paths = [paths] + remote_paths = [] + for path in paths: + if not is_remote_path(path): + self.stdout.warning(f"Not a remote file: {path}. Skipped.") + continue + if local_file_exist(path): + self.stdout.warning(f"Remote file {path} can be accessed locally. Skipped.") + continue + remote_paths.append(path) + filenames = self.list_files(remote_paths, resolve_cache=cache, + expand_remote_files=True) + cached_files = [filename for filename in filenames if not is_remote_path(filename)] + files_to_fetch = [filename for filename in filenames if is_remote_path(filename)] + if cached_files: + self.stdout.info(f'Cached remote file(s):\n' + '\n'.join(cached_files)) + from quickstats.interface.xrootd.utils import copy_files + src, dst = [], [] + for file in files_to_fetch: + src.append(file) + dst.append(self._get_cache_path(file)) + if src: + self.stdout.info(f'Fetching remote file(s):\n' + '\n'.join(src)) + self.stdout.info(f'Destination(s):\n' + '\n'.join(dst)) + copy_files(src, dst, force=not cache, **kwargs) def close(self): self.obj.Close() diff --git a/quickstats/interface/xrootd/__init__.py b/quickstats/interface/xrootd/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..fa049b6d52d13e277689f065d570d27794cf666c --- /dev/null +++ b/quickstats/interface/xrootd/__init__.py @@ -0,0 +1 @@ +from .core import get_cachedir, set_cachedir, switch_cachedir \ No newline at end of file diff --git a/quickstats/interface/xrootd/core.py b/quickstats/interface/xrootd/core.py new file mode 100644 index 0000000000000000000000000000000000000000..0db715cfc27ff12096f9ed0dc3f3a81f40ce7cb6 --- /dev/null +++ b/quickstats/interface/xrootd/core.py @@ -0,0 +1,22 @@ +from contextlib import contextmanager + +class Setting: + CACHEDIR = None + +def get_cachedir(): + return Setting.CACHEDIR + +def set_cachedir(dirname:str=None): + Setting.CACHEDIR = dirname + + +@contextmanager +def switch_cachedir(dirname:str): + try: + tmp_cachedir = get_cachedir() + set_cachedir(dirname) + yield None + except Exception as e: + pass + finally: + set_cachedir(tmp_cachedir) \ No newline at end of file diff --git a/quickstats/interface/xrootd/utils.py b/quickstats/interface/xrootd/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..a907076855c414087990f71529809b0937cc8e44 --- /dev/null +++ b/quickstats/interface/xrootd/utils.py @@ -0,0 +1,10 @@ +from typing import List +from XRootD.client import CopyProcess + +#https://xrootd.slac.stanford.edu/doc/python/xrootd-python-0.1.0/modules/client/copyprocess.html +def copy_files(src:List[str], dst:List[str], force:bool=False, **kwargs): + copy_process = CopyProcess() + for src_i, dst_i in zip(src, dst): + copy_process.add_job(src_i, dst_i, force=force, **kwargs) + copy_process.prepare() + copy_process.run() \ No newline at end of file diff --git a/quickstats/utils/path_utils.py b/quickstats/utils/path_utils.py index 9ed8fce9af7a1ba989b32904b55eec4f57a08f12..2a3e5f20ff87d9c061de87fdcbacb6b42f8a2634 100644 --- a/quickstats/utils/path_utils.py +++ b/quickstats/utils/path_utils.py @@ -1,10 +1,89 @@ +import os +import sys import glob from typing import List, Union from pathlib import Path +if sys.version_info[0] > 2: + from urllib.parse import urlparse +else: + from urlparse import urlparse + from .string_utils import split_str +FILESYSTEM_TO = {} + +def split_url(url): + parsed_uri = urlparse(url) + domain = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri) + path = parsed_uri.path + if path.startswith("//"): + path = path[1:] + return domain, path + +def is_remote_path(path:str): + return "://" in path + +def is_xrootd_path(path:str): + return "root://" in path + +def remote_glob(path:str): + # can only glob xrootd path + if not is_xrootd_path(path): + return path + import XRootD.client.glob_funcs as glob + return glob.glob(path) + +def get_filesystem(host:str): + if host in FILESYSTEM_TO: + return FILESYSTEM_TO[host] + from XRootD.client import FileSystem + FILESYSTEM_TO[host] = FileSystem(host) + return get_filesystem(host) + +def remote_isdir(dirname:str, timeout:int=0): + # can only list xrootd dir + if not is_xrootd_path(dirname): + return None + from XRootD.client import FileSystem + host, path = split_url(dirname) + query = get_filesystem(host) + if not query: + raise RuntimeError("Cannot prepare xrootd query") + status, dirlist = query.dirlist(path, timeout=timeout) + return not status.error + #return len(remote_glob(os.path.join(dirname, "*"))) > 0 + +def remote_dirlist(dirname:str): + # can only list xrootd dir + if not is_xrootd_path(dirname): + return [] + return remote_glob(os.path.join(dirname, "*")) + +def dirlist(dirname:str): + return glob.glob(os.path.join(dirname, "*")) + +def local_file_exist(path:str): + if os.path.exists(path): + return True + if is_xrootd_path(path): + host, path = split_url(path) + return local_file_exist(path) + return False + +def remote_file_exist(path:str, timeout:int=0): + # can not stat non-xrootd file for now + if not is_xrootd_path(path): + return None + from XRootD.client import FileSystem + host, path = split_url(path) + query = get_filesystem(host) + if not query: + raise RuntimeError("Cannot prepare xrootd query") + status, _ = query.stat(path, timeout=timeout) + return not status.error + def resolve_paths(paths:Union[str, List[str]], sep:str=","): if isinstance(paths, str): @@ -13,7 +92,11 @@ def resolve_paths(paths:Union[str, List[str]], resolved_paths = [] for path in paths: if "*" in path: - resolved_paths.extend(glob.glob(path)) + if is_remote_path(path): + glob_paths = remote_glob(path) + else: + glob_paths = glob.glob(path) + resolved_paths.extend(glob_paths) else: resolved_paths.append(path) return resolved_paths