# Source code for snappy_pipeline.find_file

# -*- coding: utf-8 -*-
"""Code for crawling the file system and caching the results"""

import json
import logging
import os
import sys
from collections import OrderedDict
from fnmatch import fnmatch

from fasteners import InterProcessLock

__author__ = "Manuel Holtgrewe <manuel.holtgrewe@bih-charite.de>"


class FileNamesTooDifferent(Exception):
    """Raised when two file names are too different to be PE reads

    Signals that two file names either have different lengths or differ in
    more than one character position, so they cannot be treated as the two
    mates of one paired-end read set (e.g., ``..._R1...`` vs. ``..._R2...``).
    """
class PatternSet:
    """Store named or unnamed list of patterns

    :param patterns: iterable of patterns; stored as a tuple in ``patterns``
    :param names: optional iterable of names, one per pattern; stored as a
        tuple in ``names`` (empty tuple if not given)
    :raises ValueError: if names are given but their number differs from the
        number of patterns
    """

    def __init__(self, patterns, names=None):
        #: Patterns to search for with names
        self.patterns = tuple(patterns)
        #: Optional names
        self.names = tuple(names or [])
        if self.names and len(self.names) != len(self.patterns):
            raise ValueError(  # pragma: no cover
                "Names must be empty or have the same length as patterns ({} vs. {})".format(
                    self.names, self.patterns
                )
            )
        #: Named patterns, if any, else ``None``
        self.named_patterns = None
        if self.names:
            # Zip the stored tuples rather than the raw arguments: the arguments may be
            # one-shot iterators that were already exhausted by ``tuple(...)`` above.
            self.named_patterns = OrderedDict(zip(self.names, self.patterns))

    def __str__(self):
        return "PatternSet({}, {})".format(self.patterns, self.names)

    def __repr__(self):
        return str(self)
class FileSystemCrawlerResult:
    """n-tuple of optionally named files

    :param base_folder: folder that the files belong to
    :param files: iterable of file paths; stored as a tuple in ``files``
    :param names: optional iterable of names, one per file; stored as a tuple
        in ``names`` (empty tuple if not given)
    :raises ValueError: if names are given but their number differs from the
        number of files
    """

    def __init__(self, base_folder, files, names=None):
        #: Base folder of the result
        self.base_folder = base_folder
        #: The files of this result, as a tuple
        self.files = tuple(files)
        #: Names for the files, optional; if given has to have the same length as files
        self.names = tuple(names or [])
        if self.names and len(self.names) != len(self.files):
            raise ValueError(  # pragma: no cover
                "Names must be empty or have the same length as files ({} vs. {})".format(
                    self.names, self.files
                )
            )
        #: Dict with name-to-file mapping, ``None`` if ``names`` is not given
        self.named_files = None
        if self.names:
            # Zip the stored tuples rather than the raw arguments: the arguments may be
            # one-shot iterators that were already exhausted by ``tuple(...)`` above.
            self.named_files = OrderedDict(zip(self.names, self.files))

    def to_dict(self):
        """Convert to dict, can only work if self.names and self.files is given

        :raises ValueError: if the result has no names
        """
        # TODO: remove?
        if not self.names:
            raise ValueError("No names, cannot convert to dict")
        return dict(zip(self.names, self.files))

    def __str__(self):
        tpl = "FileSystemCrawlerResult({})"
        return tpl.format(
            ", ".join(map(repr, (self.base_folder, self.files, self.names, self.named_files)))
        )

    def __repr__(self):
        return str(self)
