API Documentation

snappy_pipeline.base

Basic utility code for snappy_pipeline

exception snappy_pipeline.base.InvalidConfiguration[source]

Raised on invalid configuration

exception snappy_pipeline.base.MissingConfiguration[source]

Raised on missing configuration

exception snappy_pipeline.base.SkipLibraryWarning[source]

Raised when libraries are skipped.

exception snappy_pipeline.base.UnknownFiltrationSourceException[source]

Raised when user try to request an unknown filtration source.

exception snappy_pipeline.base.UnsupportedActionException[source]

Raised when user try to call action that isn’t supported.

snappy_pipeline.base.expand_ref(config_path: str, dict_data: dict | list, lookup_paths: list[str] = None, dict_class=<class 'collections.OrderedDict'>) tuple[Any, tuple[AnyStr, ...], tuple[AnyStr, ...]][source]

Expand “$ref” in JSON-like data dict_data

Returns triple:

  • path to resolved file

  • paths containing included config files

  • config files included

snappy_pipeline.base.merge_dictlikes(dict1: DictLike, dict2: DictLike, dict_class: D = <class 'collections.OrderedDict'>) D[source]

Merge dictionary/model dict2 into dict1

snappy_pipeline.base.merge_kwargs(first_kwargs: dict[str, Any] | None, second_kwargs: dict[str, Any] | None) dict[str, Any] | None[source]

Merge two keyword arguments.

Parameters:
  • first_kwargs (dict) – First keyword arguments dictionary.

  • second_kwargs (dict) – Second keyword arguments dictionary.

Returns:

Returns merged dictionary with inputted keyword arguments.

snappy_pipeline.base.print_config(config: dict[str, ~typing.Any], file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]

Print human-readable version of configuration to file

snappy_pipeline.base.print_sample_sheets(step: BaseStep, file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]

Print loaded sample sheets from BaseStep in human-readable format

snappy_pipeline.base.snakefile_path(step_name: str) AnyStr[source]

Return absolute path to Snakefile for the given step name

snappy_pipeline.find_file

Code for crawling the file system and caching the results

exception snappy_pipeline.find_file.FileNamesTooDifferent[source]

Raised when two file names are too different to be PE reads

class snappy_pipeline.find_file.FileSystemCrawler(cache_path, invalidation_paths, lock_timeout=60)[source]

Crawl the file system

  • start crawling the file system from a given directory

  • look for files matching a given PatternSet

  • that are below a directory with a given name

cache

The actual dict with the cache, loaded from path to cache_path if the cache file exists.

cache_dirty

Flag whether cache has been modified and needs saving

cache_invalidated

Flag whether cache has been invalidated already.

cache_path

Path to cache (will be stored in JSON format)

invalidation_paths

Path to files to use for checking invalidation.

lock_timeout

Timeout for obtaining file system lock on the file system

logger

The logger to use.

run(root_dir, dir_name, pattern_sets, allow_empty_right)[source]

Perform the file system crawling from a root directory given a query pattern set

allow_empty_right – for mixed PE/SE read data sets (must be either SE or PE

for one library!)

save_cache(cache_path=None)[source]

Save cache, cache_path overriding self.cache_path

class snappy_pipeline.find_file.FileSystemCrawlerResult(base_folder, files, names=None)[source]

n-tuple of optionally named files

base_folder

Folder to start crawling in

files

Patterns to search for

named_files

Dict with name-to-pattern mapping, None if names is not given

names

Names for the file patterns, optional; if given has to have the same length as files

to_dict()[source]

Convert to dict, can only work if self.names and self.files is given

class snappy_pipeline.find_file.PatternSet(patterns, names=None)[source]

Store named or unnamed list of patterns

named_patterns

Named patterns, if any, else None

names

Optional names

patterns

Patterns to search for with names

snappy_pipeline.utils

Utility code

class snappy_pipeline.utils.DictQuery[source]

Helper class for comfortable access to nested dicts with str keys.

Source:

get(path, default=None)[source]

Return the value for key if key is in the dictionary, else default.

snappy_pipeline.utils.flatten(coll: List[str | List[str]]) List[str][source]

Flatten collection of strings or list of strings.

Source: https://stackoverflow.com/a/17865033

snappy_pipeline.utils.is_none(value)[source]

Helper function returning whether value is None

snappy_pipeline.utils.is_not_none(value)[source]

Helper function returning whether value is not None

snappy_pipeline.utils.listify(gen)[source]

Decorator that converts a generator into a function which returns a list

Use it in the case where a generator is easier to write but you want to enforce returning a list:

@listify
def counter(max_no):
    i = 0
    while i <= max_no:
        yield i
