异步事件处理库-libevent
2011-01-07 14:31:35 阿炯

Libevent是一个C语言编写开发的异步事件处理程序库,以BSD许可证释出。高效的事件驱动编程已经成为开发高性能网络应用程序的关键。其做为一个轻量级但功能强大的C库,提供了一套简洁高效的事件通知机制,广泛应用于网络服务器、分布式系统等领域。

libevent提供了一组应用程序编程接口(API),让程序员可以设定某些事件发生时所执行的函式,也就是说,它可以用来取代网络服务器所使用的循环检查架构。由于可以省去对网络的处理,且拥有不错的效能,有些软件使用libevent作为网络底层的函式库,如:memcached、Tor。

The libevent API provides a mechanism to execute a callback function when a specific event occurs on a file descriptor or after a timeout has been reached. Furthermore, libevent also support callbacks due to signals or regular timeouts.

libevent is meant to replace the event loop found in event driven network servers. An application just needs to call event_dispatch() and then add or remove events dynamically without having to change the event loop.

Currently, libevent supports /dev/poll, kqueue(2), event ports, select(2), poll(2) and epoll(4). The internal event mechanism is completely independent of the exposed event API, and a simple update of libevent can provide new functionality without having to redesign the applications. As a result, Libevent allows for portable application development and provides the most scalable event notification mechanism available on an operating system. Libevent can also be used for multi-threaded applications; see Steven Grimm's explanation. Libevent should compile on Linux, *BSD, Mac OS X, Solaris and Windows.

libevent、libevlibuv都是C语言实现的异步事件库,注册异步事件,检测异步事件,根据事件的触发先后顺序,调用相对应回调函数处理事件。处理的事件包括:网络IO、定时以及信号事件,这三个事件驱动着服务器的运行。

1.网络io事件:
linux:epoll、poll、select
bsd:kqueue
window:iocp

2.定时事件:
红黑树
最小堆:二叉树、四叉树
跳表
时间轮

3.信号事件
libevent 和 libev 主要封装了异步事件库与操作系统的交互简单的事件管理接口,让用户无需关注平台检测处理事件的机制的差异,只需关注事件的具体处理。

从设计理念出发,libev 是为了改进 libevent 中的一些架构决策;例如:全局变量的使用使得在多线程环境中很难安全地使用 libevent;event 的数据结构设计太大,它包含了 io、时间以及信号处理全封装在一个结构体中,额外的组件如 http、dns、openssl 等实现质量差(容易产生安全问题),计时器不精确,不能很好地处理时间事件。

libev 通过完全去除全局变量的使用,而是通过回调传参来传递上下文(后面libevent也这样做了);并且根据不同事件类型构建不同的数据结构,以此来减低事件耦合性;计时器使用最小四叉堆。libev 小而高效;只关注事件处理。

libevent 和 libev 对 window 支持比较差,由此产生了 libuv 库;libuv 基于 libev,在window 平台上更好的封装了 iocp;node.js 基于 libuv。其有着不少的优点。

几个显著的亮点:
=> 事件驱动(event-driven),高性能;
=> 轻量级,专注于网络,不如ACE那么臃肿庞大;
=> 源代码相当精炼、易读;
=> 跨平台,支持Windows、Linux、*BSD和Mac Os;
=> 支持多种I/O多路复用技术, epoll、poll、dev/poll、select和kqueue等;
=> 支持I/O,定时器和信号等事件;
=> 注册事件优先级;

Libevent做为一个开源的事件通知库,旨在提供一种高效的、可扩展的方式来处理大量并发的I/O操作。它封装了操作系统提供的各种事件通知机制,如select、poll、epoll、kqueue等,提供了一个统一的接口,让开发者可以轻松地编写跨平台的事件驱动程序。主要功能包括:

事件循环管理:提供了一种高效的事件循环机制,可以同时处理多种类型的事件,如I/O事件、定时器事件和信号事件。

多种后端支持:封装了不同操作系统上的底层事件通知机制,提供统一接口,支持select、poll、epoll、kqueue、devpoll等。

高并发处理:通过非阻塞I/O和事件驱动的方式,可以高效地处理大量并发连接,特别适合网络服务器开发。

可用来开发可扩展的网络服务器的事件通知函数库。当一个文件描述符上的特定事件发生或是一个超时时间到达后,libevent API提供一种执行回调函数的机制。且其还支持基于信号或定期超时的回调功能,旨在替换在原有事件驱动网络服务器事件循环而设计的,应用程序仅仅需要调用event_dispatch(),然后动态地添加或是移除事件就可以了,而不需要改变原有的事件循环。其特点如下:

做为轻量级的网络库,其具有以下这些特点:
支持平台:windows、unix/linux
轻量级:专注网络
支持 I/O,定时器和信号等事件
支持I/O的多路复用技术(epoll/poll/select/kqueue)
高性能:事件驱动

libevent是reactor反应堆模式(高效服务器架构模式可分为:reactor/proactor)。

目前libevent支持/dev/poll,kqueue(2),select(2),poll(2)和epoll(4)等高并发网络编程模型。而它对实时信号的支持正处于实验性阶段。内部的事件处理机制是完全独立于暴露出来的API的,并且新功能的加入并不需要重新设计应用程序,而是仅仅需要做一个简单的libevent更新即可。因此,lievent允许可移植性的应用程序开发,并且能够提供适合特定操作系统的最具可扩展性的事件通知机制。libevent同时也可用于多线程编程环境,更多说明请看Steven Grimm的说明。libevent可以在Linux,*BSD,Mac OS X,Solaris和Windows系统上编译。在众多事件通知库中,Libevent脱颖而出,不仅因为它的高效和易用性,还因为它在实际应用中展现出的独特优势。

1.高性能
Libevent通过封装操作系统底层高效的事件通知机制,如Linux的epoll和BSD的kqueue,实现了高性能的事件处理。相比传统的select和poll,epoll和kqueue在面对大量并发连接时表现更佳,这使得Libevent特别适合用来构建高并发的网络服务器和分布式系统。

2.跨平台
Libevent提供了统一的API接口,屏蔽了不同操作系统之间的差异,使得开发者可以编写跨平台的事件驱动程序。无论是在Linux、BSD还是Windows上,Libevent都能良好地工作。

3.灵活性
Libevent不仅支持I/O事件,还支持定时器事件和信号事件。这使得它在各种应用场景下都能得心应手,无论是网络通信、定时任务还是信号处理,都能使用Libevent来实现。

4.社区支持
作为一个开源项目,Libevent拥有活跃的社区和详尽的文档资源。无论是初学者还是资深开发者,都能在社区中找到丰富的学习资料和帮助。


网络库选择的标准

1.功能(tcp/udp/http/https/dns)
网络库提供的功能,能否解决痛点需求。

2.平台兼容性(linux/windows/unix)
如果只需要一个平台,那么只要库支持就这个平台就ok,如果需要多平台那么要考虑它的平台兼容性。

3.效率与复杂度的容忍
在满足上面两点的情况下,当然是效率越高越好了。


标准用法

每一个使用libevent的程序,都需要包含头文件,并且需要传递-levent标志给连接器linker。在使用任何库函数之前需要先调用event_init()或者event_base_new()函数制执行一次libevent库的初始化。

事件通知
对于每一个你想监视的文件描述符,你必须声明一个事件结构并且调用event_set()去初始化结构中的成员。为了激活通知,你需要通过调用event_add()将该结构添加到监视事件列表。只要是该事件存活,那么就需要保持该已allocated的事件结构,因此该事件结构需要在堆(heap)上申请。最后,需要调用event_dispatch()函数循环和调度事件。

I/O缓冲区
libevent提供了一个定期回调事件顶层的抽象。该抽象被称为缓冲事件(buffered event)。缓冲事件提供自动地填充和流掉(drained)的输入和输出缓冲区。缓冲时间的用户不再需要直接操作I/O,取而待之的是仅仅从输入缓冲区读,向输出缓冲区写就可以了。一旦通过bufferevent_new()进行了初始化,bufferevent结构就可以通过bufferevent_enable()和bufferevent_disable()重复地使用了。作为替代,对一个套接口的读写需要通过调用bufferevent_read()和bufferevent_write()函数来完成。当由于读事件而激活bufferevent时,那么后续将会自动回调读函数从该文件描述符读取数据。写函数将会被回调,无论何时这个输出缓冲区空间被耗尽到低于写的下水位(low watemark),通常该值默认为0。

定时器
libevent通过创建一个定时器来参与到一个经过一定超时时间后的回调事件中。evtimer_set()函数将准备(分配)一个事件结构被用于作为一个定时器。为了激活定时器,需要调用evtimer_add()函数。相反,需要调用evtimer_del()函数。

超时
除了简单的定时器,libevent可以为文件描述符指定一个超时事件,用于触发经过一段时间后而没有被激活的文件描述符执行相应的操作。timeout_set()函数可以为一个超时时间初始化一个事件结构。一旦被初始化成功,那么这个事件必须通过timeout_add()函数激活。为了取消一个超时事件,可以调用timeout_del()函数。

异步DNS解析
libevent提供了一个异步DNS解析器,可用于代替标准的DNS解析器。这些函数可以通过在程序中包含头文件而将其导入。在使用任何解析器函数之前,你必须调用evdns_init()函数初始化函数库。为转化一个域名到IP地址,可以调用evdns_resolve_ipv4()函数。为了执行一个反响查询,你可以调用evdns_resolve_reverse()函数。所有的这些函数,在查找时都会使用回调的方式而避免阻塞的发生。

事件驱动的HTTP服务器
libevent提供了一个简单的可以嵌入到程序中的并能处理HTTP请求的事件驱动HTTP服务器。为了使用这种能力,应在程序中包含头文件,可以通过调用evhttp_new()函数来创建一个服务器。通过evhttp_bind_socket()函数添加用于监听的地址和端口。然后可以注册一个或多个对到来请求的处理句柄。对于每一个URI可以通过evhttp_set_cb()函数指定一个回调。通常一个回调函数也可以通过evhttp_set_gencb()函数完成注册;如果没有其他的回调已经被注册得到该URI,那么这个回调将会与其关联。


以下是一个简单的示例,展示了如何使用Libevent来创建一个基本的事件循环。

1. 安装Libevent
首先需要安装Libevent。在大多数Linux发行版中可以通过包管理器来安装:
apt-get install libevent-dev

也可以从Libevent的官方网站下载源码自行编译安装。

2. 创建基本的事件循环
以下是一个简单的示例,展示了如何使用Libevent创建一个基本的事件循环,并处理I/O事件和定时器事件。
#include <event2/event.h>
#include <stdio.h>
#include <stdlib.h>

// 定时器事件回调函数
void timeout_cb(evutil_socket_t fd, short event, void *arg) {
    printf("Timeout occurred\n");
}

int main() {
    // 创建事件基础结构
    struct event_base *base = event_base_new();
    if (!base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }

    // 创建定时器事件
    struct event *timeout_event;
    struct timeval timeout = {2, 0}; // 2秒超时
    timeout_event = evtimer_new(base, timeout_cb, NULL);
    evtimer_add(timeout_event, &timeout);

    // 进入事件循环
    event_base_dispatch(base);

    // 释放资源
    event_free(timeout_event);
    event_base_free(base);

    return 0;
}

在这个示例中首先创建了一个事件基础结构event_base,然后创建了一个定时器事件,并将其添加到事件循环中。定时器事件的回调函数timeout_cb将在定时器超时后被调用。最后进入事件循环,等待事件发生并处理。

3. 处理I/O事件
除了定时器事件,Libevent最常用的功能就是处理I/O事件。以下是一个简单的示例,展示了如何使用Libevent处理套接字上的I/O事件。

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <unistd.h>

// 读取数据回调函数
void read_cb(struct bufferevent *bev, void *ctx) {
    char buf[256];
    int n;
    while ((n = bufferevent_read(bev, buf, sizeof(buf))) > 0) {
        buf[n] = '\0';
        printf("Received data: %s\n", buf);
    }
}

// 错误处理回调函数
void event_cb(struct bufferevent *bev, short events, void *ctx) {
    if (events & BEV_EVENT_ERROR) {
        perror("Error from bufferevent");
    }
    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
        bufferevent_free(bev);
    }
}

// 监听套接字回调函数
void accept_cb(evutil_socket_t listener, short event, void *arg) {
    struct event_base *base = (struct event_base *)arg;
    struct sockaddr_in sin;
    socklen_t slen = sizeof(sin);
    int fd = accept(listener, (struct sockaddr *)&sin, &slen);
    if (fd < 0) {
        perror("accept");
        return;
    }

    struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, read_cb, NULL, event_cb, NULL);
    bufferevent_enable(bev, EV_READ | EV_WRITE);
}

int main() {
    struct event_base *base = event_base_new();
    if (!base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }

    int listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return 1;
    }

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    sin.sin_port = htons(9999);

    if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return 1;
    }

    if (listen(listener, 16) < 0) {
        perror("listen");
        return 1;
    }

    evutil_make_socket_nonblocking(listener);

    struct event *listener_event = event_new(base, listener, EV_READ | EV_PERSIST, accept_cb, (void *)base);
    event_add(listener_event, NULL);

    event_base_dispatch(base);

    event_free(listener_event);
    close(listener);
    event_base_free(base);

    return 0;
}

这个示例展示了一个简单的基于Libevent的TCP服务器。服务器监听9999端口,接受客户端连接,并在接收到数据时将其打印出来。

高级用法

除了基本的事件处理,Libevent还提供了一些高级特性,使得开发者可以更灵活地使用它来构建复杂的应用程序。

1. Buffer Events
Buffer Events是Libevent提供的高级I/O抽象,用于简化异步I/O操作。与直接使用底层I/O事件不同,Buffer Events提供了一个更高层次的接口,支持自动缓冲区管理、自动重试、优雅关闭等功能。使用Buffer Events,开发者可以更方便地实现高效的异步I/O操作。

2. 多线程支持
Libevent支持多线程编程,允许多个线程同时使用同一个事件基础结构。通过使用锁和条件变量,Libevent可以在多线程环境中高效地调度事件,避免了线程之间的竞争和冲突。

3. DNS解析
Libevent内置了一个异步DNS解析器,可以方便地进行非阻塞的域名解析。使用Libevent的DNS解析器,开发者可以避免在网络通信中因DNS解析导致的阻塞,提高了程序的响应速度。

实际应用

Libevent在许多高性能网络应用中都有广泛的应用,以下是一些典型的实际应用场景,展示了它如何在不同项目中发挥其强大的功能。

1. 网络服务器
Libevent最常见的应用场景之一是构建高性能的网络服务器。通过非阻塞I/O和事件驱动机制,Libevent可以高效地处理大量并发连接,特别适合用来开发HTTP服务器、WebSocket服务器等网络应用。例如,Memcached,一个高性能的分布式缓存系统,就使用了Libevent来实现其高效的网络通信。

2. 分布式系统
在分布式系统中,高效的事件处理机制对于系统的性能和可扩展性至关重要。Libevent可以通过其事件循环机制和多线程支持,实现高效的事件调度和处理,确保系统在高负载下仍能稳定运行。例如,Tor,一个匿名网络工具,就使用了Libevent来处理其复杂的网络通信和路由事件。

3. 实时通信
实时通信应用需要高效的事件处理和低延迟的响应能力。Libevent通过其异步I/O和定时器事件支持,可以实现高效的实时通信,特别适合用来构建实时聊天系统、在线游戏服务器等应用。例如,Libevent被广泛应用于一些高性能的实时聊天系统中,通过其事件驱动机制实现高效的数据传输和低延迟的消息处理。

4. 数据流处理
在数据流处理应用中,高效的I/O处理和事件调度是关键。Libevent可以通过其Buffer Events和非阻塞I/O机制,实现高效的数据流处理,适合用来构建实时数据分析、流媒体服务器等应用。例如,Libevent被用于一些实时数据分析系统中,通过其高效的事件处理机制实现快速的数据采集和处理。

最佳实践

在实际项目中,为了充分发挥Libevent的性能和功能,开发者需要遵循一些最佳实践,以确保应用程序的高效和稳定。

1. 选择合适的后端
Libevent支持多种底层事件通知机制,如select、poll、epoll、kqueue等。在不同的操作系统和应用场景下,选择合适的后端可以显著提升性能。例如,在Linux上,epoll通常是最优的选择,因为它在处理大量并发连接时具有更高的性能。

2. 合理设置超时
在处理网络通信和定时任务时,合理设置超时参数可以有效避免资源浪费和性能瓶颈。例如,在设置定时器事件时,可以根据实际需求调整超时时间,避免过长或过短的超时设置影响事件处理效率。

3. 使用Buffer Events
对于复杂的异步I/O操作,使用Libevent提供的Buffer Events可以简化代码,提高开发效率。Buffer Events不仅支持自动缓冲区管理,还提供了更加高效的I/O操作接口,适合处理大数据量和高并发的网络通信。

4. 多线程优化
在多线程环境中使用Libevent时,合理使用锁和条件变量,避免线程之间的竞争和冲突,可以有效提升系统的并发性能。例如,在使用Libevent的多线程支持时,可以根据实际需求调整线程池大小,确保事件调度的高效和稳定。

5. 性能监控和调优
在高性能应用中,性能监控和调优是必不可少的环节。通过定期监控应用程序的性能指标,识别潜在的性能瓶颈,并进行针对性的优化,可以显著提升系统的整体性能。例如,可以使用一些性能分析工具,如gprof、Valgrind等,对应用程序进行性能分析和调优,确保Libevent在高负载下仍能稳定高效地运行。

Libevent作为一个高效的事件通知库,凭借其简单易用的API接口和强大的功能,在众多高性能网络应用中得到了广泛的应用。通过深入理解Libevent的工作原理和最佳实践,开发者可以充分发挥其性能优势,构建高效稳定的事件驱动程序。


细说libevent的原理及实现(转自Linux开发架构之路博客,感谢原作者)

Libevent已经被广泛的应用,作为底层的网络库;比如memcached、Vomit、Nylon、Netchat等等。如果不想自己操作IO事件,那么就将IO读写的操作交给libevent进行管理,让其去处理边界问题。从较高的封装层次去使用libevent,开发者只需要在libevent完成读写I/O的处理后自己仅需从缓冲区中读数据来完成事件的逻辑处理,至于边界的问题则不需要操心。

libevent封装了两个层次,一个是事件检测,一个是事件操作。事件检测是低层次的封装,由libevent完成事件的检测,然后调用者自己完成IO操作,类似于将底层的epoll,select,poll的细节隐藏掉。该层次封装了事件管理器的操作和事件本身的操作接口。

事件管理器event_base

构建事件管理器event_base_new

使用libevent 函数之前需要分配一个或者多个 event_base 结构体, 每个event_base结构体持有一个事件集合, 可以检测以确定哪个事件是激活的,event_base结构相当于epoll红黑树的树根节点, 每个event_base都有一种用于检测某种事件已经就绪的 “方法”

struct event_base *event_base_new(void);
函数说明: 获得event_base结构,当于epoll红黑树的树根节点
参数说明: 无
返回值:
成功返回event_base结构体指针;
失败返回NULL;

释放事件管理器event_base_free

void event_base_free(struct event_base *);
函数说明: 释放event_base指针

event_reinit

int event_reinit(struct event_base *base);
函数说明: 如果有子进程, 且子进程也要使用base, 则子进程需要对event_base重新初始化,
此时需要调用event_reinit函数.
函数参数: 由event_base_new返回的执行event_base结构的指针
返回值: 成功返回0, 失败返回-1
对于不同系统而言, event_base就是调用不同的多路IO接口去判断事件是否已经被激活,
对于linux系统而言, 核心调用的就是epoll, 同时支持poll和select.

event_get_supported_methods

const char **event_get_supported_methods(void);
函数说明: 获得当前系统(或者称为平台)支持的方法有哪些
参数: 无
返回值: 返回二维数组, 类似与main函数的第二个参数**argv.

event_base_get_method

const char * event_base_get_method(const struct event_base *base);
函数说明: 获得当前base节点使用的多路io方法
函数参数: event_base结构的base指针.
返回值: 获得当前base节点使用的多路io方法的指针

event_set()

void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)
event_set 初始化事件event,设置回调函数和关注的事件。
参数说明:
ev:执行要初始化的event对象;
fd:该event绑定的“句柄”,对于信号事件,它就是关注的信号;
events:在该fd上关注的事件类型,它可以是EV_READ, EV_WRITE, EV_SIGNAL;
callback:这是一个函数指针,当fd上的事件event发生时,调用该函数执行处理,它有三个参数,调用时由event_base负责传入,按顺序,实际上就是event_set时的fd, event和arg;
arg:传递给callback函数指针的参数;

evtimer_set

定时事件说明:evtimer_set(&ev, timer_cb, NULL) = event_set(&ev, -1, 0, timer_cb, NULL)
由于定时事件不需要fd,并且定时事件是根据添加时(event_add)的超时值设定的,因此这里event也不需要设置。这一步相当于初始化一个event handler,在libevent中事件类型保存在event结构体中。
注意:libevent并不会管理event事件集合,这需要应用程序自行管理;

#include <event.h>
#include <stdio.h>
#include <string.h>
int main() {
   const char** p = event_get_supported_methods();
   //获取当前系统支持的方法有哪些
   int i = 0;
   while(p[i] != NULL) {
     printf("[%s] ",p[i]);
   }
   printf("\n");

   struct event_base* base = event_base_new();
   if(base == NULL) printf("event_base_new error\n");
   printf("[%s]\n",event_base_get_method(base));

  event_base_free(base);
  return 0;
}


struct event结构体分析

struct event {
  TAILQ_ENTRY (event) ev_next;
  TAILQ_ENTRY (event) ev_active_next;
  TAILQ_ENTRY (event) ev_signal_next;
  unsigned int min_heap_idx;  /* for managing timeouts 用于管理超时默认是-1 */
 
  struct event_base *ev_base; //属于哪个一event_base
 
  int ev_fd; //设置事件监听对象,也就是监听句柄
  short ev_events; //设置监听对象触发的动作:EV_READ, EV_WRITE, EV_SIGNAL,EV_TIMEOUT,EV_PERSIST  
  short ev_ncalls; //事件被调用了几次
  short *ev_pncalls;  /* Allows deletes in callback */
 
  struct timeval ev_timeout;
 
  int ev_pri;    /* smaller numbers are higher priority */
    
    //设置事件的回调函数
  void (*ev_callback)(int, short, void *arg);
 
    //设置事件的回调函数的参数
  void *ev_arg;
 
  int ev_res;    /* result passed to event callback */
  int ev_flags; //事件的状态,EVLIST_INIT,EVLIST_INTERNAL,EVLIST_ACTIVE,EVLIST_SIGNAL,EVLIST_INSERTED,EVLIST_TIMEOUT
};

事件循环event_base_dispatch和event_base_loop

libevent在event_base_new好之后, 需要等待事件的产生, 也就是等待事件被激活, 所以程序不能退出, 对于epoll来说, 我们需要自己控制循环, 而在libevent中也给我们提供了API接口, 类似where(1)的功能.

//这个函数一般不用, 而大多数都调用libevent给我们提供的另外一个API:
int event_base_loop(struct event_base *base, int flags);
函数说明: 进入循环等待事件
参数说明:
  base: 由event_base_new函数返回的指向event_base结构的指针
  flags的取值:
  #define EVLOOP_ONCE  0x01
  只触发一次, 如果事件没有被触发, 阻塞等待
  #define EVLOOP_NONBLOCK  0x02
  非阻塞方式检测事件是否被触发, 不管事件触发与否, 都会立即返回.


int event_base_dispatch(struct event_base *base);
函数说明: 进入循环等待事件
参数说明:由event_base_new函数返回的指向event_base结构的指针
调用该函数, 相当于没有设置标志位的event_base_loop。程序将会一直运行,
直到没有需要检测的事件了, 或者被结束循环的API终止。

事件循环推出event_base_loopbreak和event_base_loopexit

int event_base_loopexit(struct event_base *base, const struct timeval *tv);
int event_base_loopbreak(struct event_base *base);
struct timeval {
  long    tv_sec;                    
  long    tv_usec;            
};

两个函数的区别是如果正在执行激活事件的回调函数, 那么event_base_loopexit将在事件回调执行结束后终止循环(如果tv时间非NULL, 那么将等待tv设置的时间后立即结束循环), 而event_base_loopbreak会立即终止循环。

event_process_active
主要是处理激活队列中的数据

static void
event_process_active(struct event_base *base) {
  struct event *ev;
  struct event_list *activeq = NULL;
  int i;
  short ncalls;
 
  获得就绪链表中有就绪事件并且高优先级的表头
  for (i = 0; i < base->nactivequeues; ++i) {
    if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
      activeq = base->activequeues[i];
      break;
    }
  }
 
  assert(activeq != NULL);
 
  for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
    if (ev->ev_events & EV_PERSIST)
      event_queue_remove(base, ev, EVLIST_ACTIVE);
    else
      event_del(ev);//如果不是永久事件则需要进行一系统的删除工作,包括移除注册在事件链表的事件等
 
    /* Allows deletes to work */
    ncalls = ev->ev_ncalls;
    ev->ev_pncalls = &ncalls;
    while (ncalls) {
      ncalls--;
      ev->ev_ncalls = ncalls;
            //根据回调次数调用回调函数
      (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
      if (event_gotsig || base->event_break)
        return;
    }
  }
}

事件对象

typedef void (*event_callback_fn)(evutil_socket_t fd, short events, void *arg);
struct event *event_new(struct event_base *base, evutil_socket_t fd,
short events, event_callback_fn cb, void *arg);
 
#define evsignal_new(b, x, cb, arg)   event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
函数说明: event_new负责创建event结构指针, 同时指定对应的base(epfd), 还有对应的文件描述符, 事件, 以及回调函数和回调函数的参数。
参数说明:
  base: 对应的根节点--epfd
  fd: 要监听的文件描述符
  events:要监听的事件
    #define  EV_TIMEOUT    0x01   //超时事件
    #define  EV_READ       0x02    //读事件
    #define  EV_WRITE      0x04    //写事件
    #define  EV_SIGNAL     0x08    //信号事件
    #define  EV_PERSIST     0x10    //周期性触发
    #define  EV_ET         0x20    //边缘触发, 如果底层模型支持设置                     则有效, 若不支持则无效.
    若要想设置持续的读事件则:EV_READ | EV_PERSIST
    
  cb: 回调函数, 原型如下:
  typedef void (*event_callback_fn)(evutil_socket_t fd, short events, void *arg);
  注意: 回调函数的参数就对应于event_new函数的fd, event和arg

销毁事件对象event_free

void event_free(struct event *ev);
函数说明: 释放由event_new申请的event节点。

注册事件event_add(类似于epoll_ctl)

int event_add(struct event *ev, const struct timeval *timeout);
函数说明: 将非未决态事件转为未决态, 相当于调用epoll_ctl函数(EPOLL_CTL_ADD),
开始监听事件是否产生, 相当于epoll的上树操作.
参数说明:
  ev: 调用event_new创建的事件
  timeout: 限时等待事件的产生(定时事件使用), 也可以设置为NULL, 没有限时。

注销事件event_del(类似于epoll的del)

int event_del(struct event *ev);
函数说明: 将事件从未决态变为非未决态, 相当于epoll的下树(epoll_ctl调用EPOLL_CTL_DEL操作)操作。
参数说明: ev指的是由event_new创建的事件.

事件驱动event介绍

事件驱动是libevent的核心思想

比较重要,但是学过epoll和reactor的话,学起来还是比较简单。


主要几个状态:

无效的指针: 此时仅仅是定义了 struct event *ptr;

非未决:相当于创建了事件, 但是事件还没有处于被监听状态, 类似于我们使用epoll的时候定义了struct epoll_event ev并且对ev的两个字段进行了赋值, 但是此时尚未调用epoll_ctl对事件上树.

未决:就是对事件开始监听, 暂时未有事件产生。相当于调用epoll_ctl对要监听的事件上树, 但是没有事件产生.

激活:代表监听的事件已经产生, 这时需要处理, 相当于调用epoll_wait函数有返回, 当事件被激活以后, libevent会调用该事件对应的回调函数.

只用libevent事件检测,io操作自己来处理demo

像memcached它就是用这种层次(只使用libevent检测,io操作自己写)。我们从下面Demo中看到,使用libevent就像操作reactor一样,只需要传递回调函数,在回调函数里面去写io操作的逻辑。

#include <event.h>
#include <event2/listener.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
//为什么events是short是因为库里定义过了
// #define  EV_TIMEOUT    0x01   //超时事件
// #define  EV_READ       0x02    //读事件
// #define  EV_WRITE      0x04    //写事件
// #define  EV_SIGNAL     0x08    //信号事件
// #define  EV_PERSIST     0x10    //周期性触发
// #define  EV_ET         0x20    //边缘触发, 如果底层模型支持设置  

// int event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, event_callback_fn callback, void *arg);
// ev:要进行初始化的事件结构体指针。
// base:事件所关联的事件基础。
// fd:文件描述符或套接字,表示这个事件与哪个描述符相关联。
// events:指定事件类型,比如读取、写入等。可以使用 EV_READ、EV_WRITE 等常量。
// callback:是事件触发时调用的函数。
// arg:是传递给回调函数 callback 的可选参数。
void socket_read_cb(int fd,short events,void* arg);
void socket_accept_cb(int fd,short events,void* arg) {
  sleep(1);
  struct sockaddr_in addr;
  socklen_t len = sizeof(addr);
  int clientfd = accept(fd, (struct sockaddr *) &addr, &len);
  evutil_make_socket_nonblocking(clientfd);
  //将该fd设置成非阻塞的
 
  printf("client_fd:%d",clientfd);
 
  struct event_base* base = (struct event_base*)arg;//这个是reactor对象
  struct event *ev = event_new(NULL, -1, 0, NULL, NULL);
  //建立一个event
 
  //注册这个事件,写到base里
  event_assign(ev,base,clientfd,EV_READ | EV_PERSIST,socket_read_cb,(void*)ev);//只是写入了数据
  //这次的arg我们传的是事件结构体
 
  event_add(ev,NULL);//跟epoll_ctl一样,但是也不是很相同
 
}
 
void socket_read_cb(int fd,short events,void* arg) {
   char msg[1024];
   struct event* ev = (struct event*)arg;
   int len = read(fd,msg,sizeof(msg) - 1);
   if (len <= 0) {
        printf("client fd:%d disconnect\n", fd);
        event_free(ev);
        close(fd);
        return;
    }

   msg[len] = '\0';
   printf("recv the client msg: %s",msg);
   char reply_msg[1024] = "recvieced msg: ";
   strcat(reply_msg + strlen(reply_msg), msg);
   write(fd, reply_msg, strlen(reply_msg));
}

int main() {
   int listenfd = socket(AF_INET,SOCK_STREAM,0);
   if(listenfd  == -1) printf("socket error\n");
   struct sockaddr_in addr;
   memset(&addr, 0, sizeof(addr));
   addr.sin_family = AF_INET;
   addr.sin_port = htons(8080);
   addr.sin_addr.s_addr = htonl(INADDR_ANY);
 
   if(bind(listenfd ,(struct sockaddr*)&addr,sizeof(addr)) == -1)printf("bind error\n");
   listen(listenfd ,3);
   struct event_base* base  = event_base_new();
   printf("create base\n");
   struct event* ev_listen = event_new(base,listenfd,EV_READ | EV_PERSIST,socket_accept_cb,base);
    //返回是struct event*得到这个事件结构体的指针
    /*
    event_new 等价于
    struct event ev_listen;
    event_set(&ev_listen, listenfd, EV_READ | EV_PERSIST, socket_accept_cb, base);
    event_base_set(base, &ev_listen);
    */
   event_add(ev_listen,NULL);//相当于epoll_ctl

   event_base_dispatch(base);//循环等待事件
    return 0;
}

IO事件操作的封装与api介绍(主要是evconnlistener和bufferevent)


自带buffer的事件-bufferevent

bufferevent实际上也是一个event, 只不过比普通的event高级一些, 它的内部有两个缓冲区, 以及一个文件描述符(网络套接字)。一个网络套接字有读和写两个缓冲区, bufferevent同样也带有两个缓冲区, 还有就是libevent事件驱动的核心回调函数, 那么四个缓冲区以及触发回调的关系如下:


从图中可以得知, 一个bufferevent对应两个缓冲区, 三个回调函数, 分别是写回调, 读回调和事件回调

bufferevent有三个回调函数:

读回调– 当bufferevent将底层读缓冲区的数据读到自身的读缓冲区时触发读事件回调.

写回调– 当bufferevent将自身写缓冲的数据写到底层写缓冲区的时候触发写事件回调, 由于数据最终是写入了内核的写缓冲区中, 应用程序已经无法控制, 这个事件对于应用程序来说基本没什么用, 只是通知功能.

事件回调– 当bufferevent绑定的socket连接, 断开或者异常的时候触发事件回调.

构建bufferevent对象

struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);
函数说明: bufferevent_socket_new 对已经存在socket创建bufferevent事件, 可用于
后面讲到的连接监听器的回调函数中.
参数说明:
  base:对应根节点
  fd:文件描述符
  options : bufferevent的选项
    BEV_OPT_CLOSE_ON_FREE  -- 释放bufferevent自动关闭底层接口
    (当bufferevent被释放以后, 文件描述符也随之被close)    
    BEV_OPT_THREADSAFE  -- 使bufferevent能够在多线程下是安全的

销毁bufferevent对象

void bufferevent_free(struct bufferevent *bufev);
函数说明: 释放bufferevent

连接操作bufferevent_socket_connect

int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *serv, int socklen);
函数说明: 该函数封装了底层的socket与connect接口, 通过调用此函数, 可以将bufferevent事件与通信的socket进行绑定, 参数如下:
  bev – 需要提前初始化的bufferevent事件
  serv – 对端(一般指服务端)的ip地址, 端口, 协议的结构指针
  socklen – 描述serv的长度
说明: 调用此函数以后, 通信的socket与bufferevent缓冲区做了绑定, 后面调用了bufferevent_setcb函数以后, 会对bufferevent缓冲区的读写操作的事件设置回调函数, 当往缓冲区中写数据的时候会触发写回调函数, 当数据从socket的内核缓冲区读到bufferevent读缓冲区中的时候会触发读回调函数.

设置bufferevent回调与bufferevent_setcb

void bufferevent_setcb(struct bufferevent *bufev,
  bufferevent_data_cb readcb,
  bufferevent_data_cb writecb,
  bufferevent_event_cb eventcb,
  void *cbarg
);
函数说明: bufferevent_setcb用于设置bufferevent的回调函数,
readcb, writecb,eventcb分别对应了读回调, 写回调, 事件回调,
cbarg代表回调函数的参数。

回调函数的原型:

typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx);
What 代表 对应的事件
BEV_EVENT_EOF--遇到文件结束指示
BEV_EVENT_ERROR--发生错误
BEV_EVENT_TIMEOUT--发生超时
BEV_EVENT_CONNECTED--请求的过程中连接已经完成

写数据到写缓冲区bufferevent_write

int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
bufferevent_write是将data的数据写到bufferevent的写缓冲区,bufferevent_write_buffer
是将数据写到写缓冲区另外一个写法, 实际上bufferevent的内部的两个缓冲区结构就是struct evbuffer。

从读缓冲区读数据bufferevent_read

size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
bufferevent_read 是将bufferevent的读缓冲区数据读到data中, 同时将读到的数据从
bufferevent的读缓冲清除。
bufferevent_read_buffer 将bufferevent读缓冲数据读到buf中, 接口的另外一种。

注册与注销事件类型bufferevent_enable/disable

int bufferevent_enable(struct bufferevent *bufev, short event);
int bufferevent_disable(struct bufferevent *bufev, short event);
bufferevent_enable与bufferevent_disable是设置事件是否生效, 如果设置为disable,事件回调将不会被触发。

获取读写缓冲区bufferevent_get_input和bufferevent_get_oupput

struct evbuffer *bufferevent_get_input(struct bufferevent *bufev)
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev)
获取bufferevent的读缓冲区和写缓冲区

分割字符读evbuffer_readln与固定长度读evbuffer_remove

char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, enum evbuffer_eol_style eol_style);
int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
分割字符读evbuffer_readln
固定长度读evbuffer_remove

bufferevent总结

对于bufferevent来说, 一个文件描述符, 2个缓冲区, 3个回调函数。文件描述符是用于和客户端进行通信的通信文件描述符, 并不是监听的文件描述符。

2个缓冲区是指: 一个bufferevent包括读缓冲区和写缓冲区。

3个回调函数指: 读回调函数 写回调函数 和事件回调函数。

读回调函数的触发时机:
当socket的内核socket读缓冲区中有数据的时候, bufferevent会将内核缓冲区中的数据读到自身的读缓冲区, 会触发bufferevent的读操作, 此时会调用bufferevent的读回调函数.

写回调函数的触发时机:
当往bufferevent的写缓冲区写入数据的时候, bufferevent底层会把缓冲区中的数据写入到内核的socket的写缓冲区中, 此时会触发bufferevent的写回调函数, 最后由内核的驱动程序将数据发送出去.

事件(异常)回调函数的触发时机:
客户端关闭连接或者是被信号终止进程会触发事件回调函数

连接监听器-evconnlistener

链接监听器封装了底层的socket通信相关函数, 比如socket, bind, listen, accept这几个函数。链接监听器创建后实际上相当于调用了socket, bind, listen, 此时等待新的客户端连接到来, 如果有新的客户端连接, 那么内部先进行调用accept处理, 然后调用用户指定的回调函数。可以先看看函数原型, 了解一下它是怎么运作的:

构建连接监听器evconnlistener_new_bind

struct evconnlistener *evconnlistener_new_bind(
  struct event_base *base,evconnlistener_cb cb,
  void *ptr, unsigned flags, int backlog,
  const struct sockaddr *sa, int socklen
);

函数说明:
是在当前没有套接字的情况下对链接监听器进行初始化, 看最后2个参数实际上就是bind使用的关键参数,
backlog是listen函数的关键参数(略有不同的是, 如果backlog是-1, 那么监听器会自动选择一个合适的值,
如果填0, 那么监听器会认为listen函数已经被调用过了), ptr是回调函数的参数, cb是有新连接之后的回调函数,
但是注意这个回调函数触发的时候, 链接器已经处理好新连接了, 并将与新连接通信的描述符交给回调函数。

flags 需要参考几个值:
  LEV_OPT_LEAVE_SOCKETS_BLOCKING  文件描述符为阻塞的
  LEV_OPT_CLOSE_ON_FREE  关闭时自动释放
  LEV_OPT_REUSEABLE  端口复用
  LEV_OPT_THREADSAFE  分配锁, 线程安全

struct evconnlistener *evconnlistener_new(
  struct event_base *base,
  evconnlistener_cb cb, void *ptr,
  unsigned flags, int backlog,
  evutil_socket_t fd
);

evconnlistener_new函数与前一个函数不同的地方在与后2个参数, 使用本函数时, 认为socket已经初始化好, 并且bind完成, 甚至也可以做完listen, 所以大多数时候,我们都可以使用第一个函数。

accept的回调函数evconnlistener_cb

typedef void (*evconnlistener_cb)(struct evconnlistener *evl, evutil_socket_t fd, struct sockaddr *cliaddr, int socklen, void *ptr);

回调函数fd参数是与客户端通信的描述符, 并非是等待连接的监听的那个描述符, 所以cliaddr对应的也是新连接的对端地址信息, 已经是accept处理好的。

销毁连接监听器evconnlistener_free

void evconnlistener_free(struct evconnlistener *lev);

使用libevent的事件检测与事件操作demo

#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <signal.h>
#include <event.h>
#include <event2/buffer.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <event2/bufferevent.h>
#include <event2/listener.h>
void socket_read_callback(struct bufferevent* bev,void* arg) {
    //操作读缓冲当中过的数据,通过该函数得到读缓冲的地址
    struct evbuffer* evbuf = bufferevent_get_input(bev);
 
    char* msg = evbuffer_readln(evbuf,NULL,EVBUFFER_EOL_LF);
    //我们的bufferevent里肯定有读缓冲
 
    // 也可以直接用 bufferevent_read 读数据
    // bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    if(!msg) return;
 
    printf("server read the data: %s\n",msg);
 
    char reply[1024] = {0};
    sprintf(reply, "recvieced msg: %s\n", msg);//echo
    //需要自己释放资源,很重要
    free(msg);
    bufferevent_write(bev,reply,strlen(reply));
}
void socket_event_callback(struct bufferevent *bev, short events, void *arg) {
     if(events & BEV_EVENT_EOF)
        printf("connection closed\n");
     else if (events & BEV_EVENT_ERROR)
        printf("some other error\n");
     else if (events & BEV_EVENT_TIMEOUT)
        printf("timeout\n");
 
    bufferevent_free(bev);
 
}
void listener_callback(struct evconnlistener *listener,evutil_socket_t fd,
                      struct sockaddr *sock,int socklen, void *arg) {
      char ip[32] = {0};
      evutil_inet_ntop(AF_INET,sock,ip,sizeof(ip) - 1);
      //该函数的作用是将网络字节序表示的 IPv4 地址转换为可读的字符串格式,并将结果存储在提供的缓冲区中。
      //这样可以方便地将 IP 地址以人可读的形式输出,比如在日志中记录连接的来源。
      printf("accept a client fd:%d ip:%s\n",fd,ip);
      //也就是说,监听到之后,触发回调,然后会自动把新连接的fd传进来吗
      //也就是相当于我们前面设置了一个监听套接字的bufferevent,当内核中的socket有链接到来的时候
      //也就是内核中的读缓冲有数据的时候,那么就会触发bufferevent回调,写到bufferevent的读缓冲区
      //然后把数据传到我们用户层,就不需要我们自己去事件处理了
 
      struct event_base* base = (struct event_base*)arg;//把reactor对象传进来了
 
      //创建一个bufferevent,构建bufferevent对象
      struct bufferevent* bev = bufferevent_socket_new(base,fd,BEV_OPT_CLOSE_ON_FREE);
      //函数说明: bufferevent_socket_new 对已经存在socket创建bufferevent事件, 可用于
      //后面讲到的连接监听器的回调函数中.我这里的fd是accept是用于和客户端交互的fd
      //所以这个fd是我们设置到bufferevent中,让bufferevent对象帮我们处理
      //选了这个选项会当bufferevent释放后,里面的fd什么的也会自动关闭和释放
      
      // 设置读、写、以及异常时的回调函数
      bufferevent_setcb(bev,socket_read_callback,NULL,socket_event_callback,NULL);
 
      bufferevent_enable(bev,EV_READ | EV_PERSIST);//注册事件
}
void stdin_callback(struct bufferevent* bev,void* arg) {
     struct evbuffer* evbuf = bufferevent_get_input(bev);
     struct event_base* base =  (struct event_base*)arg;//这个就是上下文
     char* msg = evbuffer_readln(evbuf,NULL,EVBUFFER_EOL_LF);
     if(!msg) return;
 
    if (strcmp(msg, "quit") == 0) {
        printf("safe exit!!!\n");
        event_base_loopbreak(arg);//中断事件循环
    }
 
    printf("stdio read the data: %s\n", msg);
}

void do_timer(int fd,short events,void* arg) {
    struct event *timer = (struct event *) arg;
    time_t now = time(NULL);
    printf("do_timer %s", (char *) ctime(&now));
}

void do_sig_int(int fd,short events,void* arg) {
    struct event *si = (struct event *)arg;
    event_del(si);
    printf("do_sig_int SIGINT\n");//CTRL + C
}

int main() {
    struct sockaddr_in sin;
    memset(&sin,0,sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(8088);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    struct event_base* base = event_base_new();
    
    //链接监听器封装了底层的socket通信相关函数, 比如socket, bind, listen, accept这几个函数。
    struct evconnlistener* listener = evconnlistener_new_bind(base,listener_callback,
    base,LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,10,(struct sockaddr*)&sin,sizeof(sin));
    //: 创建一个监听器对象,该监听器将监听指定的地址和端口,
    //并在有新连接时调用 listener_callback 函数进行处理。参数说明如下:
    // base: 事件基础结构,将监听器与此关联。
    // listener_callback: 当有新连接建立时将调用的回调函数。
    // base: 将传递给回调函数的参数,这里是事件基础结构。
    // LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE: 一组标志,指定监听器的行为,包括启用地址重用和在释放时关闭底层文件描述符。
    // 10: 允许在套接字上排队等待连接的最大数量。
    // (struct sockaddr*)&sin: 要监听的地址和端口信息。
    // sizeof(sin): 提供地址结构体的大小。
    
    //对stdin的io事件进行处理
    //stdin的文件描述符是0
    struct bufferevent* ioev = bufferevent_socket_new(base,0,BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(ioev,stdin_callback,NULL,NULL,base);//设置读写事件回调
    bufferevent_enable(ioev,EV_READ | EV_PERSIST);//设置事件属性并开启
 
    //定时事件
    struct event evtimer;
    struct timeval tv = {1,0};
    event_set(&evtimer,-1,EV_PERSIST,do_timer,&evtimer);//初始化
    // &evtimer: 是一个指向 struct event 结构体的指针,表示你要设置的事件对象。
    // -1: 是事件的文件描述符,定时器事件不需要一个真实的文件描述符,因此通常设置为 -1。
    // EV_PERSIST: 是事件的标志,表示这是一个持久性事件,即它会在每次触发后自动重新添加到事件循环中,使得它可以周期性地触发。
    // do_timer: 是事件触发时执行的回调函数。
    // &evtimer: 是传递给回调函数的用户数据。
    event_base_set(base,&evtimer);
    //设置event从属的event_base,这一步相当于指明event要注册到哪个event_base实例上。
    event_add(&evtimer,&tv);
 
    //信号事件
    struct event ev_sig_int;
    event_set(&ev_sig_int,-1,EV_PERSIST,do_sig_int,&ev_sig_int);
    event_base_set(base,&ev_sig_int);
    event_add(&ev_sig_int,NULL);
 
    //开启事件主循环
    event_base_dispatch(base);
    /* 结束释放资源 */
    evconnlistener_free(listener);
    event_base_free(base);
    return 0;
}


libevent事件原理剖析

信号事件剖析(这个先不看了)

定时事件和网络事件剖析

Timer小根堆

libevent定时器的机制是最小堆+epoll_wait的机制,event_base_dispatch内部调用的是event_base_loop,我们进入主循环看看,发现它先是去最小堆找timeout参数,然后执行epoll_wait。之后再将所有的超时任务取出timeout_process放到就绪队列,我们发现现在网络事件和定时事件都被加入到就绪队列中了,然后按照优先级进行处理,调用对应的回调函数。


while (!done) {
  ......
  tv_p = &tv;
  if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
    timeout_next(base, &tv_p);  // 返回的 tv_p 即是 最小堆实现的定时器中第一个事件的剩余等待时间
  }
  ......
  clear_time_cache(base);

  res = evsel->dispatch(base, tv_p);  // 以tv_p作为 epoll_wait 的超时时间。这里相当于epoll_wait(),收集网络事件
  ......
  update_time_cache(base);  // 更新 time_cache,time_cache的作用在于不必每次都从系统调用获取时间值
  ......
  timeout_process(base);  // 将所有已超时的任务从最小堆中取出,插入到就绪队列(有优先级)收集定时事件

  if (N_ACTIVE_CALLBACKS(base)) {
    int n = event_process_active(base);  // 处理这些就绪的任务,调用其回调函数
    ......
  }
}

/* Activate every event whose timeout has elapsed. */
static void timeout_process(struct event_base *base) {
  /* Caller must hold lock. */
  struct timeval now;
  struct event *ev;
  if (min_heap_empty_(&base->timeheap)) {
    return;
  }

  gettime(base, &now);
  while ((ev = min_heap_top_(&base->timeheap))) {
    if (evutil_timercmp(&ev->ev_timeout, &now, >))  // 从堆中取出所有 ev_timeout 已达到 now 的事件
      break;
    /* delete this event from the I/O queues */
    event_del_nolock_(ev, EVENT_DEL_NOBLOCK);  // 从所在的 event_base 中删除该事件
    event_active_nolock_(ev, EV_TIMEOUT, 1);  //  激活该事件,即 插入到就绪队列
  }
}

event_active_nolock_()底层将调用event_queue_insert_active()将事件插入到event_base下的就绪队列activequeues中,这个就绪队列实际上是有nactivequeues个元素的队列数组,数组下标越小的队列优先级越高,每次我们新建一个event时默认的优先级ev_pri都是nactivequeues / 2(by default, we put new events into the middle priority),而注册事件到event_base前可以通过该函数来手动设置优先级:

/* Set's the priority of an event - if an event is already scheduled
* changing the priority is going to fail. */
int event_priority_set(struct event *ev, int pri)

读写缓冲区evbuffer的实现(重点理解)

在读写网络IO的时候是不能确保一次读取,就是一个完整的数据包。比如写入size,但是实际写入n<size,数据没有全部写出去,那剩下的数据怎么办呢?需要缓存起来等待下次写数据触发,读数据同理。所以因为这个原因,就需要设置缓冲区来解决这个问题。常用的解决方案有三种

fix buffer:char rbuf[16 * 1024 * 1024];char wbuf[16 * 1024 * 1024] ,但是这样会造成两个新的问题,1. 存在空间浪费 2. 数据移动频繁

ringbuffer:环形缓冲区,解决了数据移动频繁的问题,但是数据空间浪费的问题没有解决

libevent中的evbuffer。下面开始介绍evbuffer。

evbuffer 是 libevent 底层实现的一种链式缓冲区,当我们使用bufferevent来管理事件时,就会从每个事件的 evbuffer 中读写数据。每个 evbuffer 实质是一个缓冲区链表,其中的每个元素为 struct evbuffer_chain。一个struct evbuffer中的关键成员定义如下:
struct evbuffer {
  /** The first chain in this buffer's linked list of chains. */
  struct evbuffer_chain *first;
  /** The last chain in this buffer's linked list of chains. */
  struct evbuffer_chain *last;
  /** Pointer to the next pointer pointing at the 'last_with_data' chain. */
  struct evbuffer_chain **last_with_datap;  // 指针指向最后一个可写的 chain
  /** Total amount of bytes stored in all chains.*/
  size_t total_len;
  ...... // 以上为关键成员
}

每个evbuffer_chain的定义又如下所示:
/** A single item in an evbuffer. */
struct evbuffer_chain {
  /** points to next buffer in the chain */
  struct evbuffer_chain *next;  // 指向下一个 evbuffer_chain
 
  /** total allocation available in the buffer field. */
  size_t buffer_len;    // buffer 的长度
 
  /** unused space at the beginning of buffer or an offset into a file for sendfile buffers. */
  ev_misalign_t misalign;    // 实际数据在 buffer 中的偏移
 
  /** Offset into buffer + misalign at which to start writing.
   * In other words, the total number of bytes actually stored in buffer. */
  size_t off;    // buffer 中有效数据的末尾,接下来的数据从这个位置开始填入(该位置即 buffer + misalign + off)
 
  /** number of references to this chain */
  int refcnt;    // 这个 buffer的引用计数
 
  /** Usually points to the read-write memory belonging to this buffer allocated as part of the evbuffer_chain allocation.
   * For mmap, this can be a read-only buffer and EVBUFFER_IMMUTABLE will be set in flags.  For sendfile, it may point to NULL. */
  unsigned char *buffer;    // 指向实际数据存储的位置,这是真正的 buffer
};



misaligin是什么意思呢?是已经被读取的数据,下一段有效数据是从【buffer+misaligin,buffer+misaligin +off】这一段off的长,是我们待取的有效数据。而【buffer,buffer+misaligin 】这一段是之前就已经被读取过了,所以这里是失效的数据。所以misaligin 就解决了数据移动频繁的问题。而evbuffer_chain是链表形式,所以又解决了数据空间浪费的问题。所以说evbuffer的设计是非常巧妙的。

bufferevent_write

当调用bufferevent_write往写缓冲区写数据时,实际上是调用了evbuffer_add,在写入后libevent自动帮我们写到内核缓冲区,之后会触发写回调函数。

若这个evbuffer中没有一个 chain 可以写入数据,则需要根据写入的数据大小新申请一个 chain 挂到链表末尾,然后往这个chain中写数据,所以每个 chain 的 buffer 大小是不定的。还有更多细节内容我写到注释里面了,读者自行阅读。

int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
    //...
    //如果大于限定的容量
    if (datlen > EV_SIZE_MAX - buf->total_len) {
        goto done;
    }
 
    //使chain指向之后一个链表
    if (*buf->last_with_datap == NULL) {
        chain = buf->last;
    } else {
        chain = *buf->last_with_datap;
    }
 
    //...
    //如果没有chain,那么创建一个datlen大小的返回即可
    if (chain == NULL) {
        chain = evbuffer_chain_insert_new(buf, datlen);
        if (!chain)
            goto done;
    }
 
    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        //...
        //remain为当前可用剩余空间还有多少
        remain = chain->buffer_len - (size_t) chain->misalign - chain->off;
        //如果剩余空间大于需求,那么直接分配即可
        if (remain >= datlen) {
            /* there's enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                   data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
            //否则看一看剩余空间+misalign是否大于需求,大于则移动off数据
        else if (!CHAIN_PINNED(chain) &&
                 //里面涉及别的一些细节,这里不展开
                 evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);
 
            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }
    //走到这里代表一个chain不能满足datlen,那么预分配一个tmp chain
    /* we need to add another chain */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE / 2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    tmp = evbuffer_chain_new_membuf(to_alloc);
    if (tmp == NULL)
        goto done;
    //把当前chain给分配完
    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
               data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }
    //还需要多少大小从新的tmp里面分配
    data += remain;
    datlen -= remain;
 
    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;
 
    out:
    evbuffer_invoke_callbacks_(buf);
    result = 0;
    done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

bufferevent_read

bufferevent_read()底层调用evbuffer_remove,这代表它按照指定长度去读,其又调用了evbuffer_copyout_from,具体细节就不展开了,我们知道了怎么写,那么怎么读我们也就知道了。

/* Reads data from an event buffer and drains the bytes read */
int evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen) {
  ev_ssize_t n;
  EVBUFFER_LOCK(buf);
  n = evbuffer_copyout_from(buf, NULL, data_out, datlen);  // 拷贝数据
  if (n > 0) {
    if (evbuffer_drain(buf, n)<0)  // drain 就是丢弃已读走的数据,即 调整当前 chain 的 misalign 或 直接释放数据已全部读走的 chain
      n = -1;
  }
  EVBUFFER_UNLOCK(buf);
  return (int)n;
}

evbuffer的缺点

上面我们说了evbuffer的优点,那么evbuffer的缺点呢?其实也很明显,即我们的数据是存储在不连续的内存上面(例如我们读20B,结果着20B分别在两个chain里面),内存不连续会带来多次io,我们可能需要多次io才能把数据读完整。对于内存不连续的问题,Linux内核提供了一个接口,readv和writev,解决内存不连续的读写问题。

readv:将读缓冲区的数据读到不连续的内存中

writev:将不连续的内存数据写到写缓冲区

man 2 readv
# 第二个参数是数组,第三个参数是数组的长度
ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
 
struct iovec {
    void  *iov_base;    /* Starting address 起始地址*/
    size_t iov_len;     /* Number of bytes to transfer 长度*/
};



因为我们的内核缓冲区是连续的,而我们的libevent的bufferevent的缓冲区采用的是链式缓冲区

当面我们数据大的时候,可以放好几个chain的时候,如果我们还是采用write和read的话那么势必会导致内核切换过多,因为要分配好几次到chain上面

解决方法:
采用readv和writev,将bufferevent的所有的chain地址传进去,然后在每个chain上进行分配,那么此时就只需要一次用户态和内核态的切换,大大提高了效率。

解决了网络编程中那些痛点?
高效的网络缓冲区
io函数使用与网络原理
多线程环境下,buffer加锁时,读要读出一个完整的包,写也是一样。


最新版本:2.0


官方主页:http://libevent.org/

该文章最后由 阿炯 于 2024-08-07 22:03:11 更新,目前是第 2 版。