ZeroMQ入门介绍
2013-05-15 15:04:25 阿炯

ZeroMQ是一个定位为"史上最快消息队列",所以名字里面有"MQ"两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。

从网络通信的角度看,它处于会话层之上,应用层之下,有了它,甚至不需要自己写一行的socket函数调用就能完成复杂的网络通信工作。ZeroMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。它有自己的模式,不同于更底层的点对点通讯模式,它有比 tcp 协议更高一级的协议(当然 ZeroMQ 不一定基于 TCP 协议,它也可以用于进程间和进程内通讯)。它改变了通讯都基于一对一的连接这个假设。

与其它的消息队列相比,ZeroMQ有以下一些特点:

简单点对点无中间节点

传统的消息队列都需要一个消息服务器来存储转发消息。而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且试图做到极致。以为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位设置的相关接口来控制缓存量。当然ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。

相比原始的 socket API,ZMQ 封装掉了很多东西,免去了开发人员的很多麻烦。比如,传统的 TCP是基于字节流进行收发,因此程序员常常要自己去处理数据块与数据块之间的边界(断界处理);与之相对,ZMQ是以消息为单位进行收发,它确保你每次发出/收到的,都是一个消息块。这样一来,就省却了不少代码量。

基于 socket API 进行 TCP通讯,需要处理很多网络异常(比如连接异常中断以及重连),在 ZMQ 中,这些琐事统统不用操心。用传统的 socket API,当你想提高通讯性能,往往要搞些异步(非阻塞)、缓冲区、多线程之类的把戏。而这些东西,ZMQ 也帮你封装掉了。

ZMQ 对很多底层细节的封装,让网络程序代码变得简单。

强调消息收发模式

在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以做为服务端:

PUB and SUB
REQ and REP
REQ and XREP
XREQ and REP
XREQ and XREP
XREQ and XREQ
XREP and XREP
PUSH and PULL
PAIR and PAIR

统一接口支持多种底层通信方式(线程间通信,进程间通信,跨主机通信)

如果你想把本机多进程的软件放到跨主机的环境里去执行,通常要将IPC接口用套接字重写一遍。非常麻烦。而有了ZeroMQ就方便多了,只要把通信协议从'ipc:///xxx'改为'tcp://...:'就可以了,其他代码通通不需要改,如果这个是从配置文件里读的话,那么程序就完全不要动了,直接复制到其他机器上就可以了,以为ZeroMQ为我们做了很多。

ZMQ 可以灵活地支持多种通讯环境(进程内,主机内跨进程、跨主机)。ZMQ 的 API 设计得很好,以至于你的代码只要做很小的改动(甚至不改动),就可以适用于不同的通讯环境。

像 socket.connect("tcp://127.0.0.1:1234"),其中的 "tcp://127.0.0.1:1234" 是表示通讯对端的地址串。ZMQ 约定地址串使用如下格式:transport://endpoint 。地址串前面的 transport 表示通讯的类型,目前支持 inproc(进程内),ipc(主机内跨进程),tcp(跨主机),pgm(跨主机,支持多播)共4种方式。可把通讯的地址串保存到配置文件中,就完全可以用一套代码来搞定多种通讯方式即可。

ZeroMQ 把通讯的需求看成四类。其中一类是一对一结对通讯,用来支持传统的 TCP socket 模型,但并不推荐使用,常用的通讯模式只有三类。

