轻量级消息内核-ZeroMQ
2013-05-13 16:10:41 阿炯

ZeroMQ是一个轻量级消息内核。它可用于C、C++、Perl、.NET /Mono、Fortran和Java语言绑定。它运行在AIX 、FreeBSD、HP-UX 、Linux、MacOS,OpenBSD,Solaris和Windows操作系统。采用C++编写开发并在MPLv2协议下授权使用。


ØMQ (also seen as ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPLv3 open source.

Feature
The socket library that acts as a concurrency framework.
Faster than TCP, for clustered products and supercomputing.
Carries messages across inproc, IPC, TCP, and multicast.
Connect N-to-N via fanout, pubsub, pipeline, request-reply.
Asynch I/O for scalable multicore message-passing apps.
Large and active open source community.
30+ languages including C, C++, Java, .NET, Perl.
Most OSes including Linux, Windows, OS X.


ZeroMQ作为一个高性能的异步消息库,为构建分布式应用程序提供了灵活且高效的方式。关于使用 ZeroMQ 与 C 语言结合使用是开发者深入探索这个强大技术的路径。它提供了多种消息模式,包括请求-应答模式、发布-订阅模式和推拉模式等,这些模式允许在分布式系统中的进程或节点之间进行不同类型的通信。可从一个简单的使用 ZeroMQ 的请求-应答模式的例子开始:

#include <stdio.h>
#include <zmq.h>

int main() {
    void *context = zmq_ctx_new();
    void *requester = zmq_socket(context, ZMQ_REQ);
    zmq_connect(requester, "tcp://localhost:5435");

    for (int request_nbr = 0; request_nbr < 10; request_nbr++) {
        char buffer[10];
        sprintf(buffer, "Hello %d", request_nbr);
        zmq_send(requester, buffer, strlen(buffer), 0);
        zmq_recv(requester, buffer, 10, 0);
        printf("Received reply %d: %s\n", request_nbr, buffer);
    }

    zmq_close(requester);
    zmq_ctx_destroy(context);
    return 0;
}

在这个代码中,我们创建了一个 ZeroMQ 上下文和一个请求套接字。然后向在本地主机端口 5435 上运行的服务器发送一系列请求,并接收回复。这展示了基本的请求-应答模式。

可适当增加代码使这个例子更加全面。例如可添加错误处理并使用更复杂的消息格式。

#include <stdio.h>
#include <zmq.h>

int main() {
    void *context = zmq_ctx_new();
    void *requester = zmq_socket(context, ZMQ_REQ);
    int rc = zmq_connect(requester, "tcp://localhost:5435");
    if (rc!= 0) {
        perror("Error connecting to server");
        zmq_close(requester);
        zmq_ctx_destroy(context);
        return -1;
    }

    for (int request_nbr = 0; request_nbr < 10; request_nbr++) {
        char buffer[10];
        sprintf(buffer, "Hello %d", request_nbr);
        rc = zmq_send(requester, buffer, strlen(buffer), 0);
        if (rc == -1) {
            perror("Error sending request");
            break;
        }
        rc = zmq_recv(requester, buffer, 10, 0);
        if (rc == -1) {
            perror("Error receiving reply");
            break;
        }
        printf("Received reply %d: %s\n", request_nbr, buffer);
    }

    zmq_close(requester);
    zmq_ctx_destroy(context);
    return 0;
}

在这个更新后的代码中,在连接和发送/接收消息时检查错误。这使得代码更加健壮。另一个有用的消息模式是发布-订阅模式。在这种模式下,一个或多个发布者向多个订阅者发送消息。下面是一个例子:

#include <stdio.h>
#include <zmq.h>

int main() {
    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    zmq_bind(publisher, "tcp://*:5556");

    while (1) {
        zmq_send(publisher, "Hello subscribers", strlen("Hello subscribers"), 0);
        sleep(1);
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);
    return 0;
}

这段代码创建了一个发布者,它每秒发送一条消息。订阅者可以连接到这个发布者并接收消息。适当增加代码来创建一个订阅者并展示它如何接收消息。

#include <stdio.h>
#include <zmq.h>
#include <unistd.h>

int main() {
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    zmq_connect(subscriber, "tcp://localhost:5556");
    zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

    while (1) {
        char buffer[256];
        int size = zmq_recv(subscriber, buffer, 256, 0);
        if (size > 0) {
            buffer[size] = '\0';
            printf("Received: %s\n", buffer);
        }
        sleep(1);
    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);
    return 0;
}

在这个代码中创建了一个订阅者,它连接到发布者并接收消息。zmq_setsockopt函数用于订阅所有消息。

ZeroMQ 是一个强大的用于构建分布式应用程序的工具。探索使用其各种消息模式和功能,并构建更复杂的分布式系统。无论是在构建微服务架构还是实时数据处理管道,ZeroMQ 都可以帮助实现高效且可扩展的通信。


最新版本:4.0
该版本主要是 bug 修复。

官方主页:http://www.zeromq.org/

文档参考:http://zguide.zeromq.org/page:all