snappy_pipeline.utils.try_or_none(func, exceptions)[source]

Helper that tries to execute the function

If one of the exceptions is raised then return None

snappy_pipeline.workflows.abstract

Base classes for the actual pipeline steps

class snappy_pipeline.workflows.abstract.BaseStep(workflow: Workflow, config: MutableMapping[str, Any], config_lookup_paths: tuple[str, ...], config_paths: tuple[str, ...], work_dir: str, *, config_model_class: type[C], previous_steps: tuple[type[Self], ...] | None = None)[source]

Base class for the pipeline steps

Each pipeline step is a Snakemake workflow

check_config()[source]

Check self.w_config, raise ConfigurationMissing on problems

Override in sub classes.

Raises:MissingConfiguration:

on missing configuration

config_lookup_paths

Paths with configuration paths, important for later retrieving sample sheet files

config_model_class

Pydantic model class for configuration validation

config_paths

Tuple with absolute paths to configuration files read

classmethod default_config_yaml()[source]

Override this function for providing default configuration

The configuration should be a YAML fragment. Your configuration should define a top-level key starting with ‘_’ and then consist of the name of the schema, e.g., ‘_ngs_mapping_schema’. Your default configuration is then merged into the main configuration where the main configuration takes precedence.

Example:

def default_config_yaml(self):
    return textwrap.dedent("""
        schema_config:
          ngs_mapping:
            max_threads: 16
    """).lstrip()))

Return None for no default configuration.

You can also return an iterable of configurations, these will be merged in the order given (earlier ones will be overwritten by later ones). This is useful if your schema needs configuration for a later one.

ensure_w_config(config_keys, msg, e_class=<class 'snappy_pipeline.base.MissingConfiguration'>)[source]

Check parameters in configuration.

Method ensures required configuration setting are present in the provided configuration; if not, it raises exception.

Parameters:

config_keys – List of strings with all keys that must be present in the configuration

for a given step of the analysis to be performed. :type config_keys: tuple

Parameters:
  • msg (str) – Message to be used in case of exception.

  • e_class – Preferred exception class to be raised in case of error.

Default: MissingConfiguration. :type e_class: class

get_args(sub_step: str, action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]][source]

Return arguments for action of substep with given wildcards

Delegates to the sub step object’s get_args function

get_input_files(sub_step: str, action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]][source]

Return input files for action of substep with given wildcards

Delegates to the sub step object’s get_input_files function

get_log_file(sub_step: str, action: str) OutputFiles | dict[str, Any][source]

Return path to the log file

Delegates to the sub step object’s get_log_file function

get_output_files(sub_step: str, action: str) OutputFiles | dict[str, Any][source]

Return list of strings with output files/patterns

Delegates to the sub step object’s get_output_files function

get_params(sub_step: str, action: str) Any[source]

Return parameters

Delegates to the sub step object’s get_params function

get_resource(sub_step: str, action: str, resource_name: str) Any[source]

Get resource

Delegates to the sub step object’s get_resource function

get_result_files() OutputFiles[source]

Return actual list of file names to build

get_shell_cmd(sub_step: str, action: str, wildcards: Wildcards) str[source]

Return shell command for the pipeline sub step

Delegates to the sub step object’s get_shell_cmd function

get_tmpdir() str[source]

Return temporary directory.

To be used directly or via get_resource(“step”, “action”, “tmpdir”)

  1. Try to evaluate global_config/tmpdir. Interpret $-variables from environment. Provides the current date as $TODAY.

  2. If this fails, try to use environment variable TMPDIR.

  3. If this fails, use tempfile.gettempdir(), same as Snakemake default.

logger

Setup logger for the step

name: str

Override with step name

previous_steps

Classes of previously executed steps, used for merging their default configuration as well.

register_sub_step_classes(classes: tuple[type[BaseStepPart] | tuple[type[BaseStepPart], Any], ...])[source]

Register an iterable of sub step classes

Initializes objects in self.sub_steps dict

register_sub_workflow(step_name: str, workdir: str, sub_workflow_name: str | None = None)[source]

Register workflow with given pipeline step_name and in the given workdir.

Optionally, the sub workflow name can be given separate from step_name (the default) value for it.

run(sub_step: str, action: str, wildcards: Wildcards) str[source]

Run command for the given action of the given sub step with the given wildcards

Delegates to the sub step object’s run function

sheet_shortcut_args = None

Override with arguments to pass into sheet shortcut class constructor

sheet_shortcut_class: type[ShortcutSampleSheet]

Override with the sheet shortcut class to use

sheet_shortcut_kwargs = None

