I have a Nextflow pipeline that runs a BUSCO search on input data, returns the output, and then creates a report based on that output. One of my parameters is the lineage dataset. I would like my pipeline to run multiple times (preferably in parallel), once per entry in my list of lineage datasets. With the two lineages below, the goal is for the pipeline to run twice; in general, if n lineages are specified I would like it to run n times. These are my files so far:
nextflow.config:
params {
    input_files = "$PWD/input_data/*.fasta"
    lineage_datasets = ["fungi_odb10", "microsporidia_odb10"]
    augustus_species = "fumigatus"
    mode = "geno"
    threads = 16
    help = false

    outdir {
        main = "$PWD/results"
        intermediate = "$launchDir/busco_outputs"
    }
}
process {
    executor = 'slurm'
    cpus = 4
    time = '1h'
    memory = '4 GB'
}

singularity {
    enabled = true
    autoMounts = true
}
main.nf:
process busco {
    container "quay.io/biocontainers/busco:5.2.2--pyhdfd78af_0"

    tag "${lineage_dataset}: ${input_file.name}"

    input:
    path input_file
    each lineage_dataset

    output:
    path "hap1/short_summary.${lineage_dataset}.${output_name}.txt"

    script:
    output_name = input_file.baseName
    """
    busco \
        -m ${params.mode} \
        -o hap1 \
        -i $input_file \
        -l ${lineage_dataset} \
        --augustus_species ${params.augustus_species} \
        --update-data \
        -c ${task.cpus}

    echo "${params.augustus_species}" >> "./hap1/short_summary.${lineage_dataset}.${output_name}.txt"
    """
}
process createReport {
    publishDir params.outdir.main, mode: 'copy'

    input:
    path "short_summary.*", stageAs: 'busco_outputs/*'

    output:
    path 'report.html'

    script:
    """
    source "$launchDir/venv/bin/activate"
    report.py > report.html
    """
}
workflow {
    input_files = Channel.fromPath(params.input_files)

    busco( input_files, params.lineage_datasets )
        | collect
        | createReport
}
report.py:
#!/usr/bin/env python

from pathlib import Path

from report_modules.utils.report_utils import Report_Printer
from report_modules.utils.parsing_utils import Report_Parser

# Locate the busco_outputs folder relative to this script
projectDir = "/".join(__file__.split("/")[0:-2])
path = Path(f"{projectDir}/busco_outputs")

# Read the full contents of each BUSCO short-summary file
list_of_files = path.glob('*.txt')
file_data_array = []
for data in list_of_files:
    file_data = ""
    with open(data, 'r') as file:
        for line in file.readlines():
            file_data += line
    file_data_array.append(file_data)

# Parse every summary into a stats dict, keyed by its index
all_stats_dicts = {}
for index, file_data in enumerate(file_data_array):
    parser = Report_Parser(file_data)
    stats_dict = parser.parse_report()
    all_stats_dicts[f'dict_{index}'] = stats_dict

if __name__ == '__main__':
    report_printer = Report_Printer()
    report_template = report_printer.print_template(all_stats_dicts)
This is what I currently have (after editing my code). I am hoping that each summary .txt file from BUSCO will be passed into the createReport process inside a folder called busco_outputs, and that the report.py script will then read that folder and loop through each of the summary files inside it.
Please let me know what I should change to get this working properly. Thank you very much!
CodePudding user response:
You get the error because params.lineage_datasets is an ArrayList, but the cross operator requires a target channel in order to combine the items (see the channel-based sketch after the list below). That said, I think there are some other issues that need addressing first:
- List params are not (yet) supported, so there's no way to specify multiple lineage datasets from the command line using the current approach. Instead, you will likely want to pass in a string of comma-separated lineages so they can be split/tokenized.
- Files are copied to the publishDir asynchronously, so downstream processes should not try to access files in the publish directory. Your createReport process should therefore avoid targeting files in that directory. Usually you do not want to publish intermediary files anyway, so your BUSCO process can probably omit the publishDir directive altogether.
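For reference, here's a minimal sketch of the channel-based route, using your current list param: the list must first be turned into a channel, and I've used combine here since, unlike cross, it emits the full cartesian product rather than matching items by key:
workflow {
    input_files = Channel.fromPath( params.input_files )
    lineages = Channel.fromList( params.lineage_datasets )    // list param -> channel

    input_files
        .combine( lineages )    // emits one [file, lineage] pair per combination
        .view()
}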
Here's one way, using the each input repeater, that addresses the above. I'm not sure if I've got the summary output filename pattern correct, so you may need to adjust accordingly:
params.sequence_files = "./input_data/*.fasta"
params.lineage_datasets = "fungi_odb10,microsporidia_odb10"
params.outdir = "./results"

process BUSCO {
    tag "${lineage_dataset}: ${sequence_file.name}"

    input:
    path sequence_file
    each lineage_dataset

    output:
    path "short_summary.${lineage_dataset}.${output_name}.txt"

    script:
    output_name = sequence_file.baseName
    """
    touch short_summary.${lineage_dataset}.${output_name}.txt
    """
}

process createReport {
    publishDir "${params.outdir}/report", mode: 'copy'

    input:
    path summary_files, stageAs: 'summary_files/*'

    output:
    path 'report.html'

    script:
    """
    source "$launchDir/venv/bin/activate"
    report.py --summary_files ${summary_files} > report.html
    """
}

workflow {
    lineage_datasets = params.lineage_datasets?.tokenize(',')

    sequence_files = Channel.fromPath( params.sequence_files )

    outputs = BUSCO( sequence_files, lineage_datasets )

    createReport( outputs.collect() )
}
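With the comma-separated param, the lineage list can then be supplied (or overridden) at launch time, for example:
nextflow run main.nf --lineage_datasets 'fungi_odb10,microsporidia_odb10'
This launches the BUSCO process once for every combination of FASTA file and lineage, i.e. n lineages give n runs per input file.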
CodePudding user response:
It seems to me you're looking for the each qualifier. Your process block should look like:
...
input:
path inputFile
each lineage_dataset
...
The each qualifier makes the process start a new task for every value of the lineage_dataset collection, keeping everything else the same. Check the example below:
process listOfBeverages {
    debug true

    input:
    val beverage
    each temperatures

    """
    echo $temperatures $beverage
    """
}

workflow {
    Channel
        .of('Coca-cola', 'Coffee', 'Orange Juice')
        .set { beverages }

    temperatures = ['hot', 'cold']

    listOfBeverages(beverages, temperatures)
}
The output:
N E X T F L O W ~ version 22.10.4
Launching `ex.nf` [thirsty_lavoisier] DSL2 - revision: 9a4e72fc82
executor > local (6)
[55/5cfce4] process > listOfBeverages (1) [100%] 6 of 6 ✔
cold Coffee
cold Orange Juice
hot Orange Juice
cold Coca-cola
hot Coffee
hot Coca-cola
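Applied to your pipeline (a sketch mirroring your existing workflow block): keep path input_file and each lineage_dataset as the busco inputs and pass the list straight in when calling the process:
workflow {
    input_files = Channel.fromPath( params.input_files )

    // one BUSCO task per input file per lineage
    busco( input_files, params.lineage_datasets )
}
With the each qualifier in place, this yields the n parallel runs you're after.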