请求回应模型:由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是 1:N 的模型,通常把 1 认为是 server,N 认为是 Client。ZeroMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为 N:M (只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。

发布订阅模型:这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的 socket 采用请求回应模型满足这个需求。

管道模型:这个模型里,管道是单向的,从 PUSH 端单向的向 PULL 端单向的推送数据流。

任何分布式,并行的需求,都可以用这三种模型组合起来解决问题。ZeroMQ 只专注和解决了消息通讯这一基本问题,干的非常漂亮。以上模型中,关注的是通讯双方的职责,而不是实现的方式:监听端口还是连接对方端口。对于复杂的多进程协同工作的系统, 不必纠结于进程启动的次序。

异步高性能

ZeroMQ设计之初就是为了高性能的消息发送而服务的,所以其设计追求简洁高效。它发送消息是异步模式,通过单独出一个IO线程来实现,所以消息发送调用之后不要立刻释放相关资源哦,会出错的(以为还没发送完),要把资源释放函数交给ZeroMQ让ZeroMQ发完消息自己释放。 下图是一张第三方的性能对比图,可以看出,单就性能来说,它是相当不错的。






以下部分转自千里之外的博客,感谢原作者。

ZeroMQ简称ZMQ是一个基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。是网络通信中新的一层,介于应用层和传输层之间按照TCP/IP划分,其是一个可伸缩层,可并行运行,分散在分布式系统间。ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。



主线程与I/O线程:

I/O线程,ZMQ根据用户调用zmq_init函数时传入的参数,创建对应数量的I/O线程。每个I/O线程都有与之绑定的Poller,Poller采用经典的Reactor模式实现。

Poller根据不同操作系统平台使用不同的网络I/O模型select、poll、epoll、devpoll、kequeue等,所有的I/O操作都是异步的,线程不会被阻塞。

主线程与I/O线程通过Mail Box传递消息来进行通信。



Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。

Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。

Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。

每个Session都会关联到相应的读/写管道,主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。


ZMQ将消息通信分成4种模型:
一对一结对模型Exclusive-Pair,可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型。

请求回应模型Request-Reply,由Client发起请求,并由Server响应,跟一对一结对模型的区别在于可以有多个Client。

发布订阅模型Publish-Subscribe,Publish端单向分发数据,且不关心是否把全部信息发送给Subscribe端。如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。Subscribe端只能接收,不能反馈,且在Subscribe端消费速度慢于Publish端的情况下,会在Subscribe端堆积数据。

管道模型Push-Pull,从 PUSH 端单向的向 PULL 端单向的推送数据流。如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。与发布订阅模型相比,管道模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。

这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。

ZMQ提供进程内inproc://、进程间ipc://、机器间tcp://、广播pgm://等四种通信协议。


ZMQ API


ZMQ提供的所有API均以zmq_开头

#include <zmq.h>

gcc [flags] files -lzmq [libraries]

例如,返回当前ZMQ库的版本信息
void zmq_version (int *major, int *minor, int *patch);

Context

在使用任何ZQM库函数之前,必须首先创建ZMQ context上下文,程序终止时,也需要销毁context。

创建context

void *zmq_ctx_new ();

ZMQ context是线程安全的,可以在多线程环境使用,而不需要程序员对其加/解锁。

在一个进程中,可以有多个ZMQ context并存。

设置context选项

int zmq_ctx_set (void *context, int option_name, int option_value);
int zmq_ctx_get (void *context, int option_name);

销毁context

int zmq_ctx_term (void *context);


Sockets

ZMQ Sockets 是代表异步消息队列的一个抽象,注意,这里的ZMQ socket和POSIX套接字的socket不是一回事,ZMQ封装了物理连接的底层细节,对用户不透明。传统的POSIX套接字只能支持1对1的连接,而ZMQ socket支持多个Client的并发连接,甚至在没有任何对端peer的情况下,ZMQ sockets上也能放入消息;ZMQ sockets不是线程安全的,因此不要在多个线程中并行操作同一个sockets。


创建ZMQ  Sockets

void *zmq_socket (void *context, int type);

注意,ZMQ socket在bind之前还不能使用。

type参数含义

pattern

type

description

一对一结对模型

ZMQ_PAIR

 

请求回应模型

ZMQ_REQ

client端使用

ZMQ_REP

server端使用

ZMQ_DEALER

将消息以轮询的方式分发给所有对端(peers)

ZMQ_ROUTER

 

发布订阅模型

ZMQ_PUB

publisher端使用

ZMQ_XPUB

 

ZMQ_SUB

subscriber端使用

ZMQ_XSUB

 

管道模型

ZMQ_PUSH

push端使用

ZMQ_PULL

pull端使用

原生模型

ZMQ_STREAM

 



设置socket选项

int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len);
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);


关闭socket

int zmq_close (void *socket);


创建一个消息流

int zmq_bind (void *socket, const char *endpoint);
int zmq_connect (void *socket, const char *endpoint);

bind函数是将socket绑定到本地的端点endpoint,而connect函数连接到指定的peer端点。

endpoint支持的类型:

transports

description

uri example

zmp_tcp

TCP的单播通信

tcp://*:8080

zmp_ipc

本地进程间通信

ipc://

zmp_inproc

本地线程间通信

inproc://

zmp_pgm

PGM广播通信

pgm://



收发消息

int zmq_send (void *socket, void *buf, size_t len, int flags);
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send_const (void *socket, void *buf, size_t len, int flags);

zmq_recv()函数的len参数指定接收buf的最大长度,超出部分会被截断,函数返回的值是接收到的字节数,返回-1表示出错;

zmq_send()函数将指定buf的指定长度len的字节写入队列,函数返回值是发送的字节数,返回-1表示出错;

zmq_send_const()函数表示发送的buf是一个常量内存区constant-memory,这块内存不需要复制、释放。


socket事件监控

int zmq_socket_monitor (void *socket, char * *addr, int events);

zmq_socket_monitor()函数会生成一对sockets,publishers端通过inproc://协议发布 sockets状态改变的events;
消息包含2帧,第1帧包含events id和关联值,第2帧表示受影响的endpoint。

监控支持的events:

ZMQ_EVENT_CONNECTED: 建立连接
ZMQ_EVENT_CONNECT_DELAYED: 连接失败
ZMQ_EVENT_CONNECT_RETRIED: 异步连接/重连
ZMQ_EVENT_LISTENING: bind到端点
ZMQ_EVENT_BIND_FAILED: bind失败
ZMQ_EVENT_ACCEPTED: 接收请求
ZMQ_EVENT_ACCEPT_FAILED: 接收请求失败
ZMQ_EVENT_CLOSED: 关闭连接
ZMQ_EVENT_CLOSE_FAILED: 关闭连接失败
ZMQ_EVENT_DISCONNECTED: 会话tcp/ipc中断


