Perl多线程示例
2015-10-28 14:20:54 阿炯

线程是一个单一的执行流程,它是所有程序执行过程中最小的控制单位,即能被 CPU 所调度的最小任务单元。线程与进程之间既有联系,又完全不同。简单地说,一个线程必然属于某一个进程,而一个进程包含至少一个或者多个线程。早期的计算机系统一次只能运行一个程序,因此,当有多个程序需要执行的时候,唯一的办法就是让它们排成队,按顺序串行执行。进程的出现打破了这种格局,CPU 资源按时间片被分割开来,分配给不同的进程使用。这样一来,从微观上看进程的执行虽然仍是串行的,但是从宏观上看,不同的程序已经是在并行执行了。如果我们把同样的思想运用到进程上,很自然地就会把进程再细分成更小的执行单位,即线程。由于一个进程又往往需要同时执行多个类似的任务,因此这些被细分的线程之间可以共享相同的代码段,数据段和文件句柄等资源。线程给我们带来了更高的 CPU 利用率、更快速的程序响应、更经济地资源使用方式和对多 CPU 的体系结构更良好的适应性。

线程作为Perl中的一种实体,其一生可以粗略的分为创建,运行与退出这三个阶段。创建使得线程从无到有,运行则是线程完成其主要工作的阶段,退出自然就是指线程的消亡。线程的运行和普通函数的执行非常类似,有其入口参数,一段特定的代码流程以及执行完毕后返回的一个或一组结果,唯一与普通函数调用的不同之处就在于新建线程的执行与当前线程的执行是并行的。Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。

一 、Thread->New

该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。

二、IThread

这种方法是通过新建一个新的perl interpreter,perl自5.6版本后开始提供ithread模块专门为perl的多线程提供技术支持。默认情况下,所有的数据和变量是不被线程共享的。如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:
变量默认是不在线程中共享的。
通过"use threads"引用命名空间,不能通过 do 或者 require。
如果有变量需要共享,必须引用"threads::shared"。

模块中的方法:
threads->create(),创建一个新线程;
threads->join(),收割已经创建的线程;
threads->detach(),剥离创建的线程;
threads->list(),返回所有已经创建的线程;
threads->is_joinable(),返回目标线程是否已经完成,等待join;
$thr->equql($thr2),判断两个线程是否是相同的;
$thr->tid(), 获取$thr线程的ID号。


Perl 创建线程有两种方式,正常通过threads->create 创建线程,用async 创建一个调用匿名过程的线程,具体参考perldoc threads。

线程共享变量需要使用 threads::shared,共享变量只能存储scalar,共享变量的引用,如果存储List Hash的引用需使用shared_clone([@list]) shared_clone({%hash})。

线程创建后最好join或者detach,否则在退出时会有warning。

线程的join方式,使用threads中提供的函数接口,可以做到及时释放资源。

use v5.20;
use threads;
use Data::Dumper;
$|=1;

sub test{
    my $i = shift;
    my @x = (1..999_999); #使用一个大的变量观察内存使用情况
    sleep 2*$i;
    printf "%s run to the end.\n", threads->tid();
}

for (reverse (0..9)){
    threads->create(\&test,$_);
}

my $monitor = async {
 #用来观察子线程状态
 sleep 1;
 while(1){
    for(threads->list()){
        printf "%s join? %s\n", $_->tid(), $_->is_joinable() ? 'true' : 'false';
    }
 sleep 2;
}};
$monitor->detach();

#方法1
$_->join() for threads->list()
 
#方法2
#while(threads->list()){
 #$_->join() for threads->list(threads::joinable);
#}

使用方法1的结果,最先产生的线程耗时最长,按此法join 会按顺序释放资源,后来的线程虽已结束,但仍然等待前者结束然后才能释放自己的资源,前者结束前资源一直被占用。运行此脚本过程中打开任务管理器,观察内存使用情况,可以看到脚本结束前资源一直占用最大,没有释放过。