Override with keyword arguments to pass into sheet shortcut class constructor

sheets

Shortcut to the BioMed SampleSheet objects

shortcut_sheets

Shortcut sheets

sub_workflows: dict[str, snakemake.Workflow]

Functions from sub workflows, can be used to generate output paths into these workflows

substep_dispatch(step: str, function: str, *args, **kwargs)[source]

Dispatch call to function of sub step implementation

substep_getattr(step: str, name: str) Any[source]

Return attribute from substep

work_dir

Absolute path to directory of where to perform work

workflow

Snakefile “workflow” object

classmethod wrapper_path(path: str) str[source]

Generate path to wrapper

class snappy_pipeline.workflows.abstract.BaseStepPart(parent: P)[source]

Base class for a part of a pipeline step

actions: tuple[str, ...]

The actions available in the class.

check_config()[source]

Check configuration, raise ConfigurationMissing on problems

Override in sub classes.

Raises:MissingConfiguration:

on missing configuration

default_resource_usage: ResourceUsage = ResourceUsage(threads=1, time='01:00:00', memory='2G', partition=None, tmpdir=None)

Default resource usage for actions that are not given in resource_usage.

get_args(action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]][source]

Return args for the given action of the sub step

static get_default_partition() str | None[source]

Helper that returns the default partition.

get_input_files(action: str) InputFiles | dict[str, Any] | Callable[[Wildcards], InputFiles | dict[str, Any]][source]

Return input files for the given action of the sub step

get_log_file(action: str) OutputFiles | dict[str, Any][source]

Return path to log file

The default implementation tries to call self._get_log_files() and in the case of this function returning a dict, augments it with paths to MD5 files.

get_output_files(action: str) OutputFiles | dict[str, Any][source]

Return output files for the given action of the sub step and

get_resource(action: str, resource_name: str) Callable[[Wildcards, InputFiles], Any][source]

Return the amount of resources to be allocated for the given action.

Parameters:
  • action – The action to return the resource requirement for.

  • resource_name – The name to return the resource for.

get_resource_usage(action: str, **kwargs) ResourceUsage[source]

Return the resource usage for the given action.

get_shell_cmd(action: str, wildcards: Wildcards) str[source]

Return shell command for the given action of the sub step and the given wildcards

resource_usage: dict[str, ResourceUsage] = {}

Configure resource usage here that should not use the default resource usage from default_resource_usage.

run(action: str, wildcards: Wildcards)[source]

Run the sub steps action action’s code with the given wildcards

class snappy_pipeline.workflows.abstract.DataSearchInfo(sheet_path: str, base_paths: list, search_paths: list, search_patterns: list, mixed_se_pe: bool)[source]

Data search information - simplified version of DataSetInfo.

class snappy_pipeline.workflows.abstract.DataSetInfo(name, sheet_path, base_paths, search_paths, search_patterns, sheet_type, is_background, naming_scheme, mixed_se_pe, sodar_uuid, sodar_title, pedigree_field=None)[source]

Information on a DataSet

base_paths

All base paths of all configuration, to look for sheet_path

is_background

Whether the data set info is to be used only for background

mixed_se_pe

Whether mixing SE and PE data sets is allowed.

name

Name of the data set

pedigree_field_kwargs

The (optional) custom field used to define pedigree

search_paths

Search paths for the files in the sample sheet

search_patterns

Search patterns

sheet

The BioMed SampleSheet

sheet_path

Path to the sheet file, for loading

sodar_title

The (optional) title of the project in SODAR.

sodar_uuid

The UUID of the corresponding SODAR project.

exception snappy_pipeline.workflows.abstract.ImplementationUnavailableError[source]

Raised when a function that is to be overridden optionally is called

This is provided as an alternative to NotImplementedError as the Python linters warn if a class does not override functions throwing NotImplementedError.

class snappy_pipeline.workflows.abstract.InputFilesStepPartMixin[source]

Mixin with predefined “get_input_files” function.

ext_names = None

Names of the files to create for the extension

ext_values = None

Extensions of files to create as main payload

include_ped_file = None

Whether to include path to PED file or not

prev_class = None

Class with input VCF file name

class snappy_pipeline.workflows.abstract.LinkInBaiExternalStepPart(parent)[source]

Link in the external BAI files.

actions: tuple[str, ...] = ('run',)

Class available actions

name: str = 'link_in_bai_external'

Step name

pattern_set_keys = ('bai', 'bai_md5')

Patterns set keys

class snappy_pipeline.workflows.abstract.LinkInBamExternalStepPart(parent)[source]

Link in the external BAM files.

actions: tuple[str, ...] = ('run',)

Class available actions

