# -*- coding: utf-8 -*-
"""Base classes for the actual pipeline steps"""
import contextlib
import datetime
import logging
import os
import os.path
import sys
import tempfile
import typing
from collections import OrderedDict
from collections.abc import MutableMapping
from fnmatch import fnmatch
from functools import lru_cache
from io import StringIO
from typing import Any, Callable
import attr
import pydantic
import ruamel.yaml as ruamel_yaml
import snakemake
from biomedsheets import io_tsv
from biomedsheets.io import SheetBuilder, json_loads_ordered
from biomedsheets.models import SecondaryIDNotFoundException
from biomedsheets.naming import NAMING_SCHEMES, name_generator_for_scheme
from biomedsheets.ref_resolver import RefResolver
from biomedsheets.shortcuts import (
ShortcutSampleSheet,
donor_has_dna_ngs_library,
write_pedigree_to_ped,
write_pedigrees_to_ped,
)
from snakemake.io import InputFiles, OutputFiles, Wildcards, touch
from snappy_pipeline.base import (
MissingConfiguration,
UnsupportedActionException,
merge_kwargs,
print_config,
print_sample_sheets,
snakefile_path,
)
from snappy_pipeline.find_file import FileSystemCrawler, PatternSet
from snappy_pipeline.models import SnappyStepModel
from snappy_pipeline.utils import dictify, listify
from snappy_pipeline.workflows.abstract.pedigree import append_pedigree_to_ped
from snappy_wrappers.resource_usage import ResourceUsage
#: String constant with bash command for redirecting stderr to ``{log}`` file
STDERR_TO_LOG_FILE = r"""
# -----------------------------------------------------------------------------
# Redirect stderr to log file and enable printing executed commands
exec 2> >(tee -a "{log}")
set -x
# -----------------------------------------------------------------------------
""".lstrip()
[docs]
@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables and values to add/update.
Source: https://stackoverflow.com/a/34333710/84349
"""
env = os.environ
update = update or {}
remove = remove or []
# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)
try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
[docs]
class ImplementationUnavailableError(NotImplementedError):
"""Raised when a function that is to be overridden optionally is called
This is provided as an alternative to ``NotImplementedError`` as the Python linters warn if
a class does not override functions throwing ``NotImplementedError``.
"""
Inputs: typing.TypeAlias = InputFiles | dict[str, Any]
Outputs: typing.TypeAlias = OutputFiles | dict[str, Any]
Threads: typing.TypeAlias = int | None
Attempt: typing.TypeAlias = int | None
[docs]
class BaseStepPart:
"""Base class for a part of a pipeline step"""
name: str
#: The actions available in the class.
actions: tuple[str, ...]
#: Default resource usage for actions that are not given in ``resource_usage``.
default_resource_usage: ResourceUsage = ResourceUsage(
threads=1,
time="01:00:00",
memory="2G", # 1h
)
#: Configure resource usage here that should not use the default resource usage from
#: ``default_resource_usage``.
resource_usage: dict[str, ResourceUsage] = {}
def __init__[P: BaseStep](self, parent: P):
self.name = self.__class__.name
self.parent: P = parent
self.config = parent.config
self.w_config = parent.w_config
def _validate_action(self, action: str):
"""Validate provided action
Checks that the provided ``action`` is listed in the valid class actions list.
:param action: Action (i.e., step) in the workflow, example: 'run'.
:type action: str
:raises UnsupportedActionException: if action not in class defined list of valid actions.
"""
if action not in self.actions:
actions_str = ", ".join(self.actions)
error_message = f"Action '{action}' is not supported. Valid options: {actions_str}"
raise UnsupportedActionException(error_message)
[docs]
def get_resource_usage(self, action: str, **kwargs) -> ResourceUsage:
"""Return the resource usage for the given action."""
if action not in self.actions:
raise ValueError(f"Invalid {action} not in {self.actions}")
return self.resource_usage.get(action, self.default_resource_usage)
[docs]
@staticmethod
def get_default_partition() -> str | None:
"""Helper that returns the default partition."""
return os.getenv("SNAPPY_PIPELINE_PARTITION")
[docs]
def get_resource(
self, action: str, resource_name: str
) -> Callable[[Wildcards, InputFiles, Threads, Attempt], Any]:
"""Return the amount of resources to be allocated for the given action.
:param action: The action to return the resource requirement for.
:param resource_name: The name to return the resource for.
"""
if resource_name not in ("threads", "time", "memory", "partition", "tmpdir"):
raise ValueError(f"Invalid resource name: {resource_name}")
def _get_resource(
wildcards: Wildcards = None,
input: InputFiles = None,
threads: Threads = None,
attempt: Attempt = 1,
) -> Any:
resource_usage = self.get_resource_usage(
action, wildcards=wildcards, input=input, threads=threads, attempt=attempt
)
if resource_name == "tmpdir" and not resource_usage.tmpdir:
return self.parent.get_tmpdir()
if resource_name == "partition" and not resource_usage.partition:
return self.get_default_partition()
# '_get_resource' is the primary input function,
# if resource is callable we need to call it
resource = getattr(resource_usage, resource_name)
if callable(resource):
return resource(wildcards=wildcards, input=input, threads=threads, attempt=attempt)
else:
return getattr(resource_usage, resource_name)
return _get_resource
[docs]
def get_args(self, action: str) -> Inputs | Callable[[Wildcards], Inputs]:
"""Return args for the given action of the sub step"""
raise NotImplementedError("Called abstract method. Override me!") # pragma: no cover
[docs]
def get_output_files(self, action: str) -> Outputs:
"""Return output files for the given action of the sub step and"""
raise NotImplementedError("Called abstract method. Override me!") # pragma: no cover
[docs]
def get_log_file(self, action: str) -> Outputs:
"""Return path to log file
The default implementation tries to call ``self._get_log_files()`` and in the case of
this function returning a ``dict``, augments it with paths to MD5 files.
"""
if hasattr(self, "_get_log_file"):
inner = self._get_log_file(action)
if not isinstance(inner, dict):
return inner
else:
result = {}
for k, v in inner.items():
result[k] = v
if not k.endswith("_md5") and (k + "_md5") not in inner:
result[k + "_md5"] = v + ".md5"
return result
else:
raise NotImplementedError(
"Log file name generation not implemented!"
) # pragma: no cover
[docs]
def get_shell_cmd(self, action: str, wildcards: Wildcards) -> str: # NOSONAR
"""Return shell command for the given action of the sub step and the given wildcards"""
raise ImplementationUnavailableError(
"Override this method before calling it!"
) # pragma: no cover
[docs]
def run(self, action: str, wildcards: Wildcards): # NOSONAR
"""Run the sub steps action action's code with the given wildcards"""
raise ImplementationUnavailableError(
"Override this method before calling it!"
) # pragma: no cover
[docs]
def check_config(self):
"""Check configuration, raise ``ConfigurationMissing`` on problems
Override in sub classes.
:raises:MissingConfiguration: on missing configuration
"""
[docs]
class WritePedigreeStepPart(BaseStepPart):
"""Write out pedigree file for primary DNA sample given the index NGS library name"""
#: Step name
name = "write_pedigree"
#: Class available actions
actions = ("run",)
def __init__[P: BaseStep](
self, parent: P, require_dna_ngs_library: bool = False, only_trios: bool = False
):
super().__init__(parent)
#: Whether to prevent writing out of samples with out NGS library.
self.require_dna_ngs_library = require_dna_ngs_library
# Build shortcut from index library name to pedigree
self.index_ngs_library_to_pedigree = OrderedDict()
for sheet in self.parent.shortcut_sheets:
if require_dna_ngs_library:
for name, pedigree in sheet.index_ngs_library_to_pedigree.items():
pedigree = pedigree.with_filtered_donors(donor_has_dna_ngs_library)
if only_trios:
in_trio = set()
for donor in pedigree.donors:
if donor.father and donor.mother:
in_trio |= {
donor.name,
donor.father.name,
donor.mother.name,
}
if not any((donor.name in in_trio for donor in pedigree.donors)):
continue # ignore empty pedigree post filtration
pedigree = pedigree.with_filtered_donors(
lambda donor: donor.name in in_trio
)
self.index_ngs_library_to_pedigree[name] = pedigree
else:
self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree)
[docs]
def get_output_files(self, action):
self._validate_action(action=action)
return "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"
# @listify
def get_result_files(self):
# tpl = self.get_output_files("run")
# for sheet in getattr(self.parent, "shortcut_sheets", []):
# for index_ngs_library in sheet.index_ngs_library_to_pedigree.keys():
# yield tpl.format(index_ngs_library=index_ngs_library)
return []
[docs]
def run(self, wildcards: Wildcards, output: OutputFiles):
"""Write out the pedigree information
:param wildcards: Snakemake wildcards associated with rule (unused).
:type wildcards: snakemake.io.Wildcards
:param output: Snakemake output associated with rule.
:type output: snakemake.io.Namedlist
"""
if wildcards.index_ngs_library == "whole_cohort":
write_pedigrees_to_ped(self.index_ngs_library_to_pedigree.values(), str(output))
else:
write_pedigree_to_ped(
self.index_ngs_library_to_pedigree[wildcards.index_ngs_library],
str(output),
)
[docs]
class WritePedigreeSampleNameStepPart(WritePedigreeStepPart):
"""
Class contains method to write pedigree file for primary DNA sample given the index
NGS library name.It will create pedigree information based sole on sample name,
example 'P001' instead of 'P001-N1-DNA1-WGS1'.
"""
#: Step name
name = "write_pedigree_with_sample_name"
def __init__(self, *args, **kwargs):
WritePedigreeStepPart.__init__(self, *args, **kwargs)
[docs]
def run(self, wildcards, output):
"""Write out the pedigree information
:param wildcards: Snakemake wildcards associated with rule (unused).
:type wildcards: snakemake.io.Wildcards
:param output: Snakemake output associated with rule.
:type output: snakemake.io.Namedlist
"""
append_pedigree_to_ped(
pedigree=self.index_ngs_library_to_pedigree[wildcards.index_ngs_library],
output_path=str(output),
)
[docs]
class LinkOutStepPart(BaseStepPart):
"""Generically link out
This is for output files that are created unconditionally, i.e., for output files where the
output name is the same as for the work file.
"""
name = "link_out"
def __init__(self, parent, disable_patterns=None):
super().__init__(parent)
self.base_pattern_out = "output/{path}/{file}.{ext}"
self.base_path_out = self.base_pattern_out.replace(",[^/]", "")
self.base_path_in = "work/{path}/{file}.{ext}"
#: Patterns for disabling linking out to. This is useful/required when there is a
#: specialized link out step part, e.g., for the case of alignment where realignment is
#: performed or not, depending on the configuration.
self.disable_patterns = list(disable_patterns or [])
[docs]
def get_output_files(self, action):
"""Return output file pattern"""
assert action == "run", "Unsupported action"
return self.base_pattern_out
[docs]
def get_shell_cmd(self, action, wildcards):
"""Return call for linking out"""
assert action == "run", "Unsupported action"
tpl = "test -h {out} || ln -sr {in_} {out}"
in_ = self.base_path_in.replace("{", "{wildcards.")
out = self.base_path_out.replace("{", "{wildcards.")
return tpl.format(in_=in_, out=out)
@lru_cache()
def _cached_read_cancer_tsv_sheet(path_abs, path_rel, naming_scheme):
"""Cached reading of cancer TSV sheets."""
with open(path_abs, "rt") as f:
return io_tsv.read_cancer_tsv_sheet(f, path_rel, naming_scheme)
@lru_cache()
def _cached_read_germline_tsv_sheet(path_abs, path_rel, naming_scheme):
"""Cached reading of germline TSV sheets."""
with open(path_abs, "rt") as f:
return io_tsv.read_germline_tsv_sheet(f, path_rel, naming_scheme)
@lru_cache()
def _cached_read_generic_tsv_sheet(path_abs, path_rel, naming_scheme):
"""Cached reading of generic TSV sheets."""
with open(path_abs, "rt") as f:
return io_tsv.read_generic_tsv_sheet(f, path_rel, naming_scheme)
@lru_cache()
def _cached_yaml_round_trip_load_str(str_value):
"""Cached reading of YAML ``str`` objects."""
yaml = ruamel_yaml.YAML()
return yaml.load(StringIO(str_value))
[docs]
class DataSetInfo:
"""Information on a DataSet"""
def __init__(
self,
name,
sheet_path,
base_paths,
search_paths,
search_patterns,
sheet_type,
is_background,
naming_scheme,
mixed_se_pe,
sodar_uuid,
sodar_title,
pedigree_field=None,
):
"""Constructor.
:param name: Name of the data set.
:type name: str
:param sheet_path: Path to sheet file that should be loaded.
:type sheet_path: str
:param base_paths: All base paths of all configuration, to look for ``sheet_path``.
:type base_paths: list
:param search_paths: Search paths for the files in the sample sheet.
:type search_paths: list
:param search_patterns: Search patterns. Example: "{left: '**/*_R1_*.fastq.gz',
right: '**/*_R2_*.fastq.gz'}".
:type search_patterns: List[Dict[str, str]]
:param sheet_type: Explicit sheet type (e.g. "matched_cancer"), if any. Otherwise, will
attempt to load from sheet.
:type sheet_type: str
:param is_background: Whether the data set info is to be used only for background.
:type is_background: bool
:param naming_scheme: Selected naming schema: either 'secondary_id_pk' or
'only_secondary_id'.
:type naming_scheme: str
:param mixed_se_pe: Whether mixing SE and PE data sets is allowed.
:param sodar_uuid: The UUID of the corresponding SODAR project.
:type sodar_uuid: str
:param sodar_title: The title of the project in SODAR [optional].
:type sodar_title: str
:param pedigree_field: Custom field from sample sheet used to define pedigree, e.g.,
'familyId'. If none defined, it will set pedigree based on sample sheet 'row'.
Default: None.
:type pedigree_field: str
"""
#: Name of the data set
self.name = name
#: Path to the sheet file, for loading
self.sheet_path = sheet_path
#: All base paths of all configuration, to look for ``sheet_path``
self.base_paths = base_paths
#: Search paths for the files in the sample sheet
self.search_paths = list(search_paths)
#: Search patterns
self.search_patterns = search_patterns
#: Explicit sheet type (e.g. "matched_cancer"), if any. Otherwise, will attempt to load
# from sheet.
self.sheet_type = sheet_type
#: Whether the data set info is to be used only for background
self.is_background = is_background
#: Selected naming schema
if naming_scheme not in NAMING_SCHEMES:
raise ValueError("Invalid naming scheme: {}".format(naming_scheme)) # pragma: no cover
self.naming_scheme = naming_scheme
#: Whether mixing SE and PE data sets is allowed.
self.mixed_se_pe = mixed_se_pe
#: The BioMed SampleSheet
self.sheet = self._load_sheet()
#: The UUID of the corresponding SODAR project.
self.sodar_uuid = sodar_uuid
#: The (optional) title of the project in SODAR.
self.sodar_title = sodar_title
#: The (optional) custom field used to define pedigree
self.pedigree_field_kwargs = None
if pedigree_field:
self.pedigree_field_kwargs = {"join_by_field": pedigree_field}
def _load_sheet(self):
for base in self.base_paths:
fname = os.path.join(base, self.sheet_path)
if not os.path.exists(fname):
continue
if self.sheet_path.endswith(".tsv"):
if self.sheet_type == "matched_cancer":
return self._set_is_background(
_cached_read_cancer_tsv_sheet(fname, self.sheet_path, self.naming_scheme),
self.is_background,
)
elif self.sheet_type == "germline_variants":
return self._set_is_background(
_cached_read_germline_tsv_sheet(fname, self.sheet_path, self.naming_scheme),
self.is_background,
)
elif self.sheet_type == "generic":
return self._set_is_background(
_cached_read_generic_tsv_sheet(fname, self.sheet_path, self.naming_scheme),
self.is_background,
)
else:
raise ValueError("Invalid sheet type {}".format(self.sheet_type))
elif self.sheet_path.endswith(".json"):
with open(fname, "rt") as f:
sheet_json = json_loads_ordered(f.read())
resolver = RefResolver(
lookup_paths=os.path.dirname(os.path.abspath(self.sheet_path)),
dict_class=OrderedDict,
)
resolved_json = resolver.resolve("file://" + self.sheet_path, sheet_json)
return self._set_is_background(
SheetBuilder(resolved_json).run(
name_generator=name_generator_for_scheme(self.naming_scheme)
),
self.is_background,
)
else:
raise ValueError( # pragma: no cover
"Invalid sheet file type of {}".format(self.sheet_path)
)
# Raise problem if we could not find the sample sheet
raise ValueError( # pragma: no cover
"Could not find sample sheet file {} in the lookup paths {}".format(
self.sheet_path, self.base_paths
)
)
@classmethod
def _set_is_background(cls, sheet, flag):
"""Override "is_background" flag"""
# TODO: check whether is already there and fail if not compatible
sheet.json_data["extraInfoDefs"]["is_background"] = {
"type": "boolean",
"default": False,
}
sheet.extra_infos["is_background"] = flag
return sheet
[docs]
@attr.s(frozen=True, auto_attribs=True)
class DataSearchInfo:
"""Data search information - simplified version of ``DataSetInfo``."""
sheet_path: str
base_paths: list
search_paths: list
search_patterns: list
mixed_se_pe: bool
[docs]
class BaseStep:
"""Base class for the pipeline steps
Each pipeline step is a Snakemake workflow
"""
#: Override with step name
name: str
#: Override with the sheet shortcut class to use
sheet_shortcut_class: type[ShortcutSampleSheet]
#: Override with arguments to pass into sheet shortcut class constructor
sheet_shortcut_args = None
#: Override with keyword arguments to pass into sheet shortcut class
#: constructor
sheet_shortcut_kwargs = None
[docs]
@classmethod
def default_config_yaml(cls):
"""Override this function for providing default configuration
The configuration should be a YAML fragment. Your configuration should define a top-level
key starting with '_' and then consist of the name of the schema, e.g.,
'_ngs_mapping_schema'. Your default configuration is then merged into the main
configuration where the main configuration takes precedence.
Example: ::
def default_config_yaml(self):
return textwrap.dedent(""\"
schema_config:
ngs_mapping:
max_threads: 16
""\").lstrip()))
Return ``None`` for no default configuration.
You can also return an iterable of configurations, these will be merged in the order given
(earlier ones will be overwritten by later ones). This is useful if your schema needs
configuration for a later one.
"""
return "" # pragma: no cover
def __init__[C: SnappyStepModel](
self,
workflow: snakemake.Workflow,
config: MutableMapping[str, Any],
config_lookup_paths: tuple[str, ...],
config_paths: tuple[str, ...],
work_dir: str,
*,
config_model_class: type[C],
previous_steps: tuple[type[typing.Self], ...] | None = None,
):
self.name = self.__class__.name
#: Tuple with absolute paths to configuration files read
self.config_paths = config_paths
#: Pydantic model class for configuration validation
self.config_model_class = config_model_class
#: Absolute path to directory of where to perform work
self.work_dir = work_dir
#: Classes of previously executed steps, used for merging their default configuration as
#: well.
self.previous_steps = tuple(previous_steps or [])
#: Snakefile "workflow" object
self.workflow = workflow
#: Setup logger for the step
self.logger = logging.getLogger(self.name)
#: Merge default configuration with true configuration
workflow_config = config
local_config = workflow_config["step_config"].get(self.name, OrderedDict())
self.logger.info(local_config)
# #: Validate workflow step configuration using its accompanying pydantic model
# #: available through self.config_model_class (mandatory keyword arg for BaseStep)
# try:
# self.config: C = validate_config(local_config, self.config_model_class)
# # Also update the workflow config, just in case
# workflow_config["step_config"][self.name] = self.config.model_dump(by_alias=True)
# except pydantic.ValidationError as ve:
# self.logger.error(f"{self.name} failed validation:\n{local_config}")
# raise ve
#: Validate complete workflow configuration using SnappyPipeline's ConfigModel
#: This includes static_data_config, step_config and data_sets
try:
# local import of ConfigModel to avoid circular import
from snappy_pipeline.workflow_model import ConfigModel
self.w_config: ConfigModel = ConfigModel(**workflow_config)
self.config: C = self.w_config.step_config[self.name]
except pydantic.ValidationError as ve:
self.logger.error(f"Workflow configuration failed validation:\n{workflow_config}")
raise ve
#: Paths with configuration paths, important for later retrieving sample sheet files
self.config_lookup_paths = list(config_lookup_paths)
self.sub_steps: dict[str, BaseStepPart] = {}
self.data_set_infos = list(self._load_data_set_infos())
#: Shortcut to the BioMed SampleSheet objects
self.sheets = [info.sheet for info in self.data_set_infos]
#: Shortcut BioMed SampleSheet keyword arguments
sheet_kwargs_list = [
merge_kwargs(
first_kwargs=self.sheet_shortcut_kwargs,
second_kwargs=info.pedigree_field_kwargs,
)
for info in self.data_set_infos
]
#: Shortcut sheets
self.shortcut_sheets = []
klass = self.__class__.sheet_shortcut_class
for sheet, kwargs in zip(self.sheets, sheet_kwargs_list):
kwargs = kwargs or {}
kwargs = {k: v for k, v in kwargs.items() if k in klass.supported_kwargs}
self.shortcut_sheets.append(
klass(sheet, *(self.__class__.sheet_shortcut_args or []), **kwargs)
)
# Setup onstart/onerror/onsuccess hooks
self._setup_hooks()
#: Functions from sub workflows, can be used to generate output paths into these workflows
self.sub_workflows: dict[str, snakemake.Workflow] = {}
# Even though we already validated via pydantic, we still call check_config here, as
# some of the checks done in substep check_config are not covered by the pydantic models yet
# and some of the checks actually influence program logic/flow
self._check_config()
config_string = self.config.model_dump_yaml(by_alias=True)
self.logger.info(f"Configuration for step {self.name}\n{config_string}")
config_string = self.w_config.model_dump_yaml(by_alias=True)
self.logger.info(f"Configuration for workflow\n{config_string}")
# Update snakemake.config (which `config` is a reference to)
# with the validated configuration.
# All fields with default values are explicitly defined.
_config = _cached_yaml_round_trip_load_str(config_string)
config.update(_config)
self.logger.info(f"Snakemake config\n{config}")
def _setup_hooks(self):
"""Setup Snakemake workflow hooks for start/end/error"""
# In the following, the "log" parameter to the handler functions is set to "_" as we
# don't use them
def on_start(_):
"""Print configuration and sample sheets on start"""
verbose = False
if verbose:
# Print configuration back to the user after merging workflow step-specific
# configuration
print_config(self.config, file=sys.stderr)
# Print sample sheets to the user
print_sample_sheets(self, file=sys.stderr)
def on_error(_):
"""Error handler, print message"""
msg = "Oh no! Something went wrong."
print("\n" + "*" * len(msg), file=sys.stderr)
print(msg, file=sys.stderr)
print("*" * len(msg) + "\n", file=sys.stderr)
def on_success(_):
"""Success handler, print message"""
msg = "All done; have a nice day!"
print("\n" + "*" * len(msg), file=sys.stderr)
print(msg, file=sys.stderr)
print("*" * len(msg) + "\n", file=sys.stderr)
self.workflow.onstart(on_start)
self.workflow.onerror(on_error)
self.workflow.onsuccess(on_success)
def _check_config(self):
"""Internal method, checks step and sub step configurations"""
self.check_config()
for step in self.sub_steps.values():
step.check_config()
[docs]
def check_config(self):
"""Check ``self.w_config``, raise ``ConfigurationMissing`` on problems
Override in sub classes.
:raises:MissingConfiguration: on missing configuration
"""
[docs]
def ensure_w_config(self, config_keys, msg, e_class=MissingConfiguration):
"""Check parameters in configuration.
Method ensures required configuration setting are present in the provided configuration;
if not, it raises exception.
:param config_keys: List of strings with all keys that must be present in the configuration
for a given step of the analysis to be performed.
:type config_keys: tuple
:param msg: Message to be used in case of exception.
:type msg: str
:param e_class: Preferred exception class to be raised in case of error.
Default: MissingConfiguration.
:type e_class: class
"""
# Initialise variables
so_far = []
handle = self.w_config
# Check if configuration is empty
if not handle:
tpl = 'Empty configuration ("{full_path}"): {msg}'.format(
full_path="/".join(config_keys), msg=msg
)
raise e_class(tpl)
# Iterate over required configuration keys
for entry in config_keys:
# Check if keys are present in config dictionary
if (handle := (getattr(handle, entry, None) or handle.get(entry, None))) is not None:
so_far.append(entry)
else:
tpl = 'Missing configuration ("{full_path}", got up to "{so_far}"): {msg}'.format(
full_path="/".join(config_keys), so_far="/".join(so_far), msg=msg
)
raise e_class(tpl)
[docs]
def register_sub_step_classes(
self, classes: tuple[type[BaseStepPart] | tuple[type[BaseStepPart], Any], ...]
):
"""Register an iterable of sub step classes
Initializes objects in ``self.sub_steps`` dict
"""
for pair_or_class in classes:
try:
klass, args = pair_or_class
except TypeError:
klass = pair_or_class
args = ()
obj = klass(self, *args)
# obj.check_config()
self.sub_steps[klass.name] = obj
[docs]
def register_sub_workflow(
self, step_name: str, workdir: str, sub_workflow_name: str | None = None
):
"""Register workflow with given pipeline ``step_name`` and in the given ``workdir``.
Optionally, the sub workflow name can be given separate from ``step_name`` (the default)
value for it.
"""
sub_workflow_name = sub_workflow_name or step_name
if sub_workflow_name in self.sub_workflows:
raise ValueError("Sub workflow {} already registered!".format(sub_workflow_name))
if os.path.isabs(workdir):
abs_workdir = workdir
else:
abs_workdir = os.path.realpath(os.path.join(os.getcwd(), workdir))
self.workflow.subworkflow(
sub_workflow_name,
workdir=abs_workdir,
snakefile=snakefile_path(step_name),
configfile=abs_workdir + "/" + "config.yaml",
)
self.sub_workflows[sub_workflow_name] = self.workflow.globals[sub_workflow_name]
[docs]
def get_args(self, sub_step: str, action: str) -> Inputs | Callable[[Wildcards], Inputs]:
"""Return arguments for action of substep with given wildcards
Delegates to the sub step object's get_args function
"""
return self._get_sub_step(sub_step).get_args(action)
[docs]
def get_output_files(self, sub_step: str, action: str) -> Outputs:
"""Return list of strings with output files/patterns
Delegates to the sub step object's get_output_files function
"""
return self._get_sub_step(sub_step).get_output_files(action)
[docs]
def get_params(self, sub_step: str, action: str) -> Any:
"""Return parameters
Delegates to the sub step object's get_params function
"""
return self.substep_dispatch(sub_step, "get_params", action)
[docs]
def get_resource(self, sub_step: str, action: str, resource_name: str) -> Any:
"""Get resource
Delegates to the sub step object's get_resource function
"""
return self.substep_dispatch(sub_step, "get_resource", action, resource_name)
[docs]
def get_tmpdir(self) -> str:
"""Return temporary directory.
To be used directly or via get_resource("step", "action", "tmpdir")
1. Try to evaluate global_config/tmpdir. Interpret $-variables from environment.
Provides the current date as $TODAY.
2. If this fails, try to use environment variable TMPDIR.
3. If this fails, use tempfile.gettempdir(), same as Snakemake default.
"""
tmpdir = getattr(self.w_config, "global_config", {}).get("tmpdir", None)
if tmpdir:
with modified_environ(TODAY=datetime.date.today().strftime("%Y%m%d")):
tmpdir = os.path.expandvars(tmpdir)
if not tmpdir:
tmpdir = os.getenv("TMPDIR")
if not tmpdir:
tmpdir = tempfile.gettempdir()
# Force existence of temporary directory
os.makedirs(tmpdir, exist_ok=True)
return tmpdir
[docs]
def get_log_file(self, sub_step: str, action: str) -> Outputs:
"""Return path to the log file
Delegates to the sub step object's get_log_file function
"""
return self.substep_dispatch(sub_step, "get_log_file", action)
[docs]
def get_shell_cmd(self, sub_step: str, action: str, wildcards: Wildcards) -> str:
"""Return shell command for the pipeline sub step
Delegates to the sub step object's get_shell_cmd function
"""
return self.substep_dispatch(sub_step, "get_shell_cmd", action, wildcards)
[docs]
def run(self, sub_step: str, action: str, wildcards: Wildcards) -> str:
"""Run command for the given action of the given sub step with the given wildcards
Delegates to the sub step object's run function
"""
return self._get_sub_step(sub_step).run(action, wildcards)
[docs]
def get_result_files(self) -> OutputFiles:
"""Return actual list of file names to build"""
raise NotImplementedError("Implement me!") # pragma: no cover
[docs]
def substep_getattr(self, step: str, name: str) -> Any:
"""Return attribute from substep"""
return getattr(self._get_sub_step(step), name)
[docs]
def substep_dispatch(self, step: str, function: str, *args, **kwargs):
"""Dispatch call to function of sub step implementation"""
return self.substep_getattr(step, function)(*args, **kwargs)
def _get_sub_step(self, sub_step: str) -> BaseStepPart:
if sub_step in self.sub_steps:
return self.sub_steps[sub_step]
else:
raise ValueError(
'Could not find sub step "{}" in workflow step "{}"'.format(sub_step, self.name)
) # pragma: no cover
def _load_data_set_infos(self) -> typing.Generator[DataSetInfo, None, None]:
"""Load BioMed Sample Sheets as given by configuration and yield them"""
for name, data_set in self.w_config.data_sets.items():
yield DataSetInfo(
name,
data_set.file,
self.config_lookup_paths,
data_set.search_paths,
data_set.search_patterns,
data_set.type,
data_set.is_background,
data_set.naming_scheme,
data_set.mixed_se_pe,
data_set.sodar_uuid,
data_set.sodar_title,
data_set.pedigree_field,
)
def _load_data_search_infos(self) -> typing.Generator[DataSearchInfo, None, None]:
"""Use workflow and step configuration to yield ``DataSearchInfo`` objects"""
for _, data_set in self.w_config.data_sets.items():
yield DataSearchInfo(
sheet_path=data_set.file,
base_paths=self.config_lookup_paths,
search_paths=self.config.search_paths,
search_patterns=self.config.search_patterns,
mixed_se_pe=False,
)
[docs]
@classmethod
def wrapper_path(cls, path: str) -> str:
"""Generate path to wrapper"""
return "file://" + os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"..",
"snappy_wrappers",
"wrappers",
path,
)
)
[docs]
class LinkInPathGenerator:
"""Helper class for generating paths to link in"""
def __init__(
self,
work_dir,
data_set_infos,
config_paths,
cache_file_name=".snappy_path_cache",
preprocessed_path="",
):
#: Working directory
self.work_dir = work_dir
#: Data set info list from configuration
if preprocessed_path:
self.data_set_infos = [
self._update_datasetinfo(x, preprocessed_path) for x in data_set_infos
]
else:
self.data_set_infos = data_set_infos
#: Path to configuration files, used for invalidating cache
self.config_paths = config_paths
#: Name of cache file to create
self.cache_file_name = cache_file_name
#: File system crawler to use
invalidate_paths_list = self._merge_cache_invalidate_paths(self.data_set_infos)
invalidate_paths_list += config_paths
self.crawler = FileSystemCrawler(
os.path.join(self.work_dir, self.cache_file_name), invalidate_paths_list
)
[docs]
def run(
self,
folder_name,
pattern_set_keys=("left", "right", "left_md5", "right_md5", "bam"),
):
"""Yield (src_path, path_infix, filename) one-by-one
Cache is saved after the last iteration
"""
# Iterate over data set infos and crawl file system
filenames = set([])
# TODO: crawling the actual data sheet of the current data set is enough!
seen_root_paths = set()
for info in self.data_set_infos:
patterns = []
# Build PatternSet objects, based on types in configuration
for pat in info.search_patterns:
if not isinstance(pat, (dict, MutableMapping)):
raise ValueError("search_patterns must be a dict!") # pragma: no cover
# Add patterns as found in configuration file: DataSetInfo.search_patterns
patterns.append(PatternSet(pat.values(), names=pat.keys()))
# Add MD5 files to search file as default
pat_md5 = [pattern + ".md5" for pattern in pat.values()]
pat_names_md5 = [pattern + "_md5" for pattern in pat.keys()]
patterns.append(PatternSet(pat_md5, names=pat_names_md5))
# Crawl all root paths, link in the resulting files
for root_path in self._get_shell_cmd_root_paths(info):
if root_path in seen_root_paths:
continue # skip this root path
seen_root_paths.add(root_path)
for result in self.crawler.run(root_path, folder_name, patterns, info.mixed_se_pe):
res_dict = result.to_dict()
for key in pattern_set_keys:
if key not in res_dict:
continue # skip if not found
path_infix = os.path.relpath(
os.path.dirname(res_dict[key]), result.base_folder
)
filename = os.path.basename(res_dict[key])
if res_dict[key] in filenames:
raise ValueError("Detected double link-in {}".format(filename))
filenames.add(filename)
src_dir = os.path.dirname(res_dict[key])
yield src_dir, path_infix, filename
# Finally, save the cache
self.crawler.save_cache()
def _update_datasetinfo(self, data_set_info, preprocessed_path=""):
return DataSetInfo(
name=data_set_info.name,
sheet_path=data_set_info.sheet_path,
base_paths=data_set_info.base_paths,
search_paths=[preprocessed_path] if preprocessed_path else data_set_info.search_paths,
search_patterns=data_set_info.search_patterns,
sheet_type=data_set_info.sheet_type,
is_background=data_set_info.is_background,
naming_scheme=data_set_info.naming_scheme,
mixed_se_pe=data_set_info.mixed_se_pe,
sodar_uuid=data_set_info.sodar_uuid,
sodar_title=data_set_info.sodar_title,
)
@classmethod
def _get_shell_cmd_root_paths(cls, info):
for base_path in info.base_paths:
if not os.path.exists(os.path.join(base_path, info.sheet_path)):
continue # skip this one
for search_path in info.search_paths:
yield os.path.abspath(
os.path.join(
os.path.dirname(os.path.join(base_path, info.sheet_path)),
search_path,
)
)
@classmethod
def _merge_cache_invalidate_paths(cls, data_set_infos):
"""
:param data_set_infos: List of DataSetInfo objects.
:type data_set_infos: list
:return: Returns list with paths that should be used to potentially
invalidate a cache file based on the DataSetInfo. Method merges paths
to a project tsv file as well as the search paths into a single list of strings.
"""
# Initialise variable
out_list = []
# Iterate over DataSetInfo objects
for info in data_set_infos:
# Search paths - expects a list already
out_list.extend(getattr(info, "search_paths"))
# Sheet path
# Only name of file is stored in config file (relative path used),
# hence we need to find it in the base paths
sheet_file_name = getattr(info, "sheet_path") # expects a string
base_paths = getattr(info, "base_paths") # expects a list
sheet_path = cls._find_sheet_file(sheet_file_name, base_paths)
# Append if not None
if sheet_path:
out_list.append(sheet_path)
# Return
return out_list
@classmethod
def _find_sheet_file(cls, sheet_file_name, base_paths):
"""Method searches for sheet file in base paths.
:param sheet_file_name: Sheet file name.
:type sheet_file_name: str
:param base_paths: List of strings with base paths.
:type base_paths: list
:return: Returns path to sheet file.
"""
# Check if full path already
if os.path.exists(sheet_file_name):
return sheet_file_name
# Iterate over base paths
# Assumption: sheet file stored in the same level as config file,
# i.e., one of the base paths.
for base_p in base_paths:
dir_path = os.path.realpath(base_p)
# Find all files
for item in os.listdir(dir_path):
if sheet_file_name == item:
return os.path.join(dir_path, sheet_file_name)
# If not found: None
return None
[docs]
def get_ngs_library_folder_name(sheets, library_name):
"""Return library's folder name
The library is searched for based on the ``library_name``. In the case of
multiple NGS library matches, the first one is returned.
"""
for sheet in sheets:
try:
ngs_library = sheet.crawl(sheet.name_generator.inverse(library_name))
except SecondaryIDNotFoundException:
continue # skip, not in this sheet
if ngs_library:
try:
return ngs_library.extra_infos["folderName"]
except AttributeError:
raise ValueError("No folderName extraInfos entry for {}".format(ngs_library.name))
raise ValueError("Found no folders for NGS library of name {}".format(library_name))
# TODO: Rename to LinkInStepPart
[docs]
class LinkInStep(BaseStepPart):
"""Link in the raw files, e.g. FASTQ files
Depending on the configuration, the files are linked out after postprocessing
"""
name = "link_in"
def __init__(self, parent):
super().__init__(parent)
self.base_pattern_out = "work/input_links/{library_name}/.done"
# The key 'path_link_in' is only defined for pipelines that could used preprocessed
# FASTQ files. That doesn't make sense for pipelines that are using externally generated
# data already.
try:
preprocessed_path = self.config.path_link_in
except AttributeError:
preprocessed_path = ""
# Path generator.
self.path_gen = LinkInPathGenerator(
self.parent.work_dir,
self.parent.data_set_infos,
self.parent.config_lookup_paths,
cache_file_name=".snappy_path_cache",
preprocessed_path=preprocessed_path,
)
[docs]
def get_output_files(self, action):
assert action == "run", "Unsupported action"
return touch(self.base_pattern_out)
[docs]
def get_shell_cmd(self, action, wildcards):
"""Return call for linking in the files
The files are linked, keeping their relative paths to the item matching the "folderName"
intact.
"""
assert action == "run", "Unsupported action"
# Get base out path
out_path = os.path.dirname(self.base_pattern_out.format(**wildcards))
# Get folder name of first library candidate
folder_name = get_ngs_library_folder_name(self.parent.sheets, wildcards.library_name)
if self.config.path_link_in:
folder_name = wildcards.library_name
# Perform the command generation
lines = []
tpl = (
"mkdir -p {out_path}/{path_infix} && "
"{{{{ test -h {out_path}/{path_infix}/{filename} || "
"ln -sr {src_path}/{filename} {out_path}/{path_infix}; }}}}"
)
filenames = {} # generated so far
for src_path, path_infix, filename in self.path_gen.run(folder_name):
new_path = os.path.join(out_path, path_infix, filename)
if new_path in filenames:
if filenames[new_path] == src_path:
continue # ignore TODO: better correct this
msg = "WARNING: Detected double output path {}"
print(msg.format(filename), file=sys.stderr)
filenames[new_path] = src_path
lines.append(
tpl.format(
src_path=src_path,
out_path=out_path,
path_infix=path_infix,
filename=filename,
)
)
if not lines:
msg = "Found no files to link in for {}".format(dict(**wildcards))
print(msg, file=sys.stderr)
raise Exception(msg)
return "\n".join(lines)
[docs]
def run(self, action, wildcards):
raise ImplementationUnavailableError(
"run() not implemented for linking in reads"
) # pragma: no cover
[docs]
class LinkInVcfExternalStepPart(LinkInStep):
"""Link in the external VCF files."""
#: Step name
name = "link_in_vcf_external"
#: Class available actions
actions = ("run",)
#: Patterns set keys
pattern_set_keys = ("vcf", "vcf_md5")
def __init__(self, parent):
super().__init__(parent)
[docs]
def get_shell_cmd(self, action, wildcards):
"""Return call for linking in the files
The files are linked, keeping their relative paths to the item matching the "folderName"
intact.
"""
self._validate_action(action)
# Define path generator
path_gen = LinkInPathGenerator(
self.parent.work_dir,
self.parent.data_search_infos,
self.parent.config_lookup_paths,
)
# Get base out path
out_path = os.path.dirname(self.base_pattern_out.format(**wildcards))
# Perform the command generation
lines = []
tpl = (
"mkdir -p {out_path}/{path_infix} && "
"{{{{ test -h {out_path}/{path_infix}/{filename} || "
"ln -sr {src_path}/{filename} {out_path}/{path_infix}; }}}}"
)
filenames = {}
for src_path, path_infix, filename in path_gen.run(
folder_name=wildcards.library_name, pattern_set_keys=self.pattern_set_keys
):
new_path = os.path.join(out_path, path_infix, filename)
if new_path in filenames:
if filenames[new_path] == src_path:
continue # ignore TODO: better correct this
msg = "WARNING: Detected double output path {}"
print(msg.format(filename), file=sys.stderr)
filenames[new_path] = src_path
lines.append(
tpl.format(
src_path=src_path,
out_path=out_path,
path_infix=path_infix,
filename=filename,
)
)
if not lines:
msg = "Found no files to link in for {}".format(dict(**wildcards))
print(msg, file=sys.stderr)
raise Exception(msg)
return "\n".join(lines)
[docs]
class LinkInBamExternalStepPart(LinkInVcfExternalStepPart):
"""Link in the external BAM files."""
#: Step name
name = "link_in_bam_external"
#: Class available actions
actions = ("run",)
#: Patterns set keys
pattern_set_keys = ("bam", "bam_md5")
def __init__(self, parent):
super().__init__(parent)
self.base_pattern_out = "work/input_links/{library_name}/.done_bam_external"
[docs]
class LinkInBaiExternalStepPart(LinkInVcfExternalStepPart):
"""Link in the external BAI files."""
#: Step name
name = "link_in_bai_external"
#: Class available actions
actions = ("run",)
#: Patterns set keys
pattern_set_keys = ("bai", "bai_md5")
def __init__(self, parent):
super().__init__(parent)
self.base_pattern_out = "work/input_links/{library_name}/.done_bai_external"