使用方法2的结果,只join已经完成的线程,资源会被及时释放。观察内存的使用情况,可以看到资源逐步递减,并没有被一直占用。

Perl的多线程实例:
涉及语言:Perl
所用模块:threads
模块中的方法:见上述

--------------------------------------------------------
use threads;#声明模块;
use warnings;use strict;
print localtime(time),"\n";  #输出系统时间;
my $j=0;
my $thread;
while(){
last if($j>=10);这里控制一下任务数量,共10个;
#控制创建的线程数,这里是5,scalar函数返回列表threads->list()元素的个数;
while(scalar(threads->list())<5)  {    $j++;
          #创建一个线程,这个线程其实就是调用(引用)函数“ss”;
          #函数‘ss’包含两个参数($j和$j);
          threads->new(\&ss,$j,$j);
      }
      foreach $thread(threads->list(threads::all)) {
      if($thread->is_joinable()) #判断线程是否运行完成;
           {$thread->join();
             #输出中间结果;
             print scalar(threads->list()),"\t$j\t",localtime(time),"\n";
           }
      }
}
#join掉剩下的线程(因为在while中当j=10时,还有4个线程正在运行,但是此时程序将退出while循,所以在这里需要额外程序join掉剩下的4个线程)
foreach $thread(threads->list(threads::all)){
$thread->join();print scalar(threads->list()),"\t$j\t",localtime(time),"\n";       
}

#输出程序结束的时间,和程序开始运行时间比较,看程序运行性能;
print localtime(time),"\n";  

#下面就是每个线程引用的函数;
sub ss() {
    my ($t,$s)=@_;
    sleep($t); #sleep函数,即休眠(以秒为单位)
    print "$s\t";
}
----------------------------------------------------
结果:
71231821191760
1    4    5    81231821191760
2    4    6    91231821191760
3    4    7    101231821191760
4    4    8    111231821191760
5    4    9    121231821191760
6    4    10    141231821191760
7    3    10    161231821191760
8    2    10    181231821191760
9    1    10    201231821191760
10    0    10    221231821191760
221231821191760

第一列表示程序已经完成的任务数,第二列表示正在运行的线程数-1(join掉一个了),第三列表示在收割掉一个线程后新添加的任务,最后一列表示完成一个线程时的系统时间。
------------------------------------------------------------
多线程运行性能
如果单独运行这10个任务,所需要的时间为:1+2+3+4++10=55s;
采用多线程运行(5个)的话,需要的时间为:54-39=16s;
-------------------------------------------------------------
运行过程
简要描述一下程序运行过程,以便更深入理解多线程的概念,呵呵
程序共要运行10个任务,第一个任务的作用是暂停程序1s(sleep(1));第二个任务是暂停程序2s(sleep(2));以此类推,第十个任务是暂停程序10s;
时间(s)       任务
0          1,2,3,4,5(程序初始,5个线程同时运行,需要时间最长的是线程5(5s))
1          2,3,4,5,6(经过1s后,第一个任务已经完成,被join掉,同时添加新任务6)
2          3,4,5,6,7(同上)
3          4,5,6,7,8
4          5,6,7,8,9
5          6,7,8,9,10
7-end      join所有剩下的线程(所有任务都已经添加,程序中while循环退出)

方法$thread->is_joinable()的作用

前面已经说了,这个方法是用来判断线程是否已经运行完成,处于等待join的状态。当需要处理多个任务,但这些任务完成需要的时间又不一样时,这个方法就显得特别重要。还是以上面的程序为例。程序初始运行时创建5个线程。第一个线程所需时间最短,为1s。第五个线程所需时间最长5s。如果不适用$thread->is_joinable()而直接join这五个线程的话,如下:
foreach $thread(threads->list(threads::all)){
$thread->join();
}

