Perl线程开发过程中的经验
2015-10-04 21:27:37 阿炯

Perl线程是一个单一的执行流程,它是所有程序执行过程中最小的控制单位,即能被CPU所调度的最小任务单元。Perl线程与进程之间既有联系,又完全不同。简单地说,一个Perl线程必然属于某一个进程,而一个进程包含至少一个或者多个Perl线程。早期的计算机系统一次只能运行一个程序,因此当有多个程序需要执行的时候,唯一的办法就是让它们排成队,按顺序串行执行。进程的出现打破了这种格局,CPU资源按时间片被分割开来,分配给不同的进程使用。

这样一来,从微观上看进程的执行虽然仍是串行的,但是从宏观上看,不同的程序已经是在并行执行了。如果我们把同样的思想运用到进程上,很自然地就会把进程再细分成更小的执行单位,即Perl线程。由于一个进程又往往需要同时执行多个类似的任务,因此这些被细分的Perl线程之间可以共享相同的代码段,数据段和文件句柄等资源。有了进程,我们可以在一台单CPU计算机系统上同时运行Firefox和MicrosoftOfficeWord等多个程序;有了Perl线程,我们可以使Firefox在不同的标签里同时加载多个不同的页面,在OfficeWord里编辑文档的同时进行语法错误检查。因此,Perl线程给我们带来了更高的CPU利用率、更快速的程序响应、更经济地资源使用方式和对多CPU的体系结构更良好的适应性。

Perl最多允许64个子线程,加上主线程因此最多65个线程。

Perl线程的生命周期

创建Perl线程

Perl线程作为Perl中的一种实体,其一生可以粗略的分为创建,运行与退出这三个阶段。创建使得线程从无到有,运行则是线程完成其主要工作的阶段,退出自然就是指线程的消亡。Perl线程的运行和普通函数的执行非常类似,有其入口参数,一段特定的代码流程以及执行完毕后返回的一个或一组结果,唯一与普通函数调用的不同之处就在于新建Perl线程的执行与当前Perl线程的执行是并行的。Perl中创建一个新的线程非常简单,主要有两种方法:
1.使用threads包的create()方法
2.使用async{}块创建Perl线程


信号量

Thread::Semaphore包为Perl线程提供了信号量的支持。可以创建一个自己的信号量,并通过down操作和up操作来实现对资源的同步访问。实际上,down操作和up操作对应的就是我们所熟知的P操作和V操作。从内部实现上看,Thread::Semaphore本质上就是加了锁的共享变量,无非是把这个加了锁的共享变量封装成了一个Perl线程安全的包而已。由于信号量不必与任何变量绑定,因此,它非常灵活,可以用来控制你想同步的任何数据结构和程序行为。例如

use threads;
use threads::shared;
use Thread::Semaphore;
my $s=Thread::Semaphore->new();
$s->down();#Poperation
...
$s->up();#Voperation

从本质上说,信号量是一个共享的整型变量的引用。默认情况下,它的初始值为1,down操作使它的值减1,up操作使它的值加1。当然你也可以自定义信号量初始值和每次up或down操作时信号量的变化。信号量(Semaphore)注意不要跟用于进程间通信的信号量混淆,信号量(semaphore)是互斥量的升级版:互斥量的状态为0或1,而信号量可以为n。也就是说,使用互斥量时,最多允许一个线程进入关键区,而信号量允许多个,具体值是信号量当前的内部值。

Semaphore实现的功能就类似厕所有5个坑位,有10个人要上厕所,那么同时只能有多少人去上厕所呢?同时只能有5个人能够使用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造信息量对象时传入的参数选项。当然单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

线程的信号量与进程间通信中使用的信号量的概念是一样,它是一种特殊的变量,它可以被增加或减少,但对其的关键访问被保证是原子操作。如果一个程序中有多个线程试图改变一个信号量的值,系统将保证所有的操作都将依次进行。而只有0和1两种取值的信号量叫做二进制信号量。信号量一般常用于保护一段代码,使其每次只被一个执行线程运行。我们可以使用二进制信号量来完成这个工作。

