您的位置 首页 golang

并发服务器 IO多路复用之poll 与 epoll(重点)

I/O多路复用之poll

poll函数接口

 #include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);  

参数解释:

fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.

pollfd结构体:

 struct pollfd {
int fd; //要监控的文件描述符
short events; //设置我们监控的描述符发生的事件
常见事件类型:
POLLIN 可读事件
POLLOUT 可写事件
POLLIN | POLLOUT 用按位或的方式可以表示可读可写事件
short revents; //当关心的事件发生时,返回实际发生的事件
};  

nfds:表示fds数组的长度.

 程序员需要在代码当中先定义一个事件结构数组;
struct pollfd fd_arr[10];
fd_arr[0].fd= 3; //设置文件描述符
fd_arr[0].events = POLLIN; //设置可读事件  

timeout:表示poll函数的超时时间, 单位是毫秒(ms).

 大于0 :带有超时时间的监控
等于0 :非阻塞
小于0 :阻塞  

events和revents的取值:

返回值

  • 返回值小于0, 表示出错;
  • 返回值等于0, 表示poll函数等待超时;
  • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回.

poll示例: 使用poll监控标准输入

 #include <poll.h>
#include <unistd.h>
#include <stdio.h>
int main() {
struct pollfd poll_fd;
poll_fd.fd = 0;
poll_fd.events = POLLIN;//组织事件结构 ,监控标准输入的可读事件
while(1) {
int ret = poll(&poll_fd, 1, 1000);//带有超时时间的监控
if (ret < 0) {
perror("poll");
continue;
}
if (ret == 0) {
printf("poll timeout\n");
continue;
}
if (poll_fd.revents == POLLIN) { //返回就绪事件为可读,即进行IO操作读取标准输入
char buf[1024] = {0};
read(0, buf, sizeof(buf) - 1);
printf("stdin:%s", buf);
}
}
return 0;
}  

poll的优点

  • 不同于select使用三个事件结构(位图)来表示三个fdset的方式,poll使用一个pollfd事件结构的指针实现,简化了代码的编写
  • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.
  • poll并没有最大数量限制 (但是数量过大后性能也是会下降).

poll的缺点

  • poll跨平台移植性不如select, poll只能在linux环境下使用,
  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
  • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

I/O多路复用之epoll

epoll初识

按照man手册的说法: 是为处理大批量句柄而作了改进的poll.

它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)

它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法.

epoll接口使用方便: 虽然拆分成了三个系统调用函数. 但是反而使用起来更方便高效.

epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄;
  • 调用epoll_ctl, 将要监控的文件描述符进行注册;
  • 调用epoll_wait, 等待文件描述符就绪;

epoll_create 创建epoll操作句柄

 int epoll_create(int size);
size :本来的含义是定义epoll最大能够监控的文件描述符个数
但在linux内核版本2.6.8之后.该参数size就已经被弃用了.内存现在采用的是扩容的方式
size是不可以传入负数的! !
用完之后, 必须调用close()关闭.
返回值:返回epoll操作句柄,说白了,就是操作struct eventpoll结构体的的钥匙  

从内核角度分析:此函数在内核当中创建一个结构体, struct eventpoll结构体,此结构体里有两个数据结构:红黑树,双向链表。

而红黑树,众所周知,查找效率很高,而epoll便是将要监控的描述符组织成红黑树的数据结构,这样查找有IO事件触发的的描述符时效率就比select和poll的轮询遍历快了不少(都不是一个数量级了);

而双向链表,用来保存红黑树中返回的有IO事件触发的文件描述符(就绪的文件描述符);这样从监控到最后的IO读写,时间效率大大提升,且不受要监控的描述符增多的影响。

epoll_ctl epoll的事件注册函数

 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  

它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.

第一个参数是 epfd: epoll_create()的返回值(即epoll操作句柄).