结果是:主程序处于等待状态。在1s后,第一个线程被join,主程序依然处于等待,2s后第二个线程被join,主程序等待。知道5s后第五个线程被join,主程序通畅,重新创建下一组线程(5个)。显然这个过程不能最大话利用CPU的资源。当第一个线程被join后,虽然程序中只有4个线程在运行,但是由于主程序处于等待状态,新的线程不会被创建。

最佳的方法就是判断线程是否可以被join。如上面的程序所写的,这样可以保证程序运行过程中始终是5个线程,最大化的利用CPU资源。

在当今多核CPU主流的形势下,多核并行程序提供了最大话利用CPU的可能性。perl自5.6版本后开始提供ithread模块专门为perl的多线程提供技术支持。在perl的多线程一文中,我们初步探讨了perl的多线程技术。里面使用了is_joinable方法,来判断目标线程是否已经执行完成,并处于等待join的状态。程序源码如下
use threads;
print localtime(time),"\n";
my $j=0;
my $thread;
while(){
last if($j>=10);
while(scalar(threads->list())<5){
$j++;
threads->new(\&ss,$j,$j);
}
foreach $thread(threads->list(threads::all)){
if($thread->is_joinable()) {
    $thread->join();
    print scalar(threads->list()),"\t$j\t",localtime(time),"\n";
 }
}
}

foreach $thread(threads->list(threads::all)){
    $thread->join();print scalar(threads->list()),"\t$j\t",localtime(time),"\n";       
}

print localtime(time),"\n";  

sub ss(){
    my ($t,$s)=@_;
    sleep($t);      
    print "$s\t";
}

上述方法有一个极大的缺陷。如果正在执行的五个线程都没有执行完成,最外层的while循环将会一直“空转”,直到有一个线程被join掉,while循环在控制创建新的线程。这个过程中,主线程因为这个while循环的存在,会一直耗费系统资源。其实在任务管理器中可以看到,我们的程序会耗费50%的CPU(双核CPU),实际上这都耗费在了没有执行任何功能的外层while循环上。

在perl创建的线程结束时不会有任何提示,以告诉主线程说自己(从线程)已经结束。所以必须使用附加程序来判断目标线程是否已经执行完成,并立即join掉线程,以释放系统资源。但是这个判断过程及耗系统资源。正如上面的程序。为此,在一次google了一下,感谢云舒提供的方法,终于学会了。

---------------------------------------------------
信号量
Thread::Semaphore 包为线程提供了信号量的支持,使用信号量可以控制并行的线程数。

对象申明:
my $semaphore = Thread::Semaphore->new(5);
or
my $semaphore = new Thread::Semaphore(5);  控制并行的最大线程数

对象方法:
$semaphore->down;     
获取一个信号量,之后可用的信号量减1,当$semaphore=0时,表示所有的线程都已经创建,无法获得新的信号量,并且此时主线程处于等待。直到获得新的信号量为止。

$semaphore->up;
释放一个信号量,之后可用信号量加1.当一个线程执行完毕时,调用此方法,释放一个信号量。此时 $semaphore->down方法获得信号量成功。处于等待的主线程从新唤醒,创建新的线程。

Semaphore线程信号量

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
它用来控制访问资源的最大线程数量;以上厕所例,只有10个坑位,所以最多只能允许10个人同时使用。

Semaphore 的使用场景也类似是这种生产消费的模式,不同的是它是控制一个资源的多个线程访问数量。其通常也叫做信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

可以把它简单的理解成停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

通常用于那些资源有明确访问数量限制的场景,常用于限流。

比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

又比如上面的停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入:
1.停车场容纳总停车量10。
2.当一辆车进入停车场后,显示牌的剩余车位数响应的减1。
3.每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
4.停车场剩余车位不足时,车辆只能在外面等待。


收割线程

上次使用的程序收割线程时使用的是join方法,该方法的特点就是如果线程已经执行完毕,那么此时调用join方法理所当然,但是如果join调用过早,线程还没有执行完毕,主线程就会处于拥堵状态。知道从线程执行完毕,join方法完成为止。这极大的限制了程序运行的性能。