信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再进行某些动作。也就是说Semaphore不一定是锁定某个资源,而是流程上的概念。比方说有A,B两个线程,B线程的操作可能要等A线程执行完毕之后才执行,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理之类,它们也许并不访问共享变量,只是逻辑上的先后顺序。

Perl线程中的信号量

use threads;
use Thread::Semaphore;
 
my $s=Thread::Semaphore->new(5);
printf("s=".${$s}."\n");#s=5
$s->down(3);
printf("s=".${$s}."\n");#s=2
...
$s->up(4);
printf("s=".${$s}."\n");#s=6

Perl线程队列

Thread::Queue包为Perl线程提供了Perl线程安全的队列支持。与信号量类似,从内部实现上看,Thread::Queue也是把一个通过锁机制实现同步访问的共享队列封装成了一个Perl线程安全的包,并提供统一的使用接口。Thread::Queue在某些情况下可以大大简化Perl线程间通信的难度和成本。例如在生产者-消费者模型中,生产者可以不断地在Perl线程队列上做enqueue操作,而消费者只需要不断地在Perl线程队列上做dequeue操作,这就很简单地实现了生产者和消费者之间同步的问题。例如

生产者-消费者模型中对Perl线程队列的使用
use threads;
use Thread::Queue;
my $q=Thread::Queue->new();

sub produce{
my $name=shift;
while(1){
 my $r=int(rand(100));
 $q->enqueue($r);
 printf("$name produce $r\n");
 sleep(int(rand(3)));
}
}
sub consume{  
 my$name=shift;
 while(my$r=$q->dequeue()){  
  printf("consume$r\n");
 }  
}  

my $producer1=threads->create(\&produce,"producer1");
my $producer2=threads->create(\&produce,"producer2");
my $consumer1=threads->create(\&consume,"consumer2");

$producer1->join();
$producer2->join();
$consumer1->join();

其他有用的非核心包

本文前面讨论的所有内容都在Perl线程核心包的范畴之内。其实CPAN上还有其他一些与Perl线程相关的非核心包,它们往往也会给Perl线程的使用带来很大的便利,这里我们选出两个稍加介绍,抛砖引玉。

Thread::Pool包允许你在程序中创建一批Perl线程去完成多个类似的任务。例如当你希望创建一个多Perl线程程序去完成检验1000个ip地址是否都能ping通的任务时,Thread::Pool包可以给你带来便利。

Thread::RWLock包为Perl线程中的读写操作提供了锁机制的支持。例如当你有多个reader和writerPerl线程共同访问某一个或几个文件时,Thread::RWLock包可以给你带来便利。

Perl线程的消亡

大多数情况下,你希望你创建的Perl线程正常退出,这就意味着Perl线程所对应的函数体在执行完毕后返回并释放资源。例如在清单5的示例中,新建Perl线程被join以后的退出过程。可是,如果由于detach不当或者由于主线因某些意外的异常提前结束了,尽管它所创建的Perl线程可能尚未执行完毕,但是他们还是会被强制中止,正所谓皮之不存,毛将焉附。这时你也许会得到一个类似于“Perl exited with active threads”的警告。

当然,你也可以显示地调用exit()方法来结束一个Perl线程,不过值得注意的是,默认情况下,如果你在一个Perl线程中调用了exit()方法,其他Perl线程都会随之一起结束,在很多情况下,这也许不是你想要的,如果你希望exit()方法只在调用它的Perl线程内生效,那么你在创建该Perl线程的时候就需要设置’exit’=>’thread_only’。例如为某个Perl线程设置’exit’=>’thread_only’属性。
use threads;
sub say_hello{
printf("Hello thread!@_.\n");
sleep(10);
printf("Bye\n");
}

sub quick_exit{
printf("I will be exit in no time\n");
exit(1);
}

my$t1=threads->create(\&say_hello,"param1","param2");
my$t2=threads->create({'exit'=>'thread_only'},\&quick_exit);
 
$t1->join();
$t2->join();

