Is there a way to lookup a value from a CSV in nextflow? Or, alternately, reuse a CSV?

Time:05-12

I have a simple csv created as part of a workflow, like below:

sample,value
A,1
B,0.5

Separately, I have another channel with file names matching the sample names. I'd like to be able to use the values associated with each sample name within a new process.

I've tried splitting the CSV using .splitCsv but (unsurprisingly) sometimes the incorrect value gets used with a sample, although it does run the correct number of times. I've also tried just using awk within the script to pull out the corresponding value and save it to a variable, and this causes the correct value to be used, but it consumes the CSV file and so only one sample gets processed.

Super simplified nextflow (DSL2) script:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

process foo {
   input:
   path input_file

   output:
   path 'file.csv', emit: csv

   """
   script that creates csv
   """
}

process bar {
   input:
   path input_file2

   output:
   path 'file.bam', emit: bam

   """
   script that creates bam files
   """
}

process help_me {
   input:
   path csv
   path bam

   output:
   path 'result'

   """
   script that uses value from csv on associated bam file
   """
}

workflow {

   foo(params.input)
   bar(params.input2)
   help_me(foo.out.csv, bar.out.bam)
}

Thanks!!

Edit: In essence, is there a way to synchronize two channels such that I can use a csv's individual rows with associated files?

CodePudding user response:

If you have a value channel, you can reuse a file (like a CSV) an unlimited number of times without consuming the channel. For example:

workflow {

   input1 = file( params.input1 )
   input2 = file( params.input2 )

   foo( input1 )
   bar( input2 )

   help_me(foo.out.csv, bar.out.bam)
}

Here, both input1 and input2 are simple values (file objects), which a process implicitly treats as value channels. The docs also state:

A value channel is implicitly created by a process when an input specifies a simple value in the from clause. Moreover, a value channel is also implicitly created as output for a process whose inputs are only value channels.

This means that both foo.out.csv and bar.out.bam are also value channels, as is help_me.out. If input2 were instead a queue channel, you would see that input1 can still be reused an unlimited number of times:

$ mkdir -p ./path/to/bams
$ touch ./path/to/bams/{A,B,C}.bam
$ touch ./foo.txt
params.input1 = './foo.txt'
params.input2 = './path/to/bams/*.bam'

workflow {

   input1 = file( params.input1 )
   input2 = Channel.fromPath( params.input2 )

   foo( input1 )
   bar( input2 )

   help_me(foo.out.csv, bar.out.bam)
}

Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [trusting_allen] DSL2 - revision: 75209e4c85
executor >  local (7)
[24/d459f7] process > foo         [100%] 1 of 1 ✔
[04/a903e4] process > bar (2)     [100%] 3 of 3 ✔
[24/7a9a1d] process > help_me (3) [100%] 3 of 3 ✔

Note that bar.out.bam and help_me.out are now queue channels.
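
If the CSV itself arrives on a queue channel (for example, because the process that writes it was fed from a queue channel), it can be turned back into a value channel before reuse. The following is a minimal sketch, not the asker's actual pipeline: the process name show_pair and the parameters params.csv and params.bams are made up for illustration. It relies on the first operator, which returns a value channel.

```nextflow
nextflow.enable.dsl=2

// Hypothetical parameters, named for this sketch only.
params.csv  = './foo.csv'
params.bams = './path/to/bams/*.bam'

// Hypothetical process that just reports which files it received.
process show_pair {
    debug true

    input:
    path csv
    path bam

    """
    echo "csv: ${csv}, bam: ${bam}"
    """
}

workflow {
    // fromPath() creates a queue channel, even when it matches a single file.
    csv_queue = Channel.fromPath( params.csv )

    // first() returns a value channel, which can be read any number of times.
    csv_value = csv_queue.first()

    bams = Channel.fromPath( params.bams )

    // Runs once per BAM; passing csv_queue instead would run the process only once.
    show_pair( csv_value, bams )
}
```

The collect operator also yields a value channel and may be the better fit when several files need to be reused together.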

If instead you have one CSV per sample (or a similar configuration), you will need some way to join these channels beforehand and adjust your new process's input declaration accordingly. What you want to avoid is declaring two (or more) queue channels in your input block. This part of the docs is well worth the time investment: "Understand how multiple input channels work". It would explain why you saw the incorrect value being associated with a particular sample when consuming the splitCsv output.

To join these channels, you can use the join operator. For example, given your simple CSV (as 'foo.csv') and the test bams created previously:

nextflow.enable.dsl=2

params.input1 = './foo.csv'
params.input2 = './path/to/bams/*.bam'


process help_me {

   debug true

   input:
   tuple val(sample), val(myval), path(bam)

   output:
   path 'result'

   """
   echo -n "sample: ${sample}, myval: ${myval}, bam: ${bam}"
   touch result
   """
}

workflow {

    Channel.fromPath( params.input1 ) \
       | splitCsv( header:true ) \
       | map { row -> tuple( row.sample, row.value ) } \
       | set { rows_ch }

    Channel.fromPath( params.input2 ) \
       | map { bam -> tuple( bam.baseName, bam ) } \
       | join( rows_ch ) \
       | map { sample, bam, myval -> tuple( sample, myval, bam ) } \
       | help_me
}

Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [lethal_mayer] DSL2 - revision: 395732babc
executor >  local (2)
[c5/e96085] process > help_me (1) [100%] 2 of 2 ✔
sample: B, myval: 0.5, bam: B.bam
sample: A, myval: 1, bam: A.bam
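
Note that C.bam, created earlier, has no row in the CSV and never reaches help_me: by default, join discards items whose key is missing from the other channel. The sketch below uses Channel.of with made-up tuples in place of the real CSV rows and BAM files (an assumption for illustration only) to show the remainder option, which keeps unmatched items and fills the missing element with null.

```nextflow
nextflow.enable.dsl=2

workflow {
    // Stand-ins for the (sample, value) rows parsed from the CSV.
    rows_ch = Channel.of( ['A', '1'], ['B', '0.5'] )

    // Stand-ins for the (sample, bam) tuples; C has no CSV row.
    bams_ch = Channel.of( ['A', 'A.bam'], ['B', 'B.bam'], ['C', 'C.bam'] )

    // remainder: true keeps C, padding the missing value with null.
    bams_ch
        .join( rows_ch, remainder: true )
        .view()
}
```

If a missing row should stop the run rather than pass through, the join documentation also describes a failOnMismatch option; check that your Nextflow version supports it before relying on it.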

If your CSV has more than one value for a particular sample and these are specified on separate lines, you probably want the combine operator instead. For example, if your 'foo.csv' contains:

sample,value
A,1
B,0.5
B,2

Then replace join( rows_ch ) with combine( rows_ch, by:0 ) in the above example. Results:

$ nextflow run script.nf
N E X T F L O W  ~  version 22.04.0
Launching `script.nf` [festering_miescher] DSL2 - revision: f8de1e0d20
executor >  local (3)
[ee/8af543] process > help_me (3) [100%] 3 of 3 ✔
sample: A, myval: 1, bam: A.bam
sample: B, myval: 0.5, bam: B.bam
sample: B, myval: 2, bam: B.bam
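
For contrast with the keyed operators above, here is a rough sketch of what happens when a process declares two queue channels as inputs, which is the situation to avoid. The process name show_pair and the literal values are hypothetical. Each task takes the next item from both channels, so the pairing depends only on emission order, never on the sample name.

```nextflow
nextflow.enable.dsl=2

// Hypothetical process that prints the pair it was given.
process show_pair {
    debug true

    input:
    val sample
    val myval

    """
    echo "sample: ${sample}, myval: ${myval}"
    """
}

workflow {
    samples = Channel.of( 'A', 'B', 'C' )
    values  = Channel.of( '1', '0.5' )

    // Two queue channels: task n receives the n-th item of each channel,
    // and execution stops as soon as either channel is exhausted.
    show_pair( samples, values )
}
```

With Channel.of the order is fixed, so A happens to meet 1 and B meets 0.5, while C is never processed. When the items come from upstream processes that finish in an unpredictable order, the same mechanism can attach the wrong value to a sample.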