第二个参数op(option) : 表示动作:让epoll ctl函数做什么事情,用三个宏来表示.

  • EPOLL_ CTL ADD :添加一个文件描述符对应的事件结构到红黑树当中
  • EPOLL_ CTL MOD:修改一个已经在红黑树当中的事件结构
  • EPOLL_ CTL DEL :从epoll的红黑树当中删除一个文件描述符对应的事件结构

第三个参数是fd: 告诉epoll用户关心的文件描述符

第四个参数event: 是告诉内核需要监听什么事.

类型是struct epoll_event结构体,即epoll的事件结构

struct epoll_event结构如下:

 struct epoll_event
{
uint32_t events; //用户对描述符关心的事件
epoll_data_t data; // epoll_data类型的 用户数据变量
} __EPOLL_PACKED;  

events事件可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.

相关视频推荐

学习地址:

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括 C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),免费分享

epoll_data 联合结构体:

 typedef union epoll_data
{
void *ptr; //可以传递一些信息,当epoll监控该描述符就绪的时候,返回之后,程序也就可以拿到这些信息
int fd; //用户关心的文件描述符,可以当做文件描述符事件就绪之后,返回给程序员看的
uint32_t u32;
uint64_t u64;
} epoll_data_t;  

对于 ptr 和 fd共用一块内存,两者在使用的时候,只能任选其一:

ptr :传入一个结构体”struct my_ epoll_ data{ int fd}” ,必须在结构体当中包含一个文件描述符

fd : fd的取值为文件描述符数值

一般都是使用fd成员。

epoll_wait 监控

 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);  

epfd : epolI操作句柄

events : 分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中

—》出参,返回就绪的事件结构(每一个事件结构都对应一个文件描述符)

maxevents :maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size.即最大能够拷贝多少个事件结构,

timeout:

大于0 :带有超时时间,单位为毫秒

等于0:非阻塞.

小于0 :阻塞

返回值:

大于0 :返回就绪的文件描述符个数

等于0 :等待超时

小于0 :监控出错

epoll工作原理

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关.

 struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/struct list_head rdlist;
....
};  

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.

这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).

而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.

这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.

在epoll中,对于每一个事件,都会建立一个epitem结构体.

 struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}  

当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.

如果rdlist不为空,则把发生的事件结构通过页表映射到用户态虚拟地址空间,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1).

epoll的优点(和 select 的缺点对应)

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中(挂载到红黑树上), 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
  • 没有数量限制: 文件描述符数目无上限.

epoll工作方式

epoll有2种工作方式: 水平触发(LT) 和 边缘触发(ET)。

假如有这样一个例子:

 我们已经把一个tcp socket添加到epoll描述符(即已就绪)
这个时候socket的另一端被写入了2KB的数据
调用epoll_wait,并且它检测到IO事件触发后会立即返回,说明它已经准备好读取操作
然后调用read读取数据, 只读取了1KB的数据(由于缓冲区里时字节流,读取的字节大小可以自己定义)
(由于没有读取完)继续调用epoll_wait......  

水平触发Level Triggered 工作模式

epoll默认状态下就是LT工作模式.

  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.

如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪.

  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
  • 支持阻塞读写和非阻塞读写

即:

对于可读事件:只要接收缓冲区当中的数据大于低水位标记(1字节) ,就会一直触发可读事件就绪,直到接收缓冲区当中没有数据可读

对于可写事件: 只要发送缓冲区当中的数据空间大小大于低水位标记( 1字节), 就会一直触发可写事件就绪,直到发送缓冲区当中没有空间可写

边缘触发Edge Triggered工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

当epoll检测到socket上事件就绪时, 必须立刻处理.如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了.

也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会,直到有新的事件就绪,才会再返回,但是由于是字节流的问题,可能一份数据要分好几次读,所以就要使用while循环来利用这一次仅有的机会,把一份数据顺利读完。

ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.

只支持非阻塞的读写(需要使用fnctl函数设置文件描述符属性为非阻塞)

即:

对于可读事件:只有当新就绪事件到来的时候,才会一次触发可读处理。如果应用程序没有将接收缓冲区当中的数据读走或者读完,也不会在再通知;直到又来一个新就绪事件,才会触发可读事件;

对于可写事件:只有发送缓冲区剩余空间从不可写变成可写才会触发一次可写事件就绪(同上)。

select和poll其实也是工作在LT模式下. 而epoll既可以支持LT, 也可以支持ET.

epoll示例: 使用epoll监控标准输入(水平触发LT模式)

 #include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
int main()
{
int epollfd = epoll_create(10);//创建epoll操作句柄
if(epollfd < 0)
{
perror("epoll_create");
return 0;
}
struct epoll_event ev;//组织事件结构
ev.events = EPOLLIN;
ev.data.fd = 0;
epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev);//将其添加进要内核监视的结构中
while(1)
{
struct epoll_event fd_arr[10];//保存就绪返回的文件描述符结构
int ret = epoll_wait(epollfd, fd_arr, sizeof(fd_arr)/sizeof(fd_arr[0]), 3000); //监控
if(ret < 0)
{
perror("epoll_wait");
return 0;
}
else if(ret == 0) //
{
printf("timeout out\n");
continue;
}
//监控返回,即有 IO事件触发,读数据
//epoll默认为水平触发,只要接收缓冲区不为空,监视函数就会一直返回,通知用户读取数据
for(int i = 0; i < ret; i++)
{
if(fd_arr[i].data.fd == 0)
{
//char buf[1024] = {0};
char buf[3]={0};//将接收缓冲区容量设置为3(模拟一次读不完缓冲区全部数据的场景),测试水平触发应接收缓冲区不为空而不断返回通知读取的工作流程
read(fd_arr[i].data.fd, buf, sizeof(buf) - 1);
printf("buf is %s\n", buf);
}
}
}
return 0;
}  

buf容量足够大时:char buf[1024] = {0};

buf容量小的不足以一次读完缓冲区里的数据时:char buf[3] = {0};

epoll示例2: 使用epoll监控标准输入(边缘触发ET模式)

对于ET模式必须利用while循环把一份数据顺利读完,那么我们怎么判断是否将一个完整的数据读完呢?

则可以根据read返回读取成功的有效字节数来判断

即:

如果判断read函数的返回值比我们准备的buf的最大接收能力(设置的缓冲区长度)还小,那就说明读完了,退出循环。

但是也要考虑一种情况:如果该数据长度是buf长度的整数倍呢,比如,四个字节的数据abcd,每次读两个字节,读两次,每次read返回值不小于buf长度,其实两次已经读完了,但是根据那一个条件无法判断是否读完接着继续循环,而再去第三次时,由于文件描述符默认为阻塞属性,而接收缓冲区为空,read就会陷入饥饿状态(即阻塞在read中,等待),所以对于ET模式循环读,要避免read的读饥饿,所以要提前设置文件描述符为非阻塞属性。因为对于非阻塞 IO 读数据, 如果接受缓冲区为空, 就会返回错误,错误码为 EAGAIN 或者 EWOULDBLOCK, 本意是需要重试,但是我们可以根据这个错误码来解决整数倍的问题,即判断若为错误码,则说明正好读完了缓冲区里的数据,跳出循环。

 #include<stdio.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<fcntl.h>
#include<string>
#include<errno.h>
using namespace std;
void SetfdNoBlock(int fd)//设置非阻塞属性
{
int fl=fcntl(fd,F_GETFL);
if(fl < 0)
{
perror("fcntl");
return ;
}
fcntl(fd,F_SETFL, fl|O_NONBLOCK);
}
int main()
{
//1.将标准输入文件描述符设置为非阻塞属性(用于边缘触发ET模式只通知一次,所以必须使用循环读,来判断是否读取完 整条数据)
SetfdNoBlock(0);
//2.创建epoll结构 返回操作句柄
int epollfd=epoll_create(10);
if(epollfd < 0)
{
perror("epoll_create");
return 0;
}
//3.组织事件结构,再将其加入监视
struct epoll_event ev;
ev.data.fd=0;
ev.events=EPOLLIN |EPOLLET ; //ET模式
epoll_ctl(epollfd,EPOLL_CTL_ADD,0,&ev);
//4.监视
while(1)
{
epoll_event event_arr[10];
int ret=epoll_wait(epollfd,event_arr,10,-1);
if(ret < 0)
{
perror("epoll_wait");
continue;
}
//有IO事件触发,监视函数检测到后返回触发个数
for(int i = 0;i < ret; i++)
{
if(event_arr[i].events == EPOLLIN )
{
string read_ret;
while(1)//由于ET模式只会通知一次,所以必须加循环将缓冲区的所有数据读完
{
char buf[3]={0};
ssize_t readsize = read(0,buf,sizeof(buf)-1);
if(readsize < 0)
{
// 对于非阻塞 IO 读数据, 如果 接受缓冲区为空, 就会返回错误
// 错误码为 EAGAIN 或者 EWOULDBLOCK , 需要重试
if(errno == EAGAIN || errno == EWOULDBLOCK )
{
//说明数据正好读完,跳出循环
goto overend;
}
perror("read");
return 0;
}
read_ret+=buf;
// 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环
// 这种写法其实不算特别严谨(没有考虑粘包问题)
if(readsize <(ssize_t)sizeof(buf)-1)
{
overend:
printf("stdin: %s\n",read_ret.c_str());
break;
}
}
}
}
}
return 0;
}  

基于epoll的并发TCP服务器(水平触发LT)

封装epoll操作

 #pragma once
#include "tcpclass.hpp"
#include<vector>
#include<stdio.h>
#include<unistd.h>
#include<sys/epoll.h>
class EpollSever
{
public:
EpollSever()
{
epoll_fd=-1;
}
~EpollSever()
{}
bool init_create(int size)
{
epoll_fd = epoll_create(size);//创建epoll
if(epoll_fd < 0)
{
return false;
}
return true;
}
bool Add_events(int fd)//往epoll结构里添加要监视的事件
{
struct epoll_event ev;
ev.data.fd=fd;
ev.events=EPOLLIN;
int ret= epoll_ctl(epoll_fd,EPOLL_CTL_ADD,fd,&ev);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
bool Del_events(int fd)//删除事件
{
int ret=epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
bool Epoll_Listen(vector<Tcpsc>* v)//监视
{
struct epoll_event event_arr[10];
size_t ret=epoll_wait(epoll_fd,event_arr,sizeof(event_arr)/sizeof(event_arr[0]),-1);
if(ret < 0)
{
perror("epoll_wait");
return false;
}
else if(ret == 0)
{
printf("timeout!");
return false;
}
if(ret > sizeof(event_arr)/sizeof(event_arr[0])) //防止数组越界
{
ret = sizeof(event_arr)/sizeof(event_arr[0]);
}
for(size_t i= 0;i < ret; i++)//将就绪的IO事件封装到tcp类中,由那边具体使用
{
Tcpsc tc;
tc.Setfd(event_arr[i].data.fd);
v->push_back(tc);
}
return true;
}
private:
int epoll_fd;//epoll操作句柄
};  

socket操作类

 #pragma once
#include<cstdio>
#include<cstdlib>
#include<unistd.h>
#include<string>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<iostream>
#include<netinet/in.h>
#include<sys/types.h>
using namespace std;
class Tcpsc
{
public:
Tcpsc()
{
sock_=-1;
}
~Tcpsc()
{
}
//创建套接字
bool CreateSocket()
{
sock_=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sock_ < 0)
{
perror("socket");
return false;
}
int opt=1;
setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));//地址复用
return true;
}
//绑定地址信息
bool Bind(std::string& ip,uint16_t port)
{
struct sockaddr_in addr;//组织成ipv4地址结构
addr.sin_family =AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=bind(sock_,(struct sockaddr*)&addr,sizeof(addr));
if(ret < 0)
{
perror("bind");
return false;
}
return true;
}
//监听
bool Listen(int backlog=5)
{
int ret=listen(sock_,backlog);
if(ret < 0)
{
perror("listen");
return false;
}
return true;
}
//accept 服务器获取连接
//bool Accept(struct sockaddr_in* peeraddr,int* newfd)
//peeraddr :出参。保存的是客户端的地址信息,newfd:出参,表示完成连接的可以进行通信的新创建出来的套接字描述符

bool Accept(struct sockaddr_in* peeraddr,Tcpsc* newsc)//这里用一个类的实例化指针,把数据传出去
{
socklen_t addrlen=sizeof(struct sockaddr_in);//记录地址信息长度
int newserverfd=accept(sock_,(struct sockaddr*)peeraddr,&addrlen);
if(newserverfd < 0)
{
perror("accept");
return false;
}
newsc->sock_=newserverfd;//传出去新创建出来的用来通信的套接字
return true;
}
//connect 客户端调用来连接服务端
bool Connect(string& ip,uint16_t port)
{
struct sockaddr_in addr;//还是先组织服务端地址信息
addr.sin_family =AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=connect(sock_,(struct sockaddr*)&addr,sizeof(addr));
if(ret < 0)
{
perror("connect");
return false;
}
return true;
}
//因为是已经建立连接了的,所以参数就只是数据,和已完成连接的可以进行通信的socket套接字
//发送数据
bool Send(string& data)
{
int sendsize=send(sock_,data.c_str(),data.size(),0);
if(sendsize < 0)
{
perror("sned");
return false;
}
return true;
}
//接收数据
bool Recv(string* data)//出参,保留信息
{
char buf[1024]={0};
int recvsize=recv(sock_,buf,sizeof(buf)-1,0);
if(recvsize < 0)
{
perror("recv");
return false;
}
else if(recvsize==0)//对端已关闭close
{
printf("peer is close connect");
return false;
}
(*data).assign(buf,recvsize);//赋值给传出型参数
return true;
}
//关闭套接字
void Close()
{
close(sock_);
sock_=-1;
}

int Getfd()
{
return sock_;
}
void Setfd(int fd)
{
sock_=fd;
}
private:
int sock_;
};  

客户端连接操作及收发数据

 #include"tcpclass.hpp"
int main(int argc,char* argv[])
{
if(argc!=3)
{
printf("please enter true server_ip and port!");
return 0;
}
string ip=argv[1];
uint16_t port=atoi(argv[2]);
Tcpsc sc;
if(!sc.CreateSocket())
{
return 0;
}
if(!sc.Connect(ip,port))
{
return 0;
}
//连接完成,开始收发数据
while(1)
{
//发送数据
printf("cli say:");
fflush(stdout);
string buf;
cin>>buf;
sc.Send(buf);

//接收服务端回复的数据
sc.Recv(&buf);
printf("server reply:%s\n",buf.c_str());
}
sc.Close();//其实进程结束后会自动关闭描述符的
return 0;
}  

主函数逻辑:

还是服务器端的基本逻辑 创建套接字–》绑定地址信息–》转化为监听套接字–》 加入epoll结构 –》使用epoll进行监听事件 –》返回就绪的文件描述符 –?判断是新连接还是数据到来—?若是是新连接就调用accpet函数创建新的用于通信的套接字,并将其加入epoll结构,等待事件就绪。 若是数据到来,即读取数据

 #include"epoll_lt_tcpsvr.hpp"
#define CHECK_RET(q) if(!q) {return -1;}
int main()
{
Tcpsc listen_ts;
CHECK_RET(listen_ts.CreateSocket());
string ip("0.0.0.0");
CHECK_RET(listen_ts.Bind(ip,19999));
CHECK_RET(listen_ts.Listen());
EpollSever es;
CHECK_RET(es.init_create(10));
es.Add_events(listen_ts.Getfd());//先将监控描述符添加到epoll结构中
while(1)
{
//监控
vector<Tcpsc> v;
if(!es.Epoll_Listen(&v))
{
continue;
}
//返回就绪事件,判断是新连接还是数据到来
for(size_t i = 0; i < v.size();i++)
{
if(v[i].Getfd() == listen_ts.Getfd())//是侦听套接字上的就绪事件,说明是新连接
{
//调用 accept函数创建新的套接字用于通信,并将其添加到epoll中
struct sockaddr_in peeraddr;//对端的地址信息
Tcpsc newts;//用于保存新创建出来的套接字
listen_ts.Accept(&peeraddr,&newts);
printf("新的客户端连接----->[ip]:%s,[port]:%d\n",inet_ntoa(peeraddr.sin_addr),peeraddr.sin_port);
//再将其添加进去
es.Add_events(newts.Getfd());
}
else //否则,就是新数据到来,读取操作
{
string read_data;
bool ret=v[i].Recv(&read_data);
if(!ret)
{
es.Del_events(v[i].Getfd());
v[i].Close();
}
printf("客户端向你说话:%s\n",read_data.c_str());
}
}
}
return 0;
}  

基于epoll的并发TCP服务器(边缘触发ET)

epoll功能封装

 epoll_et_tcpsvr.hpp
#pragma once
#include<stdio.h>
#include<sys/epoll.h>
#include<unistd.h>
#include<stdlib.h>
#include<vector>
#include<errno.h>
#include"tcpclass.hpp"
class EpollSvr
{
public:
EpollSvr()
{
epoll_fd=-1;
}
~EpollSvr()
{
}
bool InitSvr(int size)
{
//创建epoll 操作句柄
epoll_fd = epoll_create(size);
if(epoll_fd < 0)
{
perror("epoll_create");
return false;
}
return true;
}
//对于ET模式,还需要再添加事件结构的跟上模式的指定
bool Addevent(int fd,bool is_ET=false)
{
//组织事件结构、
struct epoll_event ev;
ev.data.fd = fd;
if(is_ET)
ev.events = EPOLLIN | EPOLLET;
else
ev.events =EPOLLIN;
//添加此事件结构到epoll中
int ret=epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd,&ev);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
//从epoll中删除事件
bool Delevent(int fd)
{
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_DEL,fd ,NULL);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
//监视
bool EventListen(vector<Tcpsc>* v)
{
struct epoll_event event_arr[10];
int ret=epoll_wait(epoll_fd,event_arr,sizeof(event_arr)/sizeof(event_arr[0]),-1);
if(ret < 0)
{
perror("epoll_wait");
return false;
}
//监视返回,即有事件触发,将其包装为 Tcpsc类对象,返回给主函数判断及使用
for(int i=0;i < ret;i++)
{
Tcpsc ts;
ts.Setfd(event_arr[i].data.fd);
v->push_back(ts);
}
return true;
}
private:
int epoll_fd;//epoll操作句柄
};  

tcp服务器功能封装

相较于之前的,由于ET模式的特性加了设置非阻塞属性、非阻塞接收和非阻塞写的接口。

 #pragma once
#include<cstdio>
#include<cstdlib>
#include<unistd.h>
#include<string>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<iostream>
#include<netinet/in.h>
#include<sys/types.h>
#include<fcntl.h>
using namespace std;
class Tcpsc
{
public:
Tcpsc()
{
sock_=-1;
}
~Tcpsc()
{
}
//创建套接字
bool CreateSocket()
{
sock_=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sock_ < 0)
{
perror("socket");
return false;
}
int opt=1;
setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));//地址复用
return true;
}
//绑定地址信息
bool Bind(std::string& ip,uint16_t port)
{
struct sockaddr_in addr;//组织成ipv4地址结构
addr.sin_family =AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=bind(sock_,(struct sockaddr*)&addr,sizeof(addr));
if(ret < 0)
{
perror("bind");
return false;
}
return true;
}
//监听
bool Listen(int backlog=5)
{
int ret=listen(sock_,backlog);
if(ret < 0)
{
perror("listen");
return false;
}
return true;
}
//accept 服务器获取连接
//bool Accept(struct sockaddr_in* peeraddr,int* newfd)
//peeraddr :出参。保存的是客户端的地址信息,newfd:出参,表示完成连接的可以进行通信的新创建出来的套接字描述符

bool Accept(struct sockaddr_in* peeraddr,Tcpsc* newsc)//这里用一个类的实例化指针,把数据传出去
{
socklen_t addrlen=sizeof(struct sockaddr_in);//记录地址信息长度
int newserverfd=accept(sock_,(struct sockaddr*)peeraddr,&addrlen);
if(newserverfd < 0)
{
perror("accept");
return false;
}
newsc->sock_=newserverfd;//传出去新创建出来的用来通信的套接字
return true;
}
//connect 客户端调用来连接服务端
bool Connect(string& ip,uint16_t port)
{
struct sockaddr_in addr;//还是先组织服务端地址信息
addr.sin_family =AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=inet_addr(ip.c_str());
int ret=connect(sock_,(struct sockaddr*)&addr,sizeof(addr));
if(ret < 0)
{
perror("connect");
return false;
}
return true;
}
//因为是已经建立连接了的,所以参数就只是数据,和已完成连接的可以进行通信的socket套接字
//发送数据
bool Send(string& data)
{
int sendsize=send(sock_,data.c_str(),data.size(),0);
if(sendsize < 0)
{
perror("sned");
return false;
}
return true;
}
//接收数据
bool Recv(string* data)//出参,保留信息
{
char buf[1024]={0};
int recvsize=recv(sock_,buf,sizeof(buf)-1,0);
if(recvsize < 0)
{
perror("recv");
return false;
}
else if(recvsize==0)//对端已关闭close
{
printf("peer is close connect");
return false;
}
(*data).assign(buf,recvsize);//赋值给传出型参数
return true;
}
//关闭套接字
void Close()
{
close(sock_);
sock_=-1;
}

int Getfd()
{
return sock_;
}
void Setfd(int fd)
{
sock_=fd;
}
//ET模式下的非阻塞接收和非阻塞发送
//利用while循环 ,将数据保存到出参data里
bool RecvNoBlock(string* data )
{
while(1)
{
//sockfd_ 不是侦听套接字,而是已连接的用于通信的套接字描述符了
//sockfd_ 已经被设置加上了非阻塞属性,所以在判断返回值时候,需要注意 缓冲区为空(正好被接收完)的时候,recv函数返回有EAGAIN或者EWOULDBLOCK的情况产生
char buf[3]={0};
ssize_t readsize = recv(sock_,buf,sizeof(buf)-1,0);
if(readsize < 0)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
perror("recv");
return false;

}
else if(readsize == 0)
{
printf("对端关闭了连接!");
return false;
}
*data += buf;
if(readsize < (ssize_t)sizeof(buf)-1)
{
break;
}
}
return true;
}

//非阻塞发送
bool SendNoBlock(string& buf)//将传进来的数据发送出去
{
//使用指针和字节数 来确保数据全部发送完
ssize_t pos=0; //记录当前写到的位置
ssize_t lensize = buf.size(); //记录剩余字节数
while(1)
{
//对于非阻塞IO写入,如果tcp的发送缓冲区已经满了,则写操作也会返回 错误码提示
ssize_t sendsize = send(sock_, buf.data()+pos,lensize, 0);
if(sendsize < 0 )
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
//即使发送缓冲区满了,可能也没有把所有数据全部写入,所以继续重新写入
continue;
}
perror("send");
return false;
}
//更新指针位置 和 剩余字节数 即加减实际发送字节数
pos += sendsize;
lensize -= sendsize;
//推出条件,即真正写完了
if(lensize <= 0)
{
break;
}
}
return true;
}
//将文件描述符设置为非阻塞属性 ET模式下
void SetNoBlock()
{
int fl =fcntl(sock_,F_GETFL);
if(fl < 0)
{
perror("fcntl");
return ;
}
fcntl(sock_,F_SETFL,fl| O_NONBLOCK);
}
private:
int sock_;
};  

简单的客户端逻辑:

 #include"tcpclass.hpp"
int main(int argc,char* argv[])
{
if(argc!=3)
{
printf("please enter true server_ip and port!");
return 0;
}
string ip=argv[1];
uint16_t port=atoi(argv[2]);
Tcpsc sc;
if(!sc.CreateSocket())
{
return 0;
}
if(!sc.Connect(ip,port))
{
return 0;
}
//连接完成,开始收发数据
while(1)
{
//发送数据
printf("i am client, say:");
fflush(stdout);
string buf;
cin>>buf;
sc.Send(buf);

//接收服务端回复的数据
sc.Recv(&buf);
printf("peer server reply:%s\n",buf.c_str());
}
sc.Close();//其实进程结束后会自动关闭描述符的
return 0;
}  

main.cpp逻辑:

还是服务器端的基本逻辑 创建套接字–》绑定地址信息–》转化为监听套接字–》 加入epoll结构 –》使用epoll进行监听事件 –》返回就绪的文件描述符 –?判断是新连接还是数据到来—?若是是新连接就调用accpet函数创建新的用于通信的套接字,先设置为非阻塞模式,并将其加入epoll结构,等待事件就绪。 若是数据到来,即读取数据(ET模式下的非阻塞循环读(RecvNoBlock接口))

 #include"epoll_et_tcpsvr.hpp"
#define CHECK_RET(q) if(!q) {return -1;}
int main()
{


Tcpsc listen_ts;
CHECK_RET(listen_ts.CreateSocket());
string ip("0.0.0.0");
CHECK_RET(listen_ts.Bind(ip,19999));
CHECK_RET(listen_ts.Listen());
EpollSvr es;
es.InitSvr(10);//创建epoll操作句柄
es.Addevent(listen_ts.Getfd());//先将监控描述符添加到epoll结构中 ,并设置为ET边缘触发
while(1)
{
//监控
vector<Tcpsc> v;
if(!es.EventListen(&v))
{
continue;
}
//返回就绪事件,判断是新连接还是数据到来
for(size_t i = 0; i < v.size();i++)
{
if(v[i].Getfd() == listen_ts.Getfd())//是侦听套接字上的就绪事件,说明是新连接
{
//调用 accept函数创建新的套接字用于通信,并将其添加到epoll中
struct sockaddr_in peeraddr;//对端的地址信息
Tcpsc newts;//用于保存新创建出来的套接字
listen_ts.Accept(&peeraddr,&newts);
printf("新的客户端连接----->[ip]:%s,[port]:%d\n",inet_ntoa(peeraddr.sin_addr),peeraddr.sin_port);
newts.SetNoBlock();//设置为非阻塞属性
//再将其添加进去
es.Addevent(newts.Getfd(),true);
}
else //否则,就是新数据到来,读取操作,为ET模式时,要调用非阻塞recv方法
{
string read_data;
bool ret=v[i].RecvNoBlock(&read_data);
if(!ret)
{
es.Delevent(v[i].Getfd());
v[i].Close();
continue;
}
printf("客户端向你说话----》:%s\n",read_data.c_str());

read_data.clear();
read_data.assign("OK!本服务器已收到!");
v[i].SendNoBlock(read_data);
}
}
}
return 0;
}  

两个客户端示例:

tcp服务端的业务处理:

基于IO多路复用的tcp服务器的优点:

  • 一个基于I/O多路复用的事件驱动服务器是运行在单一进程上下文中的,因此每个逻辑流都能访问该进程的全部地址空间。这使得在流之间共享数据变得很容易。一个与作为单个进程运行相关的优点是,你可以利用熟悉的调试工具,例如GDB,来调试你的并发服务器,就像对顺序程序那样。
  • 并且事件驱动设计常常比基于进程的设计要高效得多,因为它们不需要进程上下文切换来调度新的流。

文章来源:智云一二三科技

文章标题:并发服务器 IO多路复用之poll 与 epoll(重点)

文章地址:https://www.zhihuclub.com/100583.shtml

关于作者: 智云科技

热门文章

网站地图