如果希望每个Perl线程的exit方法都只对自己有效,那么在每次创建一个新Perl线程的时候都去要显式设置’exit’=>’thread_only’属性显然有些麻烦,也可以在引入threads包的时候设置这个属性在全局范围内有效,可以设置’exit’=>’thread_only’为全局属性。

use threads('exit'=>'threads_only');
subfunc{
...
if($condition){
exit(1);
}}
 
my$t1=threads->create(\&func);
my$t2=threads->create(\&func);
 
$t1->join();
$t2->join();

如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:
1.变量默认是不在线程中共享的。
2.通过"usethreads"引用命名空间,不能通过eval,do或者require。
3.如果有变量需要共享,必须引用"threads::shared"。并在定义变量的时候如下:
my $var1:shared="value";

---------------------------------------------------
取得线程的返回值

my $rda=$thread->join();#retrun data

其所派生的线程函数中有其返回值。

其中有几个需要注意的地方:

thread的默认返回值是scalar

如果sub的返回值是array,可以返回array的ref如上例,或者直接在 create 时传入hash ref指定:$thread->create({'context'=>'list'},\&grabdata,$cnip,$snip) 。

join VS detach

上例用到join只是因为我需要每个 grabdata sub 的返回值,如果创建Thread只是为了达到并行处理的目的,线程的返回值不重要,可以创建后 detach 。

危险的IO Buffer

这个虽然和多线程没有直接关系,但是令我印象深刻,所以还是记录一下。因为以前用到 file handler 从来不会手动 close,在这里就悲剧了。Perl在处理 file handler 的时候为了减少文件IO以提高性能,有一个 buffer 机制。比如上例在for循环中写入$OUT,perl并不会在每次循环都写入文件,而是暂时保存在 buffer 里。如果不手动 close 的话,最终上传的将是一个空文件。Perl 诡异的 Buffer 迷惑了很多人。

内置的thread模块可以满足一般多线程的需求,但是没有解决一些我们经常会碰到的问题:

无法固定线程个数,排队处理

比如,我需要发送1000个http request,显然同时发送害人害己。比较自然的想法是,创建比如20个线程,某一个线程完毕后,程序自动补上一个,保证同时只有20个线程。事实上,thread 结合 thread::queue 模块可以实现,但是十分费解,手动要做的事情太多,而且容易侧漏。

线程之间变量共享比较困难

上例中,各个线程的变量是独立的。但是很多情况下我们需要所有线程共享一个数据结构,比如 push 数据到同一个array,thread 模块无法解决。thread::shared 模块号称能够共享变量,但是对于array和hash还是无能为力。

---------------------------------------------------
Perl线程的历史


5005threads线程模型

Perl对线程的支持最早可以追溯到1998年7月发布的Perlv5.005。其发布申明指出,Perlv5.005中加入了对操作系统级Perl线程的支持,这个新特性是一个实验性的产品,这也就是所称的5005threads线程模型;对于该线程模型来说,默认情况下所有数据结构都是共享的,所以用户必须负责这些共享数据结构的同步访问。如今5005threads已经不再被推荐使用,Perlv5.10及以后的版本里也将不会再支持5005threads线程模型。

ithreads线程模型

2000年5月发布的Perlv5.6.0中开始引入了一个全新的Perl线程模型,即interpreterthreads,简称为ithreads,也正是在这个版本的发布申明中第一次提出了5005threads线程模型将来可能会被禁用的问题。尽管如此,ithreads在那个时候还是一个新的实验性的Perl线程模型,用户并不能直接使用它,唯一的办法是通过fork函数模拟。经过两年时间的发展,到2002年7月,Perlv5.8.0正式发布,这时ithreads已经是一个相对成熟的Perl线程模型,发布申明中也鼓励用户从老的5005threads线程模型转换到新的ithreads线程模型,并明确指出5005threads线程模型最终将被淘汰。本文后面所讨论的所有内容也都是基于新的ithreads线程模型。在ithreads模型中,最与众不同的特点就在于默认情况一下一切数据结构都不是共享的,这一点会在后面内容中有更深刻的体会。