perl里面提供了另外一种收割线程的方法:detach,其特点是直接把从线程从主线程里面剥离,以后互不相关,但也就不能获取获取其后续的相关结果。当从线程执行完毕时会自动释放占有的资源,算是省心省力了。这里我们将尝试使用detach方法收割线程,并且使用信号量来控制线程的数量。

实例:
------------------------------------------------------------
use v5.20;
use threads;
use Thread::Semaphore;
my $j=0;
my $thread;
my $max_threads=5;
my $semaphore=new Thread::Semaphore($max_threads);

say localtime(time);
while(){
if($j>10){
    last;
}
$j++;
#获得一个信号量;当执行的线程数为5时,获取失败,主线程等待。直到有一个线程结束,新的信号量可用。恢复正常运行;
$semaphore->down();
my $thread=threads->new(\&ss,$j,$j); #创建线程
$thread->detach(); #剥离线程
}

#必须存在的部分,用在控制在主线程结束前保证所有剥离的线程已经执行完成。否则的话,当主线程结束时还有从线程没有执行完成,这时从线程将不得不被强行kill掉(皮之不存毛将焉附)。
&waitquit;

say localtime(time);

sub ss(){
    my ($t,$s)=@_;
    sleep($t);
    say "$s\t",scalar(threads->list()),"\t$j\t",localtime(time);
    $semaphore->up();#当线程执行完成时,释放信号量。
}

#来源于云舒
sub waitquit{
say 'Waiting to quit...';
my $num=0;
while($num<$max_threads){
    $semaphore->down();
    $num++;
    say "$num thread quit...";
}
  say "All $max_threads thread quit\n";
}

------------------------------------------------
程序运行结果和前文一致,就不描述了,详情请点此处

与上面使用的代码最大区别在于少了一个外循环用来判断从线程是否已经执行完成,极大的降低了CPU的使用率。


虽然脚本语言中的线程并不那么受人喜欢,但存在即是合理,从我们系统管理员来的角度来说已经足够了。这里提供一些示例,都是经过实践证明过的。为大家在编写perl线程时提供一种参考。做为提高处理效率的方法,这里以ping一系列主机是否存活(使用Net::Ping模块),简单实用。

---------------------------------------------------
perl 使用多线程取得网站状态

use v5.12;
use threads;
use Thread::Queue;
use Mojo::UserAgent;

use constant THREADS =>3;

my $queue = Thread::Queue->new();
my @URLs=('http://www.163.com/','http://www.sina.com.cn/','http://www.freeoa.net/','http://stackoverflow.com/','http://blog.csdn.net/','http://blog.chinaunix.net/','http://www.cloudera.com/');
my @threads;

for (1..THREADS){
 push @threads, threads->create(sub{
 my $ua = Mojo::UserAgent->new;
 $ua->connect_timeout(3);
while(my $task = $queue->dequeue){
 my $resp=$ua->get($task);
 if ($resp->{res}->{code} =~ /2|3/){
  say "$task has $resp->{res}->{message}"
 }else{
  say "$task NOT done and retun:$resp->{res}->{message}";
 }
}
});
}

$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
say '-' x 30;
# ... here work is done
#$_->join foreach @threads;
foreach my $thread (@threads){
 my $tid=$thread->tid();
 say 'i am tid is:'.$tid;
 $thread->join;
}

发现虽然会开启三个线程,但通常第三个线程没有任务。线程数量与任务控制没有那么好。

---------------------------------------------------
取得特定目录下文件的stat信息

use v5.12;
use threads;
use threads::shared;
use File::stat;

my @files=glob("*.*");
#最大线程数
my $max_th=3;
#线程池
my @thread_array;
my $cu_th=0;

foreach(@files){
 if($cu_th>=$max_th){
  foreach my $thread(@thread_array){
   #say $thread->tid();
   $thread->join();
   my $th_cnt=threads->list();
  }
  $cu_th=0;
  @thread_array=();
 }
 $thread_array[$cu_th]=threads->new(\&fstat,$_);
 $cu_th++;
}

