0

我是 perl 线程的新手。

我有一个包含项目列表的文件(每个项目都在一个单独的行中),我想并行构建这些项目。目前,每个线程:

  1. 以“读取”模式打开文件
  2. 保存一些项目的列表(= 一些文件行)
  3. 关闭文件
  4. 再次打开文件 - 作为“写入”模式
  5. 在没有选择的行的情况下重写它

为了确保每个线程是唯一访问文件的线程,我尝试使用信号量。

由于某种原因,发生了线程冲突,我无法弄清楚我做错了什么。

我可以看到(在我的“报告”中,它还获取每个构建的当前时间)不同的线程从“共享”文件中选择相同的项目(它只偶尔发生一次,但仍然......)

我什至不确定我的 $semaphore decleration 作为“我的”变量是否合法。

任何帮助将不胜感激!

谢谢。


这是我的代码的一部分:

    my $semaphore = Thread::semaphore->new() ;

sub build_from_targets_list{

    #...
    open(REPORT, "+>$REPORT_tmp");  # Open for output
    #....
    @threads =();


    for ($i = 0; $i < $number_of_cores; $i++){
        my $thr = threads->new(\&compile_process, $i,*REPORT);
        push @threads, $thr;
    }

    $_->join for @threads;
    close (REPORT);
}
### Some stuff..


sub compile_process{

    *REPORT = shift(@_);
    #...

    while (1){
        $semaphore->down();
        open (DATA_FILE, $shared_file);
        flock(DATA_FILE, 2);
        while ($record = <DATA_FILE>) {
            chomp($record);
            push(@temp_target_list,$record);    
        }


        # ... choose some lines (=projects)...
        # remove the projects that should be built by this thread:
        for ($k = 0; $k < $num_of_targets_in_project; $k++){            
            shift(@temp_target_list);

        }

        close(DATA_FILE);       
        open (REWRITE,">$shared_file");

        flock(REWRITE, 2);

        seek(REWRITE, 0, 0); 
        foreach $temp_target (@temp_target_list){

            print REWRITE "$temp_target\n";

        }

        close (REWRITE);

        ## ... BUILD selected projects...

        $semaphore->up();
        }
}
4

1 回答 1

1

首先,对您处理文件的方式进行一些基本清理。如果它是一个简单的文件问题,那么尝试调试线程问题是没有意义的。

必须检查任何文件命令(打开、关闭、flock、seek 等)是否成功。要么在上面贴一些or dies 要么use autodie

其次是使用一个硬编码常量来表示群。这些是系统相关的,很难记住模式 2 是哪种。 Fcntl提供常量。

您正在使用排他锁打开数据文件以进行读取(2 通常是排他锁)。那应该是共享锁。这不太可能导致问题,但会导致您的线程不必要地阻塞。

最后,使用词法文件句柄而不是全局范围的 glob。这减少了机会

use Fcntl qw(:flock);
use autodie;

open (my $data_fh, $shared_file);
flock($data_fh, LOCK_SH);

作为旁注,seek $fh, 0, 0打开文件后进行写入是不必要的。寻找常量和羊群一样,使用 Fcntl 来获取常量。

另一个错误是您正在传递$i, *REPORTcompile_process认为*REPORT是第一个参数。再次使用全局文件句柄意味着将其传入是多余的,请使用词法文件句柄。

现在已经不碍事了,你的基本算法似乎有缺陷。 compile_process让每个线程将整个数据文件读取到线程本地数组@temp_target_list中,从该本地数组中移出一些并将其余的写出。因为@temp_target_list是每个线程,所以没有协调。除非$num_of_targets_in_project是共享的并进行某种屏幕外协调,但未显示。

基于文件的锁定总是会成为地狱的一小部分。线程有更好的协调机制。有一种更简单的方法可以做到这一点。

假设文件不是太大,将每一行读入一个共享数组。然后让每个线程从该数组中获取要处理的项目。该数组是共享的,因此当删除每个元素时,该数组将为所有线程更新。就像是...

use strict;
use warnings;
use autodie;

use threads;
use threads::shared;

my $Max_Threads = 5;
my @Todo : shared;

open my $fh, "<", $work_file;
@Todo = <$fh>;
close $fh;

my @threads;
for (1..$Max_Threads) {
    push @threads, threads->new(\&compile_process);
}

$_->join for @threads;

sub compile_process {
    while( my $work = shift @Todo ) {
        ...do whatever with $work...
    }
}

如果文件太大而无法保存在内存中,您可以使用Thread::Queue构建工作项队列并动态添加到其中。

于 2012-10-22T19:09:52.200 回答