class FileSystemCrawler:
    """Crawl the file system

    - start crawling the file system from a given directory
    - look for files matching a given ``PatternSet``
    - that are below a directory with a given name

    The list of files below each crawled root directory is kept in a cache
    that is stored as JSON in ``cache_path`` (see ``save_cache``).
    """

    cache_version = 1

    def __init__(self, cache_path, invalidation_paths, lock_timeout=60):
        #: The logger to use.
        self.logger = logging.getLogger("file_crawler")
        #: Path to cache (will be stored in JSON format)
        self.cache_path = cache_path
        #: Path to files to use for checking invalidation.
        self.invalidation_paths = invalidation_paths
        #: The actual dict with the cache, loaded from path to ``cache_path`` if the cache file
        #: exists.
        self.cache = None
        #: Flag whether cache has been modified and needs saving
        self.cache_dirty = False
        #: Flag whether cache has been invalidated already.
        self.cache_invalidated = False
        #: Timeout for obtaining file system lock on the file system
        # NOTE(review): stored but not passed to the lock anywhere in this class.
        self.lock_timeout = lock_timeout
        if os.path.exists(self.cache_path):
            self.cache_invalidated = False
            self.cache = self._load_cache()
        else:
            self._set_fresh_cache()

    def _set_fresh_cache(self):
        """Set cache to a fresh state."""
        self.cache_invalidated = True
        self.cache_dirty = True
        self.cache = {"cache_version": self.__class__.cache_version, "root_dirs": {}}

    def run(self, root_dir, dir_name, pattern_sets, allow_empty_right):
        """Perform the file system crawling from a root directory given a query pattern set

        ``allow_empty_right`` -- for mixed PE/SE read data sets (must be either SE or PE for
        one library!)

        :param root_dir: directory to crawl; its file listing is cached
        :param dir_name: only files below a directory with this name are considered
        :param pattern_sets: sequence of pattern sets (objects with ``patterns`` and
            ``names``); patterns are matched with ``fnmatch`` against the part of the path
            below ``dir_name``
        :param allow_empty_right: if false, matches that do not cover every pattern of the
            pattern set are skipped with a warning on stderr
        :return: generator of ``FileSystemCrawlerResult``
        :raises ValueError: on differing numbers of matched patterns between results or
            differing numbers of matches per pattern
        :raises FileNamesTooDifferent: if the file names of one result differ too much
        """
        matches = {}  # path => pattern set idx => pattern idx => [path]
        returned = 0  # from {0, 1, 2}; how many patterns matched?
        # Invalidate cache fully if the cache file is older than any one of self.invalidation_paths
        self._perform_cache_invalidation()
        # Ensure that cache entry with crawling results of all files exists
        if root_dir not in self.cache["root_dirs"]:
            self.cache_dirty = True
            self.cache["root_dirs"][root_dir] = tuple(sorted(self._build_cache(root_dir)))
        # Now, crawl over this structure and match against all pattern sets
        self.logger.debug('Crawling "%s" for dir_name "%s"', root_dir, dir_name)
        for set_idx, pattern_set in enumerate(pattern_sets):
            self.logger.debug(" patterns in pattern set #%d: %s", set_idx, pattern_set.patterns)
        for path in self.cache["root_dirs"][root_dir]:
            if dir_name not in path:
                continue
            idx = path.index(dir_name)
            # Split into the part up to and including ``dir_name`` and the part below it;
            # both joined strings are loop-invariant for the pattern loops below.
            left = "/".join(path[: idx + 1])
            right = "/".join(path[idx + 1 :])
            for set_idx, pattern_set in enumerate(pattern_sets):
                for pattern_idx, pattern in enumerate(pattern_set.patterns):
                    does_match = fnmatch(right, pattern)
                    self.logger.debug(
                        'does "%s" match "%s" match? => %s', right, pattern, does_match
                    )
                    if does_match:
                        matches.setdefault(left, {}).setdefault(set_idx, {}).setdefault(
                            pattern_idx, []
                        ).append(right)
        # Go over results and check whether they are conforming.
        for path, path_matches in matches.items():
            for set_idx, set_matches in path_matches.items():
                # Must have a match for each pattern
                if not allow_empty_right and len(set_matches) != len(
                    pattern_sets[set_idx].patterns
                ):
                    print(
                        (
                            "WARNING: Skipping matches {} as the number of matches is not equal to "
                            "the number of patterns in {}"
                        ).format(set_matches, pattern_sets[set_idx].patterns),
                        file=sys.stderr,
                    )
                    continue
                # Guard against mixing SE and PE results for crawling
                if returned:
                    if returned != len(set_matches):
                        raise ValueError(  # pragma: no cover
                            "Found mixed SE and PE data for one library!"
                        )
                else:
                    returned = len(set_matches)
                # Must have the same number of matches for each pattern
                lst_lens = [len(lst) for lst in set_matches.values()]
                if len(set(lst_lens)) != 1:
                    raise ValueError(  # pragma: no cover
                        "Must have the same number of matches per pattern, but found {}".format(
                            list(set_matches.values())
                        )
                    )
                # Yield result, checking that file names are equal except for one character
                # (e.g., "R1" vs "R2")
                for match_idx in range(lst_lens[0]):
                    files = [
                        os.path.join(root_dir, path, match[match_idx])
                        for match in set_matches.values()
                    ]
                    self._check_fname_mismatches(files)
                    base_path = os.path.join(root_dir, path)
                    yield FileSystemCrawlerResult(
                        base_path, files, pattern_sets[set_idx].names[:returned]
                    )

    def _perform_cache_invalidation(self):
        """Check whether the cache needs to be invalidated and do so if necessary."""
        if self.cache_invalidated:
            return  # Cache has been invalidated before
        if not os.path.exists(self.cache_path):
            return  # No cache yet
        self.logger.debug("Checking whether file crawler cache should be invalidated...")
        cache_ctime = os.path.getctime(self.cache_path)
        for path in self.invalidation_paths:
            path_mtime = os.path.getmtime(path)
            if path_mtime > cache_ctime:
                self.logger.info("Invalidating cache because of %s", path)
                self._set_fresh_cache()
                return
        self.logger.debug(" => no, not invalidating cache")

    @classmethod
    def _check_fname_mismatches(cls, file_names):
        """Check all pairs of file names for being sufficiently similar

        :raises FileNamesTooDifferent: if two names have different lengths or differ in
            more than one character position
        """
        for i in range(len(file_names)):
            for j in range(i):
                a = file_names[i]
                b = file_names[j]
                if len(a) != len(b):
                    raise FileNamesTooDifferent(  # pragma: no cover
                        "File names have different length {} vs {}".format(a, b)
                    )
                mismatches = sum(1 for x, y in zip(a, b) if x != y)
                if mismatches > 1:
                    raise FileNamesTooDifferent(  # pragma: no cover
                        "File names too different ({} mismatches) {} vs {}".format(mismatches, a, b)
                    )

    def _build_cache(self, root_dir):
        """Walk ``root_dir`` (following symlinks) and yield one tuple per file

        Each tuple consists of the components of the directory relative to ``root_dir``
        followed by the file name; files directly in ``root_dir`` get ``"."`` as their
        single directory component.
        """
        self.logger.info("Building file system crawler cache from %s", root_dir)
        for root, _, files in os.walk(root_dir, followlinks=True):
            self.logger.debug("Caching for directory %s", root)
            # ``relpath`` instead of slicing off ``len(root_dir) + 1`` characters: slicing
            # cuts into the first sub directory name if ``root_dir`` ends with a slash, and
            # ``relpath`` gives "." for ``root_dir`` itself.
            base = os.path.relpath(root, root_dir).split(os.sep)
            yield from (tuple(base + [f]) for f in files)

    def save_cache(self, cache_path=None):
        """Save cache, ``cache_path`` overriding ``self.cache_path``"""
        if not (self.cache_dirty or self.cache_invalidated):
            return  # don't save if unchanged
        cache_path = cache_path or self.cache_path
        # The lock file is always derived from ``self.cache_path``, also when overriding.
        with InterProcessLock(self.cache_path + ".lock"):
            self.logger.debug("Saving file system crawler cache to %s", cache_path)
            with open(cache_path, "wt") as f:
                json.dump(self.cache, f)

    def _load_cache(self):
        """Load the cache from ``self.cache_path`` and return it

        :raises ValueError: if the stored cache version differs from ``cache_version``
        """
        with InterProcessLock(self.cache_path + ".lock"):
            self.logger.info("Loading file system crawler cache from %s", self.cache_path)
            with open(self.cache_path, "rt") as f:
                result = json.load(f, object_pairs_hook=OrderedDict)
        if result["cache_version"] != self.__class__.cache_version:
            raise ValueError(  # pragma: no cover
                "Invalid cache version {}".format(result["cache_version"])
            )
        return result