#等待线程结束
foreach my $thread(@thread_array){
 say $thread->tid();
 $thread->join();
}

sub fstat{
 my $file=shift;
 my $a=stat($file);
 print "$file size(bytes):";
 #say Dumper($a);
 say $a->[7];
}

检测内网存活的主机地址

use v5.12;
use threads;
use threads::shared;
use Net::Ping;

my $p = Net::Ping->new();
$p->hires();

#最大线程数
my $max_th=5;
#线程池
my @thread_array;
my $cu_th=0;

my @addres=(100..149);
my @hosts=map "192.168.20.$_", @addres;

foreach(@hosts){
 if($cu_th>=$max_th){
  say "Threads exceed $max_th,we will suspended and handle some theards...";
  foreach my $thread(@thread_array){
   say 'Tid is:'.$thread->tid();
   $thread->join();
  }
  ($cu_th,@thread_array)=(0);
 }
 $thread_array[$cu_th]=threads->new(\&pingit,$_);
 $cu_th++;
}

#等待线程结束
foreach my $thread(@thread_array){
 say 'Remain thread id:'.$thread->tid();
 $thread->join();
}

sub pingit{
 my ($host) = @_;
 my ($ret,$duration,$ip) = $p->ping($host,2);
 if($ret){
  printf("[ip: $ip] is alive (packet return time: %.2f ms)\n",1000*$duration);
 }else{
  #printf("[ip: $ip] is Dead (packet return time: %.2f ms)\n",1000*$duration);
 }
}

线程数量与任务控制的较好。

---------------------------------------------------
推荐使用的Perl线程方法

一、控制线程数量(设置最大线程数)

use v5.12;
use threads;
use Net::Ping;

my $thread_num=0;
my @addres=(100..149);
my @hosts=map "192.168.20.$_", @addres;

#初始化ping模块
my $p = Net::Ping->new();
$p->hires();

while(@hosts){
#for(@hosts)
 if($thread_num>=10){
  for my $t (threads->list(threads::joinable)){
   say 'thread id:'.$t->tid();
   $t->join();
   $thread_num--;
  }
  redo;
 }
 threads->create(\&pingit,pop(@hosts));
 $thread_num++;
}

#将剩余的线程结束
for my $t (threads->list()){
 $t->join();
 say 'Remain thread id:'.$t->tid();
}

sub pingit{
 my ($host) = @_;
 my ($ret,$duration,$ip) = $p->ping($host,2);
 if($ret){
  printf("[ip: $ip] is alive (packet return time: %.2f ms)\n",1000*$duration);
 }else{
  #printf("[ip: $ip] is Dead (packet return time: %.2f ms)\n",1000*$duration);
 }
}

二、固定的线程数量,异步完成
use v5.20;
use Net::Ping;
use Data::Dumper;
use Time::HiRes;
use threads;
use Thread::Queue qw( );

my $NUM_WORKERS = 10;
my @hosts:shared;
#ip address scope
my @addres=(100..149);

#初始化ping模块
my $p = Net::Ping->new();
$p->hires();

@hosts=map "192.168.20.$_", @addres;
my $q=Thread::Queue->new();
my @workers;

sub worker{
  my ($host) = @_;
  my ($ret,$duration,$ip) = $p->ping($host,2);
 if($ret){
  printf("[ip: $ip] is alive (packet return time: %.2f ms)\n",1000*$duration);
 }else{
  #printf("[ip: $ip] is Dead (packet return time: %.2f ms)\n",1000*$duration);
 }
}

for(1..$NUM_WORKERS){
 push @workers,async{
  while(defined(my $job=$q->dequeue())){
   my $tid=threads->self->tid();
   worker($job);
   print "done with threads $tid\n";
   #say '+' x threads->list(threads::joinable);
  }
 };
}