I/O多路复用

int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);

对sockets集合的I/O多路复用,使用水平触发。

与epoll类似,items参数指定一个结构体数组结构体定义如下,nitems指定数组的元素个数,timeout参数是超时时间单位:ms,0表示不等待立即返回,-1表示阻塞等待。

typedef struct {
    void *socket;
    int fd;
    short events;
    short revents;
} zmq_pollitem_t;


对于每个zmq_pollitem_t元素,ZMQ会同时检查其socketZMQ套接字和fd原生套接字上是否有指定的events发生,且ZMQ套接字优先。

events指定该sockets需要关注的事件,revents返回该sockets已发生的事件,它们的取值为:
ZMQ_POLLIN,可读;
ZMQ_POLLOUT,可写;
ZMQ_POLLERR,出错;


 
Messages

一个ZMQ消息就是一个用于在消息队列进程内部或跨进程中进行传输的数据单元,ZMQ消息本身没有数据结构,因此支持任意类型的数据,这完全依赖于程序员如何定义消息的数据结构。

一条ZMQ消息可以包含多个消息片multi-part messages,每个消息片都是一个独立zmq_msg_t结构。ZMQ保证以原子方式传递消息,要么所有消息片都发送成功,要么都不成功。


初始化消息

typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);

zmq_msg_init()函数初始化一个消息对象zmq_msg_t ,不要直接访问zmq_msg_t对象,可以通过zmq_msg_* 函数来访问它。
zmq_msg_init()、zmq_msg_init_data()、zmq_msg_init_size() 三个函数是互斥的,每次使用其中一个即可。


设置消息属性

int zmq_msg_get (zmq_msg_t *message, int property);
int zmq_msg_set (zmq_msg_t *message, int property, int value);


释放消息

int zmq_msg_close (zmq_msg_t *msg);


收发消息

int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);

 其中,flags参数如下:
ZMQ_DONTWAIT,非阻塞模式,如果没有可用的消息,将errno设置为EAGAIN;
ZMQ_SNDMORE,发送multi-part messages时,除了最后一个消息片外,其它每个消息片都必须使用 ZMQ_SNDMORE 标记位。


获取消息内容

void *zmq_msg_data (zmq_msg_t *msg);
int zmq_msg_more (zmq_msg_t *message);
size_t zmq_msg_size (zmq_msg_t *msg);

zmq_msg_data()返回指向消息对象所带内容的指针;
zmq_msg_size()返回消息的字节数;
zmq_msg_more()标识该消息片是否是整个消息的一部分,是否还有更多的消息片待接收;


控制消息

int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);

zmq_msg_copy()函数实现的是浅拷贝;
zmq_msg_move()函数中,将dst指向src消息,然后src被置空。


接收消息的代码示例:

zmq_msg_t part;
while (true) {
    //Create an empty ØMQ message to hold the message part
    int rc = zmq_msg_init (&part);
    assert (rc == 0);
    //Block until a message is available to be received from socket
    rc = zmq_msg_recv (socket, &part, 0);
    assert (rc != -1);
    if (zmq_msg_more (&part))
        fprintf (stderr, "more\n");
    else {
        fprintf (stderr, "end\n");
        break;
    }   
    zmq_msg_close (&part);
}



代理

ZMQ提供代理功能,代理可以在前端socket和后端socket之间转发消息。

int zmq_proxy (const void *frontend, const void *backend, const void *capture);
int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);

共享队列shared queue,前端是ZMQ_ROUTER socket,后端是ZMQ_DEALER socket,proxy会把clients发来的请求,公平地分发给services;
转发队列forwarded,前端是ZMQ_XSUB socket, 后端是ZMQ_XPUB socket, proxy会把从publishers收到的消息转发给所有的subscribers;
流streamer,前端是ZMQ_PULL socket, 后端是ZMQ_PUSH socket.


proxy使用的一个示例:
//Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);

//Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);

//Start the queue proxy, which runs until ETERM zmq_proxy frontend, backend, NULL);


错误处理

ZMQ库使用POSIX处理函数错误,返回NULL指针或者负数时表示调用出错。

int zmq_errno (void);
const char *zmq_strerror (int errnum);

zmq_errno()函数返回当前线程的错误码errno变量的值;

zmq_strerror()函数将错误映射成错误字符串。


加密传输

ZQM可以为IPC和TCP连接提供安全机制:
不加密,zmq_null

使用用户名/密码授权,zmq_plain

椭圆加密,zmq_curve

这些通过 zmq_setsockopt()函数设置socket选项的时候配置。



总结:

1、仅仅提供24个API接口,风格类似于BSD Socket。

2、处理了网络异常,包括连接异常中断、重连等。

3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。

4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。

5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。

6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。

7、服务器端和客户端的启动没有先后顺序。