1

我正在尝试枚举通道中的文件以在使用之前重命名它们collectFile

files.flatten().merge(Channel.fromList([1, 2, 3, 4])).collectFile(storeDir: "$SCRATCH/intermediate") {
    item -> ["data${item[1]}.csv", item[0].text]
}

但是最新的文档说merge频道的运营商已被弃用,但没有指出应该使用的任何替代方案。我可以用什么代替merge

4

1 回答 1

2

迁移说明说要使用连接运算符。如果您的输入是列表,您可以执行以下操作:

def indexedChannel( items ) {
    return Channel.from( items.withIndex() ).map { item, idx -> tuple( idx, item ) }
}

ch1 = indexedChannel( [ 15, 20, 21 ] )
ch2 = indexedChannel( [ 'a', 'b', 'c' ] )
ch3 = indexedChannel( [ 1, 2, 3 ] )

ch1
    .join( ch2 )
    .join( ch3 )
    .view()

结果:

[0, 15, a, 1]
[1, 20, b, 2]
[2, 21, c, 3]

然而,两个通道的合并/加入是不必要的。只需使用地图运算符

def c = 1
Channel
    .fromPath( './data/*.txt' )
    .map { tuple( it, c++ ) }
    .collectFile(storeDir: "$SCRATCH/intermediate") { fn, count ->
        ["data${count}.csv", fn.text]
    }
于 2021-04-14T01:50:58.543 回答