$q->enqueue($_) for @hosts; # Send work
$q->enqueue(undef) for @workers; # Tell workers they're done.
$_->join() for @workers; # Wait for the workers to finish.

三、共享数组变量,尽可能多的创建线程
use v5.20;
use Net::Ping;
use Data::Dumper;
use Time::HiRes;
use threads;
use threads::shared;

my @hosts:shared;

#ip address scope
my @addres=(100..149);

#初始化ping模块
my $p = Net::Ping->new();
$p->hires();

my (@threads);

@hosts=map "192.168.20.$_", @addres;

for ( my $i = 1; $i <= scalar(@addres); $i++){
 my $t = threads->new(\&pingme);
 push(@threads,$t);
}

foreach (@threads){
 $_->join;
 my $tid=$_->tid();
 print "done with threads $tid\n";
}

sub pingme{
 my $host;
 {
  lock(@hosts);
  $host=pop @hosts;
 }
  my ($ret,$duration,$ip) = $p->ping($host,2);
  printf("[ip: $ip] is alive (packet return time: %.2f ms)\n", 1000 * $duration) if $ret;
}

四、共享Hash变量,尽可能多的创建线程
use v5.20;
use Net::Ping;
use Data::Dumper;
use Time::HiRes;
use threads;
use threads::shared;

my %hosts:shared;

#ip address scope
my @addres=(100..150);

#初始化ping模块
my $p = Net::Ping->new();
$p->hires();

my (@threads);

%hosts=map {$_=>"192.168.20.$_"} @addres;

for( my $i=1; $i<=scalar(keys %hosts);$i++) {
  my $t = threads->new(\&pingme);
  push(@threads,$t);
}
foreach (@threads) {
 $_->join;
 my $tid=$_->tid();
 print "done with threads $tid\n";
}

sub pingme{
 my $host;
 {
  lock(%hosts);
  my @hks=keys(%hosts);
  my $tk=pop(@hks);
  $host=$hosts{$tk};
  delete $hosts{$tk};
 }
 say $host;
 my ($ret,$duration,$ip) = $p->ping($host,2);
 printf("[ip: $ip] is alive (packet return time: %.2f ms)\n", 1000 * $duration) if $ret;
}


---------------------------------------------------
与Perl式的面向对象结合使用

方法是一种属于类的特殊子例程,要求第一个参数必须是包含或指向对象的引用,并且这个参数是由perl自动隐式赋值的。

Free.pm
package Free;

sub new{
 my $class=shift;
 my $ref={};
 bless($ref,$class);
 return $ref;
}
sub set_data{
 my $self=shift;
 my $key=shift;
 my $value=shift;
 $self->{$key}=$value;
}
sub get_data{
 my $self=shift;
 my $key=shift;
 $self->{$key};
}
1;

同目录下建立Perl文件use Free
use v5.10;
require ("Free.pm");##引入外部文件
my $house=new Free("FreeOA");
say ref($house);##ref函数返回类名--House
$house->set_data("name","zwjong");
say $house->get_data("name");

多线程版的包使用示例
use threads;
use threads::shared;

my $obj=&shared_clone (OA->new); # 要给变量声明成共享

my $th1=threads->create(\&test1);
my $th2=threads->create(\&test2);
$th1->join;
$th2->join;
$obj->get_num;
 
sub test1 {
 lock ($obj);
 for( 1 .. 100){
  $obj->add_num();
 }  
}

sub test2 {
 lock ($obj);
 for( 1 .. 100 ){
  $obj->add_num();
 }  
}
 
package OA;
sub new {
 my $class=shift;
 my $data={ 'number' => 0 };
 my $self=bless ($data, $class);
 return $self;
}
 
sub add_num {
 my $self=shift;
 $self->{number} ++;
}
 
sub get_num {
 my $self=shift;
 print $self->{number}."\n"
}
1;

---------------------------------------------------


---------------------------------------------------


---------------------------------------------------
参考来源

Perl 中的线程

Perl线程开发过程中的经验