现有环境支持哪种Perl线程模型

既然Perl中有可能存在两种不同的Perl线程模型,很自然地就需要判断现有Perl环境到底支持的是哪一种Perl线程实现方式。归纳起来有两种方法:

shell中查询Perl当前Perl线程模型
perl-V|grepuse.*threads

从结果中不难看出,在当前的Perl环境中提供了对ithreadsPerl线程模型的支持。在Perl程序中也可以通过使用Config模块来动态获取Perl线程模型的相关信息,例如

Perl程序中动态获取当前Perl线程模型

值得一提的是,对于5005threads和ithreads线程模型,Perl同时只能支持其中的一种。不可能在某一个Perl环境中同时使用这两种Perl线程模型,本文后面讨论的所有内容都是基于ithreads线程模型的。

---------------------------------------------------
Perl 的线程中的锁


在Linux中线程和进程和最大分别,可能就是有共享变量就个东西了。其它的地方使用起来感觉不大,只是线程利用cpu更加高效,但是也更加容易出问题。下面就看看Perl中的锁和相关的问题,线程是一个好东西,它不象进程占用那么多的内存,因不它不需要主空间,不需要进程控制块。它只共享所有主进程的所有内容。

use threads;
use threads::shared;
use Data::Dumper;

my $val:shared; # 共享变量
my %hash:shared; # 共享数组
my @array:shared; # 共享哈希

my $t1 = threads->create(\&test1);
my $t2 = threads->create(\&test2);

$t1->join; # 回收 t1 的线程
$t2->join;

print Dumper(\$val);
print Dumper(\@array);
print Dumper(\%hash);

sub test1 {
 lock ($val); lock (@array); lock (%hash);
 for ( 1 .. 1000 ){
  $val++;
  $array[0]++;
  $hash{test}++;
 }  
}

sub test2 {
 lock ($val); lock (@array); lock (%hash);
 for ( 1 .. 1000 ){
  $val++;
  $array[0]++;
  $hash{test}++;
 }  
}

可以看到在使用共享变量时一定要使用一个 lock 来锁定当前需要共享的变量,这需要一个排它性。不然很容易出问题,不锁的话会出什么问题?在来看下面的例子,出掉锁以后会怎么样(即注释掉有'lock...'的那一行)。在 Perl 中有个子函数的 lock,也有对象的方法锁,使用的方法是:sub test1 : locked {...},但这种方法已经被弃用(属于5005threads线程模型),不再受支持。

引用的线程共享

上面是使用的普通的变量的共享变量,但能使用引用和对象来共享吗?这是大家比较关心的,下面就来测试一下引用在共享变量中是否能用。
use threads;
use threads::shared;
my %hash:shared;
 
my $t = threads->create(\&test);
$t->join;

sub test{$hash{test}{test}++;}

输出如下:
Thread 1 terminated abnormally: Invalid value for shared scalar at threads2.pl line 10.

可以发现建了一个共享的变量:%hash 时没有问题,但向$hash{test}中放一个匿名的哈希变量时,被 threads::shared 模块检查出来,讲这个不能放入,所以导致错误。因为匿名的哈希没有声明是不能共享的。
use threads;
use threads::shared;
my %hash:shared;
$hash{test}=&share({});
my $t = threads->create(\&test);
$t->join;
sub test{$hash{test}{test}++;}

在这多了一个 $hash{test}=&share({});

这里告诉线程将要放入一个共享的匿名的哈希变量来做为引用,这时就可以使用引来了。所以在这种情况下需要先声明匿名的哈希变量来做为引用;这时如果有多级的数据结构的引用也可以,使用相同的方法就行了,象 test 下面如果还有一个 try 的话,就使用 $hash{test}{try} = &share ({});来一级一级的声明。如果要使用变量的引用,这时我们可以使用 threads::shared 提供给我们的 shared_clone 功能。
use v5.12;
use threads;
use threads::shared;
use Data::Dumper;

