3

我制作了一个包含另一个工具(overlapFeatures)的 perl 脚本,这样我就可以即时正确地转换我的文件格式。我正在处理的文件都是制表符分隔的表格,通常有 200 万行左右。就其本身而言,overlapFeatures 可以轻松处理这些问题。

但是,我认为我正在通过一次管道如此多的线路导致管道锁定。我知道我需要以某种方式线程化,以便我可以同时读取和写入子进程。但是我真的不明白如何在 perl(或任何其他程序)中正确使用线程。据我了解,我可以使用threads甚至IPC::run解决我的问题。

我最终陷入僵局的原始脚本是这样的:

use strict;
use warnings;
use IPC::Open2;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
    # Do some format conversion...
    chomp
    my @cols = split /\t/;
    # print a modified line to the tool
    print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);

while (<$output>){
    # format conversion for ouput
    chomp;
    my @cols = split /\t/;
    print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);

我尝试根据如何使用 IPC::Open2 过滤大量数据来重写脚本以使用线程?像这样:

use strict;
use warnings;
use IPC::Open2;
use threads;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

my $thread = async {
    print join(",", qw(seqid start end strand read feature name)),"\n";
    for(;;) {
        my $line = <$output>; # should block here and wait for output?
        last if !defined $line; # end of stream reached?
        print STDERR "Got line $line\n";
        # Do some format conversion...
        chomp $line;
        my @cols = split /\t/, $line;
        # print a modified line to the tool
        print join(",",@cols[0,1,2,5,3,8]),"\n";
    }
    close($output)
};

{
    open (my $infh, '<', $infile) or die "Can't open $infile\n";
    while (<$infh>){
        # format conversion for ouput
        chomp;
        my @cols = split /\t/;
        print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
    }
    close ($input);
}

$thread->join();
waitpid ($pid, 0);

但是,脚本仍然以同样的方式卡住,我也卡住了。我也无法弄清楚如何IPC::run在这种情况下使用。

我究竟做错了什么?我误解了线程吗?


编辑:花更多时间调试脚本(以及 amon 的帮助),我发现我能够从$output. 但是,脚本永远不会完成,并且在收到所有输出后似乎挂起。我想这是我现在唯一的问题。

4

1 回答 1

1

这更像是一个长评论。

我在精简版本中尝试了您的代码。我删除了转换代码,使用 Unixyes命令作为无限数据源并将输出打印到/dev/null,因为我们目前对输出不感兴趣,但对程序工作感兴趣。作为您的替代品overlapFeatures,我曾经cat将数据通过不变。

use strict; use warnings; use IPC::Open2; use threads;

my $command = "cat";
my @args = ();

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
  or die "Failed with error $!\n";

my $thread = async {
  print $_ while defined($_ = <$output>);
  close($output)
};

{
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    if ($c >= 1_000_000) {
      print "\n==another million==\n\n";
      $c=0
    }
  }
  close ($input);
}

$thread->join();
waitpid ($pid, 0);

在一百万行中(字面意思),我打印一条状态消息以断言 IO 仍在工作。

结果

在使用 Perl 12.4 的 Ubuntu Linux 上进行测试,给定的脚本可以完美运行。因此可以合理地假设问题不在于 IPC 代码,而在于数据格式转换、您正在包装的程序或数据量 (yes输出字符串"1\n",是什么导致了每行小数据的多行。(每组约 2MB,每行 2 个字节))

结论

可能是您正在运行不同的配置。如果您正在运行 *nix,请断言我使用的脚本也适用于您。如果没有,请明确说明此配置并尝试运行等效脚本。

也可以将你的包装器分成两个脚本,至少用于测试,所以你可以运行类似的东西

$ convert-to | overlapFeatures | convert-from

这会将所有 IPC 委托给 shell,并断言转换正在工作并且架构是可实现的。

头脑风暴的其他不太可能的想法:

(1) 什么时候close进行操作?会不会是因为某种奇怪的原因,循环的一端过早退出?print STDERR "Closing down xx\n"s之前的aclose可能很有趣。(2) 是否成功open2async产生了它们的进程/线程返回控制流?偏执的我会print STDERR在他们之后放另一个……(3)您是否从脚本中获取任何数据,或者流在一段时间后是否会变干?

编辑

EOF在所有写入端都为closed之前,管道不会产生。因此,所有线程都应该关闭它们不使用的任何东西:

my $thread = async {
  close $input;
  print $_ while defined($_ = <$output>);
  close($output)
};

{
  close $output;
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    ...
于 2012-09-21T15:38:45.253 回答