API Documentation
snappy_pipeline.base
Basic utility code for snappy_pipeline
- exception snappy_pipeline.base.UnknownFiltrationSourceException[source]
Raised when the user tries to request an unknown filtration source.
- exception snappy_pipeline.base.UnsupportedActionException[source]
Raised when the user tries to call an action that isn’t supported.
- snappy_pipeline.base.expand_ref(config_path: str, dict_data: dict | list, lookup_paths: list[str] = None, dict_class=<class 'collections.OrderedDict'>) tuple[Any, tuple[AnyStr, ...], tuple[AnyStr, ...]] [source]
Expand “$ref” in JSON-like data `dict_data`.
Returns triple:
- path to the resolved file
- paths containing included config files
- the included config files
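A minimal usage sketch; the “$ref” value format, the file layout, and the unpacking names below are assumptions for illustration, not confirmed API details:

    from collections import OrderedDict
    from snappy_pipeline.base import expand_ref

    # Hypothetical config dict whose "$ref" entry points at another YAML file
    # that is resolved against the given lookup_paths (assumption).
    config = OrderedDict([("$ref", "base_config.yaml"), ("step_config", OrderedDict())])
    result = expand_ref("config.yaml", config, lookup_paths=["."])
    resolved, paths, included = result  # unpacking names assumed from the docstring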
- snappy_pipeline.base.merge_dictlikes(dict1: DictLike, dict2: DictLike, dict_class: D = <class 'collections.OrderedDict'>) D [source]
Merge dictionary/model `dict2` into `dict1`.
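A minimal sketch of merging two configuration dicts; that values from `dict2` take precedence on conflict is an assumption based on “merge `dict2` into `dict1`”:

    from collections import OrderedDict
    from snappy_pipeline.base import merge_dictlikes

    defaults = OrderedDict([("threads", 8), ("tools", OrderedDict([("dna", ["bwa"])]))])
    overrides = OrderedDict([("threads", 16)])
    merged = merge_dictlikes(defaults, overrides)
    # Assumption: merged["threads"] == 16, while merged["tools"] is kept from defaults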
- snappy_pipeline.base.merge_kwargs(first_kwargs: dict[str, Any] | None, second_kwargs: dict[str, Any] | None) dict[str, Any] | None [source]
Merge two keyword argument dictionaries.
- Parameters:
first_kwargs (dict) – First keyword arguments dictionary.
second_kwargs (dict) – Second keyword arguments dictionary.
- Returns:
Returns the merged dictionary of the input keyword arguments.
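A hedged example; that the second dictionary wins on key conflicts and that `None` inputs are tolerated are assumptions suggested by the `| None` annotations above:

    from snappy_pipeline.base import merge_kwargs

    merged = merge_kwargs({"threads": 4}, {"memory": "8G"})
    # Expected: {"threads": 4, "memory": "8G"}
    merge_kwargs(None, None)  # presumably None, matching the Optional return type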
- snappy_pipeline.base.print_config(config: dict[str, ~typing.Any], file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]
Print a human-readable version of the configuration to `file`.
snappy_pipeline.find_file
Code for crawling the file system and caching the results
- exception snappy_pipeline.find_file.FileNamesTooDifferent[source]
Raised when two file names are too different to be paired-end (PE) reads
- class snappy_pipeline.find_file.FileSystemCrawler(cache_path, invalidation_paths, lock_timeout=60)[source]
Crawl the file system.
Crawling starts from a given directory and looks for files matching a given `PatternSet` that are below a directory with a given name.
- cache
The actual dict with the cache, loaded from `cache_path` if the cache file exists.
- cache_dirty
Flag whether cache has been modified and needs saving
- cache_invalidated
Flag whether cache has been invalidated already.
- cache_path
Path to cache (will be stored in JSON format)
- invalidation_paths
Paths to files to use for checking invalidation.
- lock_timeout
Timeout for obtaining the lock on the file system
- logger
The logger to use.
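Constructing a crawler from the signature above; the concrete paths are illustrative, and the query API (e.g., via a `PatternSet`) is omitted here since it is not shown in this section:

    from snappy_pipeline.find_file import FileSystemCrawler

    crawler = FileSystemCrawler(
        cache_path=".snappy_path_cache",     # cache stored in JSON format
        invalidation_paths=["config.yaml"],  # changes here invalidate the cache
        lock_timeout=60,                     # seconds to wait for the file system lock
    )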
- class snappy_pipeline.find_file.FileSystemCrawlerResult(base_folder, files, names=None)[source]
n-tuple of optionally named files
- base_folder
Folder to start crawling in
- files
Patterns to search for
- named_files
Dict with name-to-pattern mapping; `None` if `names` is not given
- names
Names for the file patterns, optional; if given has to have the same length as files
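A small sketch of the named-tuple behavior described above; the FASTQ patterns are illustrative:

    from snappy_pipeline.find_file import FileSystemCrawlerResult

    result = FileSystemCrawlerResult(
        base_folder="/data/run1",
        files=["*_R1.fastq.gz", "*_R2.fastq.gz"],
        names=["left", "right"],
    )
    # Per the docs above, result.named_files should map names to patterns:
    # {"left": "*_R1.fastq.gz", "right": "*_R2.fastq.gz"}; it is None without names.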
snappy_pipeline.utils
Utility code
- class snappy_pipeline.utils.DictQuery[source]
Helper class for comfortable access to nested dicts with `str` keys.
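A usage sketch assuming the common recipe this helper is typically based on, where `get()` accepts a “/”-separated key path; the separator syntax is an assumption:

    from snappy_pipeline.utils import DictQuery

    query = DictQuery({"step_config": {"ngs_mapping": {"tools": ["bwa"]}}})
    query.get("step_config/ngs_mapping/tools")  # -> ["bwa"] (assumed path syntax)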
- snappy_pipeline.utils.flatten(coll: List[str | List[str]]) List[str] [source]
Flatten collection of strings or list of strings.
- snappy_pipeline.utils.is_not_none(value)[source]
Helper function returning whether `value` is not `None`.
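Both helpers in action; combining them to filter and flatten is a typical use:

    from snappy_pipeline.utils import flatten, is_not_none

    flatten(["a.vcf", ["b.vcf", "c.vcf"]])      # -> ["a.vcf", "b.vcf", "c.vcf"]
    list(filter(is_not_none, ["a.vcf", None]))  # -> ["a.vcf"]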
snappy_pipeline.workflows.abstract
Base classes for the actual pipeline steps
- class snappy_pipeline.workflows.abstract.BaseStep(workflow: Workflow, config: MutableMapping[str, Any], config_lookup_paths: tuple[str, ...], config_paths: tuple[str, ...], work_dir: str, *, config_model_class: type[C], previous_steps: tuple[type[Self], ...] | None = None)[source]
Base class for the pipeline steps
Each pipeline step is a Snakemake workflow
- check_config()[source]
Check `self.w_config`, raise `MissingConfiguration` on problems. Override in sub classes.
- Raises:
MissingConfiguration – on missing configuration
- config_lookup_paths
Paths with configuration paths, important for later retrieving sample sheet files
- config_model_class
Pydantic model class for configuration validation
- config_paths
Tuple with absolute paths to configuration files read
- classmethod default_config_yaml()[source]
Override this function for providing default configuration
The configuration should be a YAML fragment. Your configuration should define a top-level key starting with ‘_’ followed by the name of the schema, e.g., ‘_ngs_mapping_schema’. Your default configuration is then merged into the main configuration, where the main configuration takes precedence.
Example:

    def default_config_yaml(self):
        return textwrap.dedent(
            """
            schema_config:
              ngs_mapping:
                max_threads: 16
            """
        ).lstrip()
Return `None` for no default configuration. You can also return an iterable of configurations; these will be merged in the order given (earlier ones will be overwritten by later ones). This is useful if your schema needs configuration for a later one.
- ensure_w_config(config_keys, msg, e_class=<class 'snappy_pipeline.base.MissingConfiguration'>)[source]
Check parameters in configuration.
Method ensures that the required configuration settings are present in the provided configuration; if not, it raises an exception.
- Parameters:
config_keys (tuple) – List of strings with all keys that must be present in the configuration for a given step of the analysis to be performed.
msg (str) – Message to be used in case of exception.
e_class (class) – Preferred exception class to be raised in case of error. Default: MissingConfiguration.
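A hedged sketch of calling this from a step’s `check_config()`; the key path and message are illustrative, not taken from a real workflow:

    from snappy_pipeline.workflows.abstract import BaseStep

    class MyStep(BaseStep):
        def check_config(self):
            # Fail early with MissingConfiguration if the key path is absent
            self.ensure_w_config(
                ("static_data_config", "reference", "path"),
                "Path to reference FASTA not configured but required",
            )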
- get_args(sub_step: str, action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]] [source]
Return arguments for action of substep with given wildcards
Delegates to the sub step object’s get_args function
- get_input_files(sub_step: str, action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]] [source]
Return input files for action of substep with given wildcards
Delegates to the sub step object’s get_input_files function
- get_log_file(sub_step: str, action: str) OutputFiles | dict[str, Any] [source]
Return path to the log file
Delegates to the sub step object’s get_log_file function
- get_output_files(sub_step: str, action: str) OutputFiles | dict[str, Any] [source]
Return list of strings with output files/patterns
Delegates to the sub step object’s get_output_files function
- get_params(sub_step: str, action: str) Any [source]
Return parameters
Delegates to the sub step object’s get_params function
- get_resource(sub_step: str, action: str, resource_name: str) Any [source]
Get resource
Delegates to the sub step object’s get_resource function
- get_shell_cmd(sub_step: str, action: str, wildcards: Wildcards) str [source]
Return shell command for the pipeline sub step
Delegates to the sub step object’s get_shell_cmd function
- get_tmpdir() str [source]
Return temporary directory.
To be used directly or via get_resource(“step”, “action”, “tmpdir”)
Try to evaluate global_config/tmpdir. Interpret $-variables from environment. Provides the current date as $TODAY.
If this fails, try to use environment variable TMPDIR.
If this fails, use tempfile.gettempdir(), same as Snakemake default.
- logger
Setup logger for the step
- name: str
Override with step name
- previous_steps
Classes of previously executed steps, used for merging their default configuration as well.
- register_sub_step_classes(classes: tuple[type[BaseStepPart] | tuple[type[BaseStepPart], Any], ...])[source]
Register an iterable of sub step classes
Initializes objects in the `self.sub_steps` dict.
- register_sub_workflow(step_name: str, workdir: str, sub_workflow_name: str | None = None)[source]
Register workflow with the given pipeline `step_name` and in the given `workdir`. Optionally, the sub workflow name can be given separately from `step_name` (the default value for it).
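A hedged sketch of registering a sub workflow and later generating paths into it via `sub_workflows`; the step name, config key, and output path are illustrative:

    from snappy_pipeline.workflows.abstract import BaseStep

    class MyStep(BaseStep):
        def setup_sub_workflows(self):  # hypothetical helper name
            # Register the "ngs_mapping" workflow in its configured directory
            self.register_sub_workflow("ngs_mapping", self.config["path_ngs_mapping"])
            # Resolve an output path inside the registered workflow later on:
            ngs_mapping = self.sub_workflows["ngs_mapping"]
            return ngs_mapping("output/bwa.P001-N1-DNA1-WGS1/out/bwa.P001-N1-DNA1-WGS1.bam")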
- run(sub_step: str, action: str, wildcards: Wildcards) str [source]
Run command for the given action of the given sub step with the given wildcards
Delegates to the sub step object’s run function
- sheet_shortcut_args = None
Override with arguments to pass into sheet shortcut class constructor
- sheet_shortcut_class: type[ShortcutSampleSheet]
Override with the sheet shortcut class to use
- sheet_shortcut_kwargs = None
Override with keyword arguments to pass into sheet shortcut class constructor
- sheets
Shortcut to the BioMed SampleSheet objects
- shortcut_sheets
Shortcut sheets
- sub_workflows: dict[str, snakemake.Workflow]
Functions from sub workflows, can be used to generate output paths into these workflows
- substep_dispatch(step: str, function: str, *args, **kwargs)[source]
Dispatch call to function of sub step implementation
- work_dir
Absolute path to the directory in which to perform the work
- workflow
Snakefile “workflow” object
- class snappy_pipeline.workflows.abstract.BaseStepPart(parent: P)[source]
Base class for a part of a pipeline step
- actions: tuple[str, ...]
The actions available in the class.
- check_config()[source]
Check configuration, raise `MissingConfiguration` on problems. Override in sub classes.
- Raises:
MissingConfiguration – on missing configuration
- default_resource_usage: ResourceUsage = ResourceUsage(threads=1, time='01:00:00', memory='2G', partition=None, tmpdir=None)
Default resource usage for actions that are not given in `resource_usage`.
- get_args(action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]] [source]
Return args for the given action of the sub step
- get_input_files(action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]] [source]
Return input files for the given action of the sub step
- get_log_file(action: str) OutputFiles | dict[str, Any] [source]
Return path to log file
The default implementation tries to call `self._get_log_files()` and, if that function returns a `dict`, augments it with paths to MD5 files.
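An illustration of the augmentation described above; the exact MD5 key naming is an assumption:

    # If _get_log_files() returns a dict of log paths ...
    log_files = {"log": "work/step/log/run.log"}
    # ... get_log_file() would also expose companion MD5 checksum paths, e.g.:
    augmented = {**log_files, **{f"{k}_md5": f"{v}.md5" for k, v in log_files.items()}}
    # {"log": "work/step/log/run.log", "log_md5": "work/step/log/run.log.md5"}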
- get_output_files(action: str) OutputFiles | dict[str, Any] [source]
Return output files for the given action of the sub step
- get_resource(action: str, resource_name: str) Callable[[Wildcards, InputFiles], Any] [source]
Return the amount of resources to be allocated for the given action.
- Parameters:
action – The action to return the resource requirement for.
resource_name – The name to return the resource for.
- get_resource_usage(action: str, **kwargs) ResourceUsage [source]
Return the resource usage for the given action.
- get_shell_cmd(action: str, wildcards: Wildcards) str [source]
Return shell command for the given action of the sub step and the given wildcards
- resource_usage: dict[str, ResourceUsage] = {}
Configure resource usage here for actions that should not use the default resource usage from `default_resource_usage`.
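A sketch of a step part overriding resources for one action; the `ResourceUsage` import path is an assumption, while its fields are taken from the default shown above:

    from snappy_pipeline.workflows.abstract import BaseStepPart
    from snappy_pipeline.models import ResourceUsage  # import path is an assumption

    class MyStepPart(BaseStepPart):
        name = "my_part"    # illustrative
        actions = ("run",)
        #: "run" gets explicit resources; other actions fall back to default_resource_usage
        resource_usage = {
            "run": ResourceUsage(threads=8, time="08:00:00", memory="16G"),
        }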
- class snappy_pipeline.workflows.abstract.DataSearchInfo(sheet_path: str, base_paths: list, search_paths: list, search_patterns: list, mixed_se_pe: bool)[source]
Data search information, a simplified version of `DataSetInfo`.
- class snappy_pipeline.workflows.abstract.DataSetInfo(name, sheet_path, base_paths, search_paths, search_patterns, sheet_type, is_background, naming_scheme, mixed_se_pe, sodar_uuid, sodar_title, pedigree_field=None)[source]
Information on a DataSet
- base_paths
All base paths of all configurations, used to look for `sheet_path`
- is_background
Whether the data set info is to be used only for background
- mixed_se_pe
Whether mixing SE and PE data sets is allowed.
- name
Name of the data set
- pedigree_field_kwargs
The (optional) custom field used to define pedigree
- search_paths
Search paths for the files in the sample sheet
- search_patterns
Search patterns
- sheet
The BioMed SampleSheet
- sheet_path
Path to the sheet file, for loading
- sodar_title
The (optional) title of the project in SODAR.
- sodar_uuid
The UUID of the corresponding SODAR project.
Raised when a function that is to be overridden optionally is called
This is provided as an alternative to `NotImplementedError`, as Python linters warn if a class does not override functions throwing `NotImplementedError`.
- class snappy_pipeline.workflows.abstract.InputFilesStepPartMixin[source]
Mixin with predefined “get_input_files” function.
- ext_names = None
Names of the files to create for the extension
- ext_values = None
Extensions of files to create as main payload
- include_ped_file = None
Whether to include path to PED file or not
- prev_class = None
Class with input VCF file name
- class snappy_pipeline.workflows.abstract.LinkInBaiExternalStepPart(parent)[source]
Link in the external BAI files.
- actions: tuple[str, ...] = ('run',)
Class available actions
- name: str = 'link_in_bai_external'
Step name
- pattern_set_keys = ('bai', 'bai_md5')
Pattern set keys
- class snappy_pipeline.workflows.abstract.LinkInBamExternalStepPart(parent)[source]
Link in the external BAM files.
- actions: tuple[str, ...] = ('run',)
Class available actions
- name: str = 'link_in_bam_external'
Step name
- pattern_set_keys = ('bam', 'bam_md5')
Pattern set keys
- class snappy_pipeline.workflows.abstract.LinkInPathGenerator(work_dir, data_set_infos, config_paths, cache_file_name='.snappy_path_cache', preprocessed_path='')[source]
Helper class for generating paths to link in
- cache_file_name
Name of cache file to create
- config_paths
Paths to configuration files, used for invalidating the cache
- run(folder_name, pattern_set_keys=('left', 'right', 'left_md5', 'right_md5', 'bam'))[source]
Yield (src_path, path_infix, filename) one-by-one
Cache is saved after the last iteration
- work_dir
Working directory
- class snappy_pipeline.workflows.abstract.LinkInStep(parent)[source]
Link in the raw files, e.g. FASTQ files
Depending on the configuration, the files are linked out after postprocessing
- class snappy_pipeline.workflows.abstract.LinkInVcfExternalStepPart(parent)[source]
Link in the external VCF files.
- actions: tuple[str, ...] = ('run',)
Class available actions
- get_shell_cmd(action, wildcards)[source]
Return call for linking in the files
The files are linked, keeping their relative paths to the item matching the “folderName” intact.
- name: str = 'link_in_vcf_external'
Step name
- pattern_set_keys = ('vcf', 'vcf_md5')
Pattern set keys
- class snappy_pipeline.workflows.abstract.LinkOutStepPart(parent, disable_patterns=None)[source]
Generically link out
This is for output files that are created unconditionally, i.e., for output files where the output name is the same as for the work file.
- disable_patterns
Patterns for disabling linking out to. This is useful/required when there is a specialized link out step part, e.g., for the case of alignment where realignment is performed or not, depending on the configuration.
- snappy_pipeline.workflows.abstract.STDERR_TO_LOG_FILE = '# -----------------------------------------------------------------------------\n# Redirect stderr to log file and enable printing executed commands\nexec 2> >(tee -a "{log}")\nset -x\n# -----------------------------------------------------------------------------\n\n'
String constant with bash commands for redirecting stderr to the `{log}` file.
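A hedged sketch of how the constant is typically prepended to a rule’s shell command; the command itself is illustrative:

    from snappy_pipeline.workflows.abstract import STDERR_TO_LOG_FILE

    # Snakemake substitutes {log} when rendering the rule's shell command
    cmd = STDERR_TO_LOG_FILE + "bwa mem -t 8 ref.fa in.fastq.gz > out.sam"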
- class snappy_pipeline.workflows.abstract.WritePedigreeSampleNameStepPart(*args, **kwargs)[source]
Class contains a method to write the pedigree file for the primary DNA sample given the index NGS library name. It will create pedigree information based solely on the sample name, e.g., ‘P001’ instead of ‘P001-N1-DNA1-WGS1’.
- name: str = 'write_pedigree_with_sample_name'
Step name
- class snappy_pipeline.workflows.abstract.WritePedigreeStepPart(parent: P, require_dna_ngs_library: bool = False, only_trios: bool = False)[source]
Write out pedigree file for primary DNA sample given the index NGS library name
- actions: tuple[str, ...] = ('run',)
Class available actions
- get_input_files(action)[source]
Returns a function returning the input files.
Returns a dict with entry `"bam"` mapping to the list of input BAM files. This list will be empty if the parent step does not define an `"ngs_mapping"` workflow.
- name: str = 'write_pedigree'
Step name
- require_dna_ngs_library
Whether to prevent writing out samples without an NGS library.
- snappy_pipeline.workflows.abstract.get_ngs_library_folder_name(sheets, library_name)[source]
Return library’s folder name
The library is searched for based on the `library_name`. In the case of multiple NGS library matches, the first one is returned.
- snappy_pipeline.workflows.abstract.modified_environ(*remove, **update)[source]
Temporarily updates the `os.environ` dictionary in-place.
The `os.environ` dictionary is updated in-place so that the modification is sure to work in all situations.
- Parameters:
remove – Environment variables to remove.
update – Dictionary of environment variables and values to add/update.
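Assuming this is a context manager, as “temporarily updates” suggests, usage would look like:

    import os
    from snappy_pipeline.workflows.abstract import modified_environ

    with modified_environ("TMPDIR", TODAY="2024-01-01"):
        assert "TMPDIR" not in os.environ           # removed inside the block
        assert os.environ["TODAY"] == "2024-01-01"  # added inside the block
    # The previous environment is restored on exit.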