my $obj:shared;
$obj=shared_clone({'foo' => [qw/foo bar baz/],'a'=>1});
say Dumper $obj;

my $th1 = threads->create(\&test1);
my $th2 = threads->create(\&test2);
$th1->join;
$th2->join;

sub test1{
 lock($obj);
 $obj->{'b'}=2;
 pop @{$obj->{'foo'}};
 say Dumper $obj;
}

sub test2{
 lock($obj);
 delete $obj->{'a'};
 push @{$obj->{'foo'}},'me';
 delete $obj->{'b'};
 say threads->tid().Dumper($obj);
}

对象的线程共享

常常想在线程中使用同一个对象,多个线程来操作同一个对象,这时线程中是怎么做啦?默认的对象是在线程中不共享的,也不能正常的使用的。下面的方法可以使用对象,在线程中,但要注意在使用时也需要对对象进行lock;不然会和上面所讲到的,线程中的锁中没锁一样,数据会乱掉。
use threads;
use threads::shared;

my $obj = &shared_clone (FreeOA->new); # 要给变量声明成共享
my $th1 = threads->create(\&test1);
my $th2 = threads->create(\&test2);
$th1->join;
$th2->join;
$obj->display;
 
sub test1 {
 lock ($obj);
 for( 1 .. 1000 ){
  $obj->add_number();
 }  
}

sub test2 {
 lock ($obj);
 for( 1 .. 1000 ){
  $obj->add_number();
 }  
}

package FreeOA;
sub new {
 my $class = shift;
 my $data = { 'number' => 0 };
 my $self = bless ($data, $class);
 return $self;
}

sub add_number {
 my $self = shift;
 $self->{number} ++;
}
 
sub display {
 my $self = shift;
 print $self->{number}."\n"
}

1;

这其实一样是使用的 shared_clone 记的使用对象的时候,需要锁。

参考来源:http://www.php-oa.com/
---------------------------------------------------
走向并行之perl的线程锁

perl的多线程技术在共享变量是出了一些问题,一个多线程技术里很重要的概念--线程锁。

1、线程锁的相关知识
如果程序的线程之间需要相互访问,这时涉及到的变量就需要定义为共享变量。但是因共享变量而会导致新问题-对变量访问的冲突。即如果多个线程同时访问同一个变量,这时冲突就会发生。perl提供了内置的lock函数,用于控制变量的访问。

需要注意的是,lock只是阻塞其他lock实例,但并不能够阻止普通的数据访问,这个关键点下面会详细说到。可以使用lock锁住独立的标量、整个数组或者散列,比如:lock $var; lock @values; lock %table;

但在一个聚集(数组或者散列)上使用lock并非隐含对该聚集的每一个标量元素都锁定,即:lock @values  #当在在线程1lock数组时同时,在线程2 使用lock $values[23]时并不会阻塞(perl5.10好像不支持对聚集的单个元素进行锁定,也即lock $values[23]在5.10中不能使用)。

当使用lock锁定一个变量后,其他lock 该变量的实例将会被阻塞,直到拥有锁的线程释放锁定。在perl中没有显示的释放锁的机制,一般可以简单添加一个嵌套作用域层次就可以限定锁的作用范围,比如:
{
    lock $var;$var++;
}

2、线程锁的作用
有下面的例子可供参考
use threads;
use threads::shared;
use Data::Dumper;
my $val:shared; # 共享变量
my %hash:shared; # 共享数组
my @array:shared; # 共享哈希

my $t1 = threads->create(\&test1);
my $t2 = threads->create(\&test2);
$t1->join; # 回收 t1 的线程
$t2->join;
 
print Dumper(\$val);
print Dumper(\@array);
print Dumper(\%hash);
 
sub test1 {
    lock ($val); lock (@array); lock (%hash);
    for ( 1 .. 1000 ){
        $val++;
        $array[0]++;
        $hash{test}++;
    }   
}

sub test2 {
    lock ($val); lock (@array); lock (%hash);
    for ( 1 .. 1000 ){
        $val++;
        $array[0]++;
        $hash{test}++;
    }   
}

在test1和test2中分别使用lock,这时输出的结果是正常的。
$VAR1 = \'2000';
$VAR1 = [
    '2000'
];
$VAR1 = {
    'test' => '2000'
};

但是,当去掉test1和test2中的lock后,输出结果为
$VAR1 = \'1830';
$VAR1 = [
    '1901'
];
$VAR1 = {
    'test' => '1922'
};

结果完全是错乱的,出现这种结果的原因在于,test1和test2同时访问共享变量并对其进行累加,这是两个线程拿到的变量值可能是同一个(而不是想象中的有序的不同值)。

3、lock只是阻塞其他lock实例
把这个单独拿出来说的原因是在使用线程锁是很容易把它误解为:阻塞对变量的访问。比如以下例子:
use threads;
use threads::shared;
my @rib_pos:shared;
@rib_pos=(0..9);

my $thread=threads->new(\&fa);
$thread->join();

my $thread2=threads->new(\&fb);
$thread2->join();

sub fa{
    lock @rib_pos;
    sleep(1);
    for(my $i=0;$i<=$#rib_pos;$i++){
       $rib_pos[$i]++;
    }
    print "fa\n";
}

sub fb{
    $rib_pos[8]="123";print "fb\n";
}

运行上面的程序时,首先输出的是"fb"然后才是"fa"。在sub fa中,使用lock锁定数组@rib_pos,同时使用sleep函数延迟执行后面的代码,sub fa 拥有lock的时间延长,此时如果lock可以阻塞其他过程(sub fb)对数组@rib_pos的访问的话,其他过程会处于等待,知道sub fa运行完成。但实际上发现在sub fa中锁定@rib_pos的同时,可以访问$rib_pos[8](sub fb)这说明lock 不能阻塞对变量的访问。在这种情况下,锁是不起作用的。

另注:线程的声明收回收的位置不一样,也会对上述的结果生成有影响:
1.创建一个线程后就开启回收
my $thread=threads->new(\&fa);
$thread->join();
my $thread2=threads->new(\&fb);
$thread2->join();

2.全部创建完成后再回收
my $thread=threads->new(\&fa);
my $thread2=threads->new(\&fb);
$thread->join();
$thread2->join();

甚至在fa中,将
lock @rib_pos;
sleep(1);

这两行互换,对结果都有影响,可见在并发执行的情况下,要精细地控制函数行为。不然会对结果产生相当的影响!

如果把sub fb改为:
sub fb{
    lock @rib_pos;
    $rib_pos[8]="123";print "fb\n";
}

输出结果会完全不一样,即'fa'和'fb'几乎同时执行完成,说明sub fb在等待sub fa执行完成后在执行的。可见在sub fb中使用lock时,因为 sub fa拥有@rib_pos的锁,只有等待@rib_pos锁的释放才能在sub fb中执行执行lock @rib_pos,线程被阻塞在lock 一行,而不会是其他线程对@rib_pos访问的行上。

4、锁的位置
上面的例子表现为:只有当sub fa执行完成后sub fb才能继续执行。程序实际上仍然是单线程在运行,这与多线程初衷相悖。为了解决这种问题,需要在程序中仔细使用lock,以尽量减少锁的时间为原则。上面的例子中,sub fa锁的时间为执行该函数的整个过程,显然是不需要的。可以修改为:
sub fa{
   sleep(1);
   for(my $i=0;$i<=#$rib_pos;$i++){
    lock @rib_pos;
    $rib_pos[$i]++;
   }
   print "fa\n";
}

这时只有在需要读取或者修改@rib_pos的值时才锁定它,锁定时间仅仅是执行一行代码的时间,这样就可以极大的较少其他lock等待的时间。但是此时需要主要线程之间的协调。使用修改后的sub fa,程序运行时,sub fb获得的$rib_pos[8]是sub fa修改前的数据,而不是修改后的数据。

---------------------------------------------------


---------------------------------------------------


---------------------------------------------------


---------------------------------------------------