Somatic Variant Calling Dissection

Note

Before reading this chapter, you should have

After reading this chapter, you should

  • understand the BaseStep and BaseStepPart classes and how to subclass them and override the different functions

  • understand how the objects of these classes are tied into the Snakefile of each pipeline step

  • understand how to create and use Snakemake wrappers for tools

Pipeline Step File Structure

Generally, all pipeline steps go into a sub module of snappy_pipeline.workflows (thus, a sub directory). In this chapter, we look at the somatic_variant_calling pipeline step. This step has the following structure on the file system:

somatic_variant_calling/
|-- __init__.py
`-- Snakefile

As you can see, it is a Python module (as it contains an __init__.py file) that also contains non-Python files (here Snakefile). The directory could also contain more files. This could be any small static data file that the module could require. Further, we could decide to factorize out the rules for a tool that requires many small rules (such as the tool cnvkit in somatic_targeted_seq_cnv_calling).

The code for generating input and output file lists etc. is in the __init__.py file and the module is available as snappy_pipeline.workflow.somatic_variant_calling. The Snakefile is used for creating the Snakemake workflow. When executing cubi-snake for a somatic_variant_calling step instance, you will note that the Snakemake command line displayed at the top will use the --snakefile argument and put the value to the Snakefile inside the somatic_variant_calling directory at the argument’s value. Thus, cubi-snake is no real “magic” but simply a shortcut to the snakemake executable.

The Snakefile

We will first consider the Snakefile.

Necessary Imports

At the top, it starts with a line specifying UTF-8 coding and a Python docstring giving a short synopsis. It does some Python imports for making the expand_ref() function and the SomaticVariantCallingWorkflow class available.

# -*- coding: utf-8 -*-
"""CUBI Pipeline somatic_variant_calling step Snakefile"""

import os

from snappy_pipeline import expand_ref
from snappy_pipeline.workflows.somatic_variant_calling import (
    SomaticVariantCallingWorkflow,
)

Configuration

Then follows the loading of the configuration. The Snakemake configfile: statement loads the file config.yaml from the current working directory. When executing the pipeline step with cubi-snake, this is either the directory the command is called in or the value of the --directory argument if given.

In the last line of this chunk, the JSON pointers in the configuration are expanded, i.e., the "$ref" values are interpreted. This is used for implementing “overriding behaviour”, i.e., including and extending (OOP-like) the project’s main configuration file with the per–step instance one. This way, you can set the pipeline_step/name to somatic_variant_calling in the somatic variant calling pipeline instance directory and to ngs_mapping in the ngs mapping pipeline step instance directory, for example.

# Configuration ===============================================================


configfile: "config.yaml"


Local Rules / Rule all

In the next chunk, the rules that are to be executed locally and not generate any cluster jobs are defined. Then, the all rule is defined to obtain the list of files to generate by default using the get_result_files() method of the SomaticVariantCallingWorkflow() class.


# Rules =======================================================================


localrules:
    # Linking files from work/ to output/ should be done locally
    somatic_variant_calling_link_out_run,

House-Keeping Rules

Next follow the “house-keeping” rules that do not perform any real work. In this case, the somatic_variant_calling_link_out_run rule performs the linking from the work/ directory into the output/ directory.

Note that the rule names are generated by concatenating the step name (here somatic_variant_calling), the part of the pipeline step (here link_out), followed by the action to be performed (here run as there is no other action for link_out).

rule all:
    input:
        wf.get_result_files(),


# House-Keeping ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Generic linking out ---------------------------------------------------------

Rule for MuTect

Next comes the rule for running mutect. Note that both for input: and output:, dict values will be passed. These should be unpacked (similar to Python **kwargs unpacking).

As we are using an input function (i.e., a function object that accepts a wildcards argument), we have to use the recently introduced Snakemake unpack directive. This allows for lazy unpacking after the input function has been called with the wildcards argument. For the output files, no wildcards are required as only strings with placeholders are returned. Thus, the dict with key/value pairs of the named output files is to be unpacked directly using two asterisks (**).

rule somatic_variant_calling_link_out_run:
    input:
        wf.get_input_files("link_out", "run"),
    output:
        wf.get_output_files("link_out", "run"),
    run:
        shell(wf.get_shell_cmd("link_out", "run", wildcards))


# Somatic Variant Calling ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Rule for Scalpel

The rule for Scalpel looks similar. However, here the name of the normal library is required (as it is not part of the wildcards in contrast to the somatic library).

The way this is implemented here is to introduce a parameter called normal_library_name. The attribute get_normal_lib_name of the scalpel BaseStepPart sub class object is passed in here (which is actually a function). On execution of the rule, the function will be called with the wildcards object as the parameter. It will then lookup the name of the normal library for the matched tumor NGS library and return it. This value is then available as params.normal_library_name.

Also note that for scalpel, the call is not generated directly by the BaseStepPart sub class, but a Snakemake wrapper is used instead. This wrapper is located in the directory ../wrappers/scalpel/run, relative to the somatic_variant_calling Snakefile. The method wrapper_path() builds the correct path relative to the wrappers directory in the snappy_pipeline directory and its return value is passed to the wrapper: section.

# Run MuTect ------------------------------------------------------------------


rule somatic_variant_calling_mutect_run:
    input:
        unpack(wf.get_input_files("mutect", "run")),
    output:
        **wf.get_output_files("mutect", "run"),
    threads: wf.get_resource("mutect", "run", "threads")

The Module

Module Documentation

The file starts with the module-level documentation. Only the first four lines are shown below. This module-level documentation is also included into the user documentation, e.g., the one for the somatic variant calling module is included at Somatic Variant Calling.

# -*- coding: utf-8 -*-
"""Implementation of the ``somatic_variant_calling`` step

The ``somatic_variant_calling`` step takes as the input the results of the ``ngs_mapping`` step

Imports

Then follow the necessary imports for the module. Note that the classes of the steps that are used for the input are also imported (this will be important below).

(unplaced or unlocalized contigs, viral sequences, decoys, HLA alleles, ...).
Somatic variant calling is generally meaningless for many of these small contigs.
It is possible to configure the somatic variant calling to avoid the contigs irrelevant for downstream analysis, for example:

.. code-block:: yaml

  mutect2:
    ignore_chroms: ['*_random', 'chrUn_*', '*_decoy', "EBV", "HPV*", "HBV", "HCV-*", "HIV-*", "HTLV-1", "CMV", "KSHV", "MCV", "SV40"] # GRCh38.d1.vd1
    window_length: 300000000   # OK for WES, too large for WGS
    keep_tmpdir: onerror       # Facilitates debugging
    ...


=======

Constants

The imports are followed by constant definitions.

In the case of the somatic variant calling methods, the different tools generate a common set of core files, here VCF files with TBI indices and MD5 files for both. Thus, it makes sense to store the extensions (and names for named input and output file lists) in module-level constants. Note the use of tuples over lists for marking this datas explicitely as immutable.

Further, the DEFAULT_CONFIG constant is defined with default configuration in YAML format. This is also displayed in the user configuration so the users know where configuration settings are available. Required configuration without any defaults should be set to null (or []/{} for empty lists/dicts) and marked with a # REQUIRED comment. The different values are to be documented with YAML comments.

This default configuration will be loaded when initializing the BaseStep sub class object and then overridden with the project- and pipeline step instance–wide configuration.

=======

Currently, no reports are generated.
"""

import os
import sys
from collections import OrderedDict

from biomedsheets.shortcuts import CancerCaseSheet, CancerCaseSheetOptions, is_not_background
from snakemake.io import expand

from snappy_pipeline.utils import dictify, listify
from snappy_pipeline.workflows.abstract import (
    BaseStep,
    BaseStepPart,
    LinkOutStepPart,
    ResourceUsage,
)
from snappy_pipeline.workflows.ngs_mapping import NgsMappingWorkflow

from .model import SomaticVariantCalling as SomaticVariantCallingConfigModel

__author__ = "Manuel Holtgrewe <manuel.holtgrewe@bih-charite.de>"

#: Extensions of files to create as main payload
EXT_VALUES = (".vcf.gz", ".vcf.gz.tbi", ".vcf.gz.md5", ".vcf.gz.tbi.md5")

The BaseStep Sub Class

Let’s jump towards the end of the file. Here is the BaseStep sub class SomaticVariantCallingWorkflow.

Each sub class has to configure the name and sheet_shortcut_class class members. They will be used for identifying the step name and the BioMed Sheet sheet shortcut class. The somatic variant calling step sets these to "somatic_variant_calling" and the CancerCaseSheet class.

The static method default_config_yaml() must be overridden in each BaseStep sub class. Each of these functions will have the same content but it is important for the scope of accessing DEFAULT_CONFIG in the current module.

        # Validate action
        self._validate_action(action)

        prefix = (
            "work/{{mapper}}.{var_caller}.{{tumor_library}}/log/"
            "{{mapper}}.{var_caller}.{{tumor_library}}"
        ).format(var_caller=self.__class__.name)
        key_ext = (
            ("log", ".log"),
            ("conda_info", ".conda_info.txt"),
            ("conda_list", ".conda_list.txt"),
        )

The constructor of the class calls the super class’ constructor with the arguments from the Snakefile. It is very important to note that it also gets an iterable (here a one-element tuple) of the BaseStep sub classes that provide input for this step (here only NGSMappingWorkflow imported at the top). This information is required for making the default configuration of these steps available.

The constructor then proceeds to register the sub step classes that are used to implement the actual behaviour of the pipeline step. Here, it is for running MuTect, Scalpel, and linking out the somatic variant call VCF files from work/ into output/.

            yield key, prefix + ext
            yield key + "_md5", prefix + ext + ".md5"


class MutectBaseStepPart(SomaticVariantCallingStepPart):
    """Base class for Mutect 1 and 2 step parts"""

    def check_config(self):
        if self.name not in self.config.tools:
            return  # Mutect not enabled, skip

The method get_result_files() returns a list of result files of this pipeline step. For this, it uses the _yield_result_files_() helper method and generates path in the output/ folder. Snakemake knows that the link out rule will create these files but need corresponding files in work/ for this. Through this mechanism, the individual tools’ rules will be triggered.

Note the use of cubi.utils.listify() decorator that converts a generator (as created by using yield in the function) to a function returning a list with the yielded objects in the ordere that they are yielded.

            "COSMIC not configured but required for %s" % (self.name,),
        )
        self.parent.ensure_w_config(
            ("static_data_config", "dbsnp", "path"),
            "Path to dbSNP not configured but required for %s" % (self.name,),
        )
        self.parent.ensure_w_config(
            ("static_data_config", "reference", "path"),
            "Path to reference FASTA not configured but required for %s" % (self.name,),
        )

    def get_output_files(self, action):
        output_files = {}
        for k, v in EXT_MATCHED[self.name].items():
            output_files[k] = self.base_path_out.format(var_caller=self.name, ext=v)
        return output_files

    def get_resource_usage(self, action: str, **kwargs) -> ResourceUsage:
        """Get Resource Usage

        :param action: Action (i.e., step) in the workflow, example: 'run'.
        :type action: str

        :return: Returns ResourceUsage for step.

Finally, the check_config() implementation ensures that the path to the NGS mapping step is configured for the somatic variant calling step.

        # Validate action
        self._validate_action(action)
        return ResourceUsage(
            threads=2,
            time="3-00:00:00",  # 3 days

Module-Level BaseStepPart Sub Class

Now, we move up towards the top of the file again.

The SomaticVariantCallingStepPart class is the base class for the somatic variant calling implementations. The constructor builds a template string for generating result/output paths. It then builds the member cancer_ngs_library_to_sample_pair with a mapping from tumor DNA NGS library name to the BioMed Sheets CancerSamplePair object that contains information about both the tumor and normal sample. Note the use of OrderedDict to keep the order from the sample sheet definition.

EXT_MATCHED = {
    "mutect": {
        "vcf": ".vcf.gz",
        "vcf_md5": ".vcf.gz.md5",
        "vcf_tbi": ".vcf.gz.tbi",
        "vcf_tbi_md5": ".vcf.gz.tbi.md5",
        "full_vcf": ".full.vcf.gz",
        "full_vcf_md5": ".full.vcf.gz.md5",
        "full_vcf_tbi": ".full.vcf.gz.tbi",
        "full_vcf_tbi_md5": ".full.vcf.gz.tbi.md5",
        "txt": ".txt",
        "txt_md5": ".txt.md5",
        "wig": ".wig",
        "wig_md5": ".wig.md5",
    },
    "scalpel": {
        "vcf": ".vcf.gz",

The implementation of get_input_files() returns an input function that given the wildcards returns a dict with paths to the normal and tumor libraries’ aligned BAM file from the sub workflow ngs_mapping. Note that the input function returns the actual path without any wildcards.

        "vcf_tbi": ".vcf.gz.tbi",
        "vcf_tbi_md5": ".vcf.gz.tbi.md5",
        "full_vcf": ".full.vcf.gz",
        "full_vcf_md5": ".full.vcf.gz.md5",
        "full_vcf_tbi": ".full.vcf.gz.tbi",
        "full_vcf_tbi_md5": ".full.vcf.gz.tbi.md5",
        "tar": ".tar.gz",
        "tar_md5": ".tar.gz.md5",
    },
    "mutect2": {
        "vcf": ".vcf.gz",
        "vcf_md5": ".vcf.gz.md5",
        "vcf_tbi": ".vcf.gz.tbi",
        "vcf_tbi_md5": ".vcf.gz.tbi.md5",
        "full_vcf": ".full.vcf.gz",
        "full_vcf_md5": ".full.vcf.gz.md5",
        "full_vcf_tbi": ".full.vcf.gz.tbi",
        "full_vcf_tbi_md5": ".full.vcf.gz.tbi.md5",
    },
}

The method get_normal_lib_name() returns the name of the matched normal NGS library for the given tumor NGS library name.

#: Available somatic variant callers assuming matched samples.
SOMATIC_VARIANT_CALLERS_MATCHED = {"mutect", "mutect2", "scalpel", "strelka2"}

#: Available somatic variant callers that just call all samples from one donor together.

The implementation of get_output_files() returns a dict with named output files for Snakemake. Note that this function returns named output files with wildcard placeholders.

    "bcftools_joint",
    "platypus_joint",
    "gatk_hc_joint",
    "gatk_ug_joint",
    "varscan_joint",
}

Finally, the method get_log_file() returns the path to the log file to create by Snakemake. This is available as {log} in shell commands and as {snakemake.log} in Snakemake wrappers.

SOMATIC_VARIANT_CALLERS = SOMATIC_VARIANT_CALLERS_MATCHED | SOMATIC_VARIANT_CALLERS_JOINT

#: Default configuration for the somatic_variant_calling schema

MuTect BaseStepPart Sub Class

First, the class defines the class attribute name and sets it to 'mutect'. This is used by the super class and also by BaseStep in the places where the name of the implementation is needed.

The check_config() implementation ensures that the necessary MuTect-specific configuration has been set.


class SomaticVariantCallingStepPart(BaseStepPart):
    """Base class for somatic variant calling step parts

    Variant calling is performed on matched cancer bio sample pairs.  That is, the primary NGS
    library for the primary bio sample is used for each cancer bio sample (paired with the primary
    normal bio sample's primary NGS library).
    """

    def __init__(self, parent):
        super().__init__(parent)
        self.base_path_out = (
            "work/{{mapper}}.{var_caller}.{{tumor_library}}/out/"
            "{{mapper}}.{var_caller}.{{tumor_library}}{ext}"
        )
        # Build shortcut from cancer bio sample name to matched cancer sample
        self.tumor_ngs_library_to_sample_pair = OrderedDict()

The function get_shell_cmd() generates the shell command to the CUBI wrapper (not Snakemake wrapper ;) to the MuTect tool. Here, the parallel CUBI wrapper for MuTect is used with appropriate configuration. Note the direct use of configuration and that no complex string operations are required for building the call to MuTect.

            self.tumor_ngs_library_to_sample_pair.update(
                sheet.all_sample_pairs_by_tumor_dna_ngs_library
            )

    def get_input_files(self, action):
        # Validate action
        self._validate_action(action)

        def input_function(wildcards):
            """Helper wrapper function"""
            # Get shorcut to Snakemake sub workflow
            ngs_mapping = self.parent.sub_workflows["ngs_mapping"]
            # Get names of primary libraries of the selected cancer bio sample and the
            # corresponding primary normal sample
            tumor_base_path = (
                "output/{mapper}.{tumor_library}/out/" "{mapper}.{tumor_library}"
            ).format(**wildcards)
            input_files = {
                "tumor_bam": ngs_mapping(tumor_base_path + ".bam"),
                "tumor_bai": ngs_mapping(tumor_base_path + ".bam.bai"),
            }

            normal_library = self.get_normal_lib_name(wildcards)
            if normal_library:
                normal_base_path = (
                    "output/{mapper}.{normal_library}/out/{mapper}.{normal_library}".format(

Finally, the method update_cluster_config() takes the Snakemake cluster configuration and updates it appropriately. The three settings cluster.h_vmem, cluster.h_rt, and cluster.pe are used in the generated Snakemake command line generated by the cubi-snake tool appropriately for SGE.

                    )
                )
                input_files.update(
                    {
                        "normal_bam": ngs_mapping(normal_base_path + ".bam"),
                        "normal_bai": ngs_mapping(normal_base_path + ".bam.bai"),

Scalpel BaseStepPart Sub Class

The integration for Scalpel is even simpler as Snakemake wrappers can be used and there is no need for the get_shell_cmd() function. The class sets the class attribute name to 'scalpel'. Then, the check_config() implementation ensure the presence of the required configuration.


            return input_files

        return input_function

    def get_normal_lib_name(self, wildcards):
        """Return name of normal (non-cancer) library"""
        pair = self.tumor_ngs_library_to_sample_pair.get(wildcards.tumor_library, None)
        return pair.normal_sample.dna_ngs_library.name if pair else None

    def get_tumor_lib_name(self, wildcards):

Output file generation is similarly easy as for the MuTect part. However, the Scalpel working directory is tar-gzed in case the users wants to have access to the intermediate results and query the built MuTect database for variants with different coverages.

        pair = self.tumor_ngs_library_to_sample_pair.get(wildcards.tumor_library, None)
        return pair.tumor_sample.dna_ngs_library.name if pair else wildcards.tumor_library

    def get_output_files(self, action):
        """Return output files that all somatic variant calling sub steps must
        return (VCF + TBI file)
        """
        # Validate action
        self._validate_action(action)

The update_cluster_config() method’s implementation is also very simple.

            zip(EXT_NAMES, expand(self.base_path_out, var_caller=[self.name], ext=EXT_VALUES))
        )

    @dictify
    def _get_log_file(self, action):
        """Return dict of log files."""