name: str = 'link_in_bam_external'

Step name

pattern_set_keys = ('bam', 'bam_md5')

Patterns set keys

class snappy_pipeline.workflows.abstract.LinkInPathGenerator(work_dir, data_set_infos, config_paths, cache_file_name='.snappy_path_cache', preprocessed_path='')[source]

Helper class for generating paths to link in

cache_file_name

Name of cache file to create

config_paths

Path to configuration files, used for invalidating cache

run(folder_name, pattern_set_keys=('left', 'right', 'left_md5', 'right_md5', 'bam'))[source]

Yield (src_path, path_infix, filename) one-by-one

Cache is saved after the last iteration

work_dir

Working directory

class snappy_pipeline.workflows.abstract.LinkInStep(parent)[source]

Link in the raw files, e.g. FASTQ files

Depending on the configuration, the files are linked out after postprocessing

get_input_files(action)[source]

Return required input files

get_output_files(action)[source]

Return output files for the given action of the sub step and

get_shell_cmd(action, wildcards)[source]

Return call for linking in the files

The files are linked, keeping their relative paths to the item matching the “folderName” intact.

run(action, wildcards)[source]

Run the sub steps action action’s code with the given wildcards

class snappy_pipeline.workflows.abstract.LinkInVcfExternalStepPart(parent)[source]

Link in the external VCF files.

actions: tuple[str, ...] = ('run',)

Class available actions

get_shell_cmd(action, wildcards)[source]

Return call for linking in the files

The files are linked, keeping their relative paths to the item matching the “folderName” intact.

name: str = 'link_in_vcf_external'

Step name

pattern_set_keys = ('vcf', 'vcf_md5')

Patterns set keys

class snappy_pipeline.workflows.abstract.LinkOutStepPart(parent, disable_patterns=None)[source]

Generically link out

This is for output files that are created unconditionally, i.e., for output files where the output name is the same as for the work file.

disable_patterns

Patterns for disabling linking out to. This is useful/required when there is a specialized link out step part, e.g., for the case of alignment where realignment is performed or not, depending on the configuration.

get_input_files(action)[source]

Return input file pattern

get_output_files(action)[source]

Return output file pattern

get_shell_cmd(action, wildcards)[source]

Return call for linking out

snappy_pipeline.workflows.abstract.STDERR_TO_LOG_FILE = '# -----------------------------------------------------------------------------\n# Redirect stderr to log file and enable printing executed commands\nexec 2> >(tee -a "{log}")\nset -x\n# -----------------------------------------------------------------------------\n\n'

String constant with bash command for redirecting stderr to {log} file

class snappy_pipeline.workflows.abstract.WritePedigreeSampleNameStepPart(*args, **kwargs)[source]

Class contains method to write pedigree file for primary DNA sample given the index NGS library name.It will create pedigree information based sole on sample name, example ‘P001’ instead of ‘P001-N1-DNA1-WGS1’.

name: str = 'write_pedigree_with_sample_name'

Step name

run(wildcards, output)[source]

Write out the pedigree information

Parameters:
  • wildcards (snakemake.io.Wildcards) – Snakemake wildcards associated with rule (unused).

  • output (snakemake.io.Namedlist) – Snakemake output associated with rule.

class snappy_pipeline.workflows.abstract.WritePedigreeStepPart(parent: P, require_dna_ngs_library: bool = False, only_trios: bool = False)[source]

Write out pedigree file for primary DNA sample given the index NGS library name

actions: tuple[str, ...] = ('run',)

Class available actions

get_input_files(action)[source]

Returns function returning input files.

Returns a dict with entry "bam" mapping to list of input BAM files. This list will be empty if the parent step does not define an "ngs_mapping" workflow.

get_output_files(action)[source]

Return output files for the given action of the sub step and

name: str = 'write_pedigree'

Step name

require_dna_ngs_library

Whether to prevent writing out of samples with out NGS library.

run(wildcards: Wildcards, output: OutputFiles)[source]

Write out the pedigree information

Parameters:
  • wildcards (snakemake.io.Wildcards) – Snakemake wildcards associated with rule (unused).

  • output (snakemake.io.Namedlist) – Snakemake output associated with rule.

snappy_pipeline.workflows.abstract.get_ngs_library_folder_name(sheets, library_name)[source]

Return library’s folder name

The library is searched for based on the library_name. In the case of multiple NGS library matches, the first one is returned.

snappy_pipeline.workflows.abstract.modified_environ(*remove, **update)[source]

Temporarily updates the os.environ dictionary in-place.

The os.environ dictionary is updated in-place so that the modification is sure to work in all situations.

Parameters: