Run Nextflow pipeline multiple times depending on the length of a list


I have a Nextflow pipeline that runs a BUSCO search on input data, returns the output, and then creates a report based on that output. One of my parameters is the lineage dataset. I would like the pipeline to run multiple times (preferably in parallel), based on how long my list of lineage datasets is: with the two lineages below the goal is that it runs twice, and with n lineages it should run n times. These are my files so far:

nextflow.config:

params {
    input_files             = "$PWD/input_data/*.fasta"
    lineage_datasets        = ["fungi_odb10", "microsporidia_odb10"]
    augustus_species        = "fumigatus"
    mode                    = "geno"

    threads                 = 16
    help                    = false
    outdir {
        main                = "$PWD/results"
        intermediate        = "$launchDir/busco_outputs"
    }
}

process {
    executor                = 'slurm'
    cpus                    = 4
    time                    = '1hour'
    memory                  = '4.GB'
}

singularity {
    enabled                 = true
    autoMounts              = true
}

main.nf:

process busco {
  container "quay.io/biocontainers/busco:5.2.2--pyhdfd78af_0"

  tag "${lineage_dataset}: ${input_file.name}"

  input:
    path input_file
    each lineage_dataset
  
  output:
    path "hap1/short_summary.${lineage_dataset}.${output_name}.txt"

  script: 
  output_name = input_file.baseName

    """
    busco \
    -m ${params.mode} \
    -o hap1 \
    -i $input_file \
    -l ${lineage_dataset} \
    --augustus_species ${params.augustus_species} \
    --update-data \
    -c 8 \
    | echo "${params.augustus_species}" >> "./hap1/short_summary.${lineage_dataset}.${output_name}.txt"

    """
}

process createReport {

  publishDir params.outdir.main, mode: 'copy'

  input: 
    path "short_summary.*", stageAs: 'busco_outputs/*'

  output:
    path 'report.html'

  script:
    """
    source "$launchDir/venv/bin/activate"
    report.py > report.html
    """ 
}

workflow {
  input_files = Channel.fromPath(params.input_files)
  
  busco( input_files, params.lineage_datasets )
  | collect
  | createReport
}

report.py:

#!/usr/bin/env python

from report_modules.utils.report_utils import Report_Printer
from report_modules.utils.parsing_utils import Report_Parser
from pathlib import Path

projectDir = "/".join(__file__.split("/")[0:-2])
path = Path(f"{projectDir}/busco_outputs")
list_of_files = path.glob('*.txt')

file_data_array = []
for data in list_of_files:
    file_data = ""
    with open(data, 'r') as file:
        lines = file.readlines()
        for line in lines:
            file_data += line
    file_data_array.append(file_data)

all_stats_dicts = {}
for index, file_data in enumerate(file_data_array):
    parser = Report_Parser(file_data)
    stats_dict = parser.parse_report()
    all_stats_dicts[f'dict_{index}'] = stats_dict

if __name__ == '__main__':
    report_printer = Report_Printer()
    report_template = report_printer.print_template(all_stats_dicts)

This is what I currently have (after editing my code). I am hoping that each summary .txt file from BUSCO will be passed into the createReport process inside a folder called busco_outputs, and that report.py will then read the busco_outputs folder and loop through each of the summary files inside it.

Please let me know what I should change in order to have this working properly! Thank you very much!

CodePudding user response:

You get the error because params.lineage_datasets is an ArrayList, but the cross operator requires a target channel in order to combine the items. That said, I think there are some other issues that need addressing first:

  • List params are not (yet) supported, so there's no way to specify multiple lineage datasets from the command line using the current approach. Instead, you will likely want to pass in a string of comma-separated lineages that can be split/tokenized (see the example invocation after this list).

  • Know that files are copied to the publishDir asynchronously, so downstream processes should not try to access files in the publish directory; make sure your createReport process does not read from it. Usually you do not want to publish intermediate files anyway, so your BUSCO process can probably omit the publishDir directive altogether.
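
For the first point, the launch command could then look something like this (the parameter name matches the script below):

nextflow run main.nf --lineage_datasets 'fungi_odb10,microsporidia_odb10'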

Here's one way, using the each input repeater, that addresses the above. I'm not sure if I've got the summary output filename pattern correct, so you may need to adjust accordingly. Note that the BUSCO script below is stubbed out with a touch so you can test the wiring first:

params.sequence_files = "./input_data/*.fasta"
params.lineage_datasets = "fungi_odb10,microsporidia_odb10"
params.outdir = "./results"

process BUSCO {

    tag "${lineage_dataset}: ${sequence_file.name}"

    input:
    path sequence_file
    each lineage_dataset

    output:
    path "short_summary.${lineage_dataset}.${output_name}.txt"

    script:
    output_name = sequence_file.baseName

    """
    touch short_summary.${lineage_dataset}.${output_name}.txt
    """
}

process createReport {

    publishDir "${params.outdir}/report", mode: 'copy'

    input:
    path summary_files, stageAs: 'summary_files/*'

    output:
    path 'report.html'

    """
    source "$launchDir/venv/bin/activate"
    report.py --summary_files ${summary_files} > report.html
    """
}

workflow {

    lineage_datasets = params.lineage_datasets?.tokenize(',')

    sequence_files = Channel.fromPath( params.sequence_files )

    outputs = BUSCO( sequence_files, lineage_datasets )

    createReport( outputs.collect() )
}
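
Two usage notes on the sketch above. First, createReport calls report.py as a bare command; Nextflow automatically adds the pipeline's bin/ directory to the PATH of every task, so (assuming the script currently sits in your project root) one way to wire this up is:

mkdir -p bin
mv report.py bin/
chmod +x bin/report.py

Because the process sources your virtualenv first, the script's #!/usr/bin/env python shebang will resolve to the venv's interpreter. Second, report.py as written globs a hard-coded busco_outputs directory; since the process now passes the staged files via --summary_files, you would need to update the script to read its file list from that argument (e.g. with argparse) instead.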

CodePudding user response:

It seems to me you're looking for the each qualifier. Your process block should look like:

...

  input:
  path inputFile
  each lineage_dataset

...

The each qualifier makes the process start a new task for every value of the lineage_dataset collection, keeping everything else the same. Check the example below:

process listOfBeverages {
  debug true
  input:
  val beverage
  each temperatures

  """
  echo $temperatures $beverage
  """
}

workflow {
  Channel
    .of('Coca-cola', 'Coffee', 'Orange Juice')
    .set { beverages }

  temperatures = ['hot', 'cold']

  listOfBeverages(beverages, temperatures)
}

The output:

N E X T F L O W  ~  version 22.10.4
Launching `ex.nf` [thirsty_lavoisier] DSL2 - revision: 9a4e72fc82
executor >  local (6)
[55/5cfce4] process > listOfBeverages (1) [100%] 6 of 6 ✔
cold Coffee
cold Orange Juice
hot Orange Juice
cold Coca-cola
hot Coffee
hot Coca-cola
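
If you ever need those pairs as a proper channel, e.g. to filter or reshape them before the process runs, the same Cartesian product can be built explicitly with the combine operator. A minimal sketch of the equivalent fan-out:

workflow {
  beverages    = Channel.of('Coca-cola', 'Coffee', 'Orange Juice')
  temperatures = Channel.of('hot', 'cold')

  // combine with no `by` option emits the Cartesian product:
  // 3 beverages x 2 temperatures = 6 tuples, the same fan-out
  // the `each` repeater produces above
  beverages
    .combine(temperatures)
    .view { bev, temp -> "$temp $bev" }
}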