您的位置 首页 golang

网络编程实战-彻底解决面试C10k问题,高并发服务器,IO多路复用

IO多路复用的理解

提出问题: 在进行socket套接字编程中, 很多方法都是阻塞式的, 需要等待IO事件的到来, 像是accept 这是一个典型的阻塞函数, 如果没有IO到来. 你却把整个应用程序给阻塞起来等待IO的到来, 这就是对于CPU一个极大的浪费

解决办法: IO多路复用, 对于整个应用程序中所有IO事件是否发生的一种监视, 多路IO地理解, 就是可以同时监视多个文件地IO事件地发生, 复用就是复用主线程或者说是一个进程, 也就是在一个进程中处理多个IO事件。。。。

eg : 我们可以同时监视标准输入和socket IO事件地发生, 正常来说一旦直接进行accept挂起等待socket IO事件,就无法接收其他地IO事件了, 但是IO多路复用可以使得一个进程既可以监视socket IO 也可监视 标准输入了

上述说法是借鉴于盛延敏地网络编程实战地说法,极客时间中

多路复用多路转接简单理解其实就是在一个进程中同时处理多个IO事件,或者说处理一个fds集合,处理多个IO的方式存在如下两种

  • 轮询式解决:select和poll的本质都是一次并发处理多个IO事件,但是IO事件的解决上还是采取的轮询遍历对应的 fds集合的方式进行的处理, 因为轮询, 所以效率上不算很好, O(n)…看似是一次解决了多个IO事件, 其实本质上不过是在一段时间内轮询处理了多个IO事件, 之所以我们看起来像是同时并行处理似的, 是因为处理时间及短, 所以看起来就像同时处理一样.
  • 中断式解决: 和上述的select和poll相比epoll采取提前注册监视事件的方式, 不需要再进行轮询,来了IO事件就会提示,执行相应的处理

IO事件地真正监视者其实是操作系统内核, 所以我们地这些系统调用select poll 还有 epoll其实都是需要进入内核态,然后由内核检测IO事件是否发生地.

图解多种类型IO 在从数据报地准备层次上再去理解一下IO多路复用, IO多路转接

阻塞式IO请求调用函数,如果IO没有到位会一直阻塞应用程序等待IO数据报满足再拷贝回到用户空间进行处理

非阻塞IO请求,不会阻塞等待IO,而是直接返回采取定期轮询地方式来不停调用非阻塞函数, 直到数据报准备好拷贝回到用户空间进行处理,在没有准备好之前地调用返回地同时会设置errno = EAGAIN或说EWOULDBLOCK

上述地select 或者poll epoll 同时监视多个IO, 只要存在满足就触发对应地IO事件发生..

IO多路转接: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态

IO多路复用的发展

select

函数刨析详解:

上述集合地底层其实是一个位图, 加入到集合中也就是将对应位置为1, 从集合中清除, 也就是将对应位置为0. 位图就是一堆二进制位, 如果这个事件被触发了就对应地会被内核对应地在位图的相应位置上标记为1. 使用位图中的对应位标识监视的文件描述符.

  • FD_ISSET用来判断fd是否在集合中, 其实就是判断fd对应位上是不是1, 如果是1 说明这个fd对应的IO事件被触发了。。。
  • return val : 成功返回文件描述符中状态发生变化的个数, 失败返回-1
 //常见程序片段
fs_set readset;
//定义监视文件描述符集合
FD_SET(fd,&readset);
//添加需要监视事件
select(fd+1,&readset,NULL,NULL,NULL);
//阻塞监视所有IO(内核监视修改rfds)
if(FD_ISSET(fd,readset)){……}  

小案例测试1: 先简单地监视一下标准输入…

 
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/select.h>
#include <stdlib.h>
 
#define ERR_EXIT(m) \
	do { perror(m); close(EXIT_FAILURE); } while(0)
 
int main() {
	//第一步定义读事件描述符集合
	fd_set rfds;
	int ret;
	FD_ZERO(&rfds);//清零rfs集合
	FD_SET(0, &rfds);//添加标准输入的监视
	ret = select(1, &rfds, NULL, NULL, NULL); //最大的监视描述符 + 1 监视读取集合, 写集合 异常集合, 设置延迟
	if (ret == -1) {
		ERR_EXIT("select()");
	} else if (ret) {
		char buff[100] = {0};
		read(0, buff, sizeof(buff) - 1);
		printf("Data is available now.\n");//标准输入读取有动静了, 数据准备好了已经
		printf("input: %s\n", buff);
	} else {
		printf("No Data\n");
	}
	return 0;
}  

相关视频推荐

学习地址:

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括 C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),免费分享

案例2: 实现一下基于select地并发服务器.. 服务器功能, 将客户端传过来的小写字符转成大写输出。 服务端代码如下:

 #include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/select.h>
#include <set>
#include <sys/socket.h>
#include <strings.h>
#include <ctype.h>
#include <arpa/inet.h>
 
using namespace std;
 
#define ERR_EXIT(m) \
	do { perror(m); close(EXIT_FAILURE); } while(0)
typedef struct sockaddr SA;
const int BUFFSIZE = 1024;
 
int CreateSocket() {
	struct sockaddr_in serveAdd;
	int listenfd;
	if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		ERR_EXIT("socket");
	}
	int reuse = 1;
	//设置允许冲用本地端口和地址
	if (setsockopt(listenfd, SOL_SOCKET,  SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
		ERR_EXIT("setsockopt");
	}
	bzero(&serveAdd, sizeof(serveAdd));
	serveAdd.sin_family = AF_INET;
	serveAdd.sin_port = htons(12345);
	serveAdd.sin_addr.s_addr = htonl(INADDR_ANY);
	if (bind(listenfd, (SA*)&serveAdd, sizeof(serveAdd)) == -1) {
		ERR_EXIT("bind");
	}
	if (listen(listenfd, 5) == -1) {
		ERR_EXIT("listen");
	}
	return listenfd;
}
 
int main() {
	fd_set rfds;//定义读事件描述符集合
	char buff[BUFFSIZE] = {0};
 
	int listenfd = CreateSocket();
 
	set<int> fdset;//保存所有文件描述符.
	fdset.insert(listenfd);
	while (1) {
		//因为select内核每一次会对于rfds做出改变, 所以每一次都需要重新设置rfds
		FD_ZERO(&rfds);
		for (int fd : fdset) {
			FD_SET(fd, &rfds);
		}
		int ret = select(*fdset.rbegin() + 1, &rfds, NULL, NULL, NULL);
		if (ret > 0) {
			for (int fd : fdset) {
				if (fd == listenfd && FD_ISSET(fd, &rfds)) {
					//来了新的连接
					struct sockaddr_in clientAdd;
					socklen_t clientLen = sizeof(clientAdd);
					int connfd = accept(fd, (SA*)&clientAdd, &clientLen);
					if (connfd == -1) {
						ERR_EXIT("accept");
					}
					//将新的文件描述符加入到描述符集合和读事件描述符集合中
					fdset.insert(connfd);
					FD_SET(connfd, &rfds);
					printf("create connection fd: %d\n", connfd);
				} else if (FD_ISSET(fd, &rfds)) {
					//说明是客户端发送的IO来了
					int read_size = recv(fd, buff, sizeof(buff) - 1, 0);
					if (read_size < 0) {
						ERR_EXIT("recv");
					} 
					if (read_size == 0) {//客户端断开连接了
						FD_CLR(fd, &rfds);
						fdset.erase(fd);
						printf("delete connection fd: %d\n", fd);
						close(fd);//关闭连接
					} else {//正常收到数据
						for (char& ch : buff) {
							ch = toupper(ch);
						}
						send(fd, buff, read_size, 0);
					}
				}
			}
		} else {
			ERR_EXIT("select");
		}
	}
	return 0;
}  

select小结:

select实现多路复用存在两次描述符集合的遍历, 第一次是调用select将描述符拷贝进入到内核中让内核进行检测IO事件的发生, 内核检测的方式就是遍历一遍描述符集合, 然后修改描述符集合中的发生IO状态变化的文件描述符的对应位. 将对应位图中的位标记为1, 表示IO事件触发,然后select函数返回, 在用户态, 还是需要一次遍历检测被触发的IO事件然后执行对应的IO处理代码块…

所以使用select会需要两次的文件描述符集合的遍历, 还会存在两次拷贝, 用户拷贝到内核空间,内核拷贝会用户态, 存在效率上的不足, 而且文件描述符数量还存在限制, 一般是默认最多1024个

对于需要检测监视的IO事件对应的所有描述符我们需要额外进行存储

原因1 : 每一次调用select之后内核都会对于fds进行一个修改, 所以每一次从新调用select监视之前我们都需要重新向fds中加入所有需要监视的文件描述符, 所以需要一个容器存储需要监视的文件描述符号

原因2 : 我们需要获取需要监视的所有描述符数量, 就是maxFd + 1, 故而也需要存在所有的fd, 为了更快的获取maxFd 我采取了使用set容器存储所有的fd, 底层是一颗红黑树, 获取maxFd的效率高,直接就是 *fdset.rbegin(); 当然使用数组存储也是OK的

编程模型如下:

  1. 创建fd_set , 和一个存储所有描述符号的容器
  2. 初始化最初需要监视的文件描述符, 加入到fd_set中同步添加到容器中
  3. 循环不断地进行 r = select();
  4. 根据select地返回值 r 来判断是否存在事件地触发

if ( r < 0) { 出错了;}

else if (r == 0) {设置了延时, 轮询时间,在延时期间没有IO时间被触发 }

else {判断处理触发地IO事件, 不同地IO事件存在不同地处理方式 (判断处理) }

poll

函数刨析

不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.

  • timeout设置为-1 就会永久阻塞, 直到有事件发生, 设置为0 是立刻返回
  • 将IO检测的结果放在revents中, 所以我们判断是否触发IO事件的时候, 是通过&revents判断的

事件判断地代码片段:

 if (fds[i].revents & POLLIN) {
//通过&地方式判断事件地类型,读写还是其他的紧急呀啥的
}  

小小案例1: 使用poll监视标准输入

 
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <poll.h>
#include <string.h>
 
int fdNum = 0;//存储当前描述符数目, 方便遍历
int main() {
	//1.创建pollfd结构体数组
	struct pollfd fds[100]; 
	//2.设置需要监视的描述符
	fds[0].fd = 0;//监视标准输入
	fds[0].events = POLLIN;
	fdNum++;
	char buff[256] = {0};
	//3.开始监视
	while (1) {
		int r = poll(fds, fdNum, 1000);//0表示一直阻塞,而不是定时轮询
		if (r) {
			//说明存在事件触发
			if (fds[0].revents & POLLIN) {
				memset(buff, 0, sizeof(buff));
				int n = read(0, buff, sizeof(buff) - 1);
				buff[n] = 0;
				printf("检测到标准输入: %s\n", buff);
			}
    } else if (r == 0) {
      printf("time out\n");//超时
      continue;
    } else {
			printf("No Data\n");
		}
	}
	return 0;
}  

案例2: 基于poll实现一个简易的服务端, 还是老规矩将客户端传过来的字符转成大写写回.

 #include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <poll.h>
#include <map>
#include <string>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <ctype.h>
#include <strings.h>
#include <fcntl.h>
#include <string.h>
 
using namespace std;
 
#define ERR_EXIT(m) \
	do { perror(m); close(EXIT_FAILURE); } while (0)
typedef struct sockaddr SA;
#define BUFSIZE 256
#define CLIENTSIZE 100
#define SERVE_PORT 12345 
 
 
int CreateSocket() {
	int listenfd;
	struct sockaddr_in serveAdd;
	//创建套接字
	if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		ERR_EXIT("socket");
	}
	int reuseaddr = 1;
 
	//设置可以复用端口地址
	if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
		ERR_EXIT("setsockopt");
	}
 
	//设置协议地址簇
	bzero(&serveAdd, sizeof(serveAdd));
	serveAdd.sin_family = AF_INET;
	serveAdd.sin_port = htons(SERVE_PORT);
	serveAdd.sin_addr.s_addr = htonl(INADDR_ANY);
	//绑定本地端口地址
	if (bind(listenfd, (SA*)&serveAdd, sizeof(serveAdd)) == -1) {
		ERR_EXIT("bind");
	}
	//监听
	if (listen(listenfd, 5) == -1) {
		ERR_EXIT("listen");
	}
	return listenfd;
}
//设置非阻塞
int setnoblock(int fd) {
	int oldflag = fcntl(fd, F_GETFL);//获取flag
	fcntl(fd, F_SETFL, oldflag | O_NONBLOCK);
	return oldflag;
}
 
 
 
int main() {
 
	struct pollfd fds[CLIENTSIZE];//定义存储事件的poll集合
	int listenfd = CreateSocket();
	char buf[BUFSIZE] = {0};//存储中间数据
	map<int, string > fddata;//存储<fd, data>键值对
	//每一个fd都有自己的收发数据, 因为收发数据是分开处理的
	fds[0].fd = listenfd;
	fds[0].events = POLLIN | POLLERR;
	fds[0].revents = 0;
	int count = 1;//监视fd事件的数目
	while (1) {
		int ret = poll(fds, count, -1);//-1代表一直阻塞,直到事件到来
		if (ret < 0) {
			ERR_EXIT("poll");
		} 
		//至此, 说明有IO事件满足处理, 遍历寻找满足事件
		for (int i = 0; i < count; ++i) {
			//客户端关闭, 或者出错
			if ((fds[i].revents & POLLRDHUP) || (fds[i].revents & POLLERR)) {
				int fd = fds[i].fd;//提取fd
				fds[i] = fds[count - 1];//使用末尾pollfd覆盖
				i -= 1;//此次i删除, 覆盖过来的末尾事件需要重新检测
				count -= 1;
				fddata.erase(fd);//从mp中删除
				close(fd);
				printf("%d delete connection\n", fd);
			} else if ((fds[i].revents & POLLIN) && (fds[i].fd == listenfd)) {
				//建立新的链接
				struct sockaddr_in clientAdd;
				socklen_t clientLen = sizeof(clientAdd);
				int connfd = accept(fds[i].fd, (SA*)&clientAdd, &clientLen);
				char dst[256] = {0};
				printf ("Get connection %d from %s:%d\n ", connfd, 
					inet_ntop(AF_INET, &clientAdd.sin_addr, dst, sizeof(dst))
					, ntohs(clientAdd.sin_port));
				setnoblock(connfd);//设置非阻塞
				fds[count].fd = connfd;
				fds[count].events = POLLIN | POLLRDHUP | POLLERR;
				fds[count++].revents = 0;
			} else if (fds[i].revents & POLLIN) {
				//有可读事件
				memset(buf, 0, BUFSIZE);
				int read_size = recv(fds[i].fd, buf, BUFSIZE - 1, 0);
				if (read_size < 0) {
					ERR_EXIT("recv");
				}
				if (read_size == 0) {
        			//客户端断开此次连接
        			printf("EXIT\n");
				} else {
					fddata[fds[i].fd] = buf;//数据进行拷贝进去,拷贝构造
					fds[i].events |= POLLOUT;
					fds[i].events &= (~POLLIN);
				}
			} else if (fds[i].revents & POLLOUT) {
				int fd = fds[i].fd;
				//有可写事件
				memset(buf, 0, BUFSIZE);
				strcpy(buf, fddata[fd].c_str());//拿到数据
				int n = fddata[fd].size();
				buf[n] = 0;
				for (int i = 0; i < n; ++i) {
					buf[i] = toupper(buf[i]);
				}
				//写回,同时设置成为读事件
				send(fd, buf, n, 0); 
				fds[i].events |= POLLIN;
				fds[i].events &= (~POLLOUT);
			}
		}
	}
	return 0;
}  

poll小结

优势

  • poll对比select不再使用BitMap来存储关注的文件描述符事件了,
  • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比 select更方便.
  • poll并没有最大数量限制(相较select打破了文件描述符个数上的限制) (但是数量过大后性能也是会下降, 所以在fd数目很大的情况下也不适合).
  • poll 没有最大数量, 不像select的 fd_set在一开始是设定好了, 上限1024个最多. poll上线取决于pollfd结构体数组的大小, 我们可以使用动态数组或者从堆区new一个足够大的数组

劣势

  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符. (所以还是会存在循环遍历找到可读可写的文件描述符, 所以效率依然不算很高),时间复杂度上面其实和select差不多, 也还是会存在内核态和用户态之间的相互拷贝文件描述符集合.,只是形式上不一样罢了. 一个用bitmap 一个 用结构体链式.
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

所以综上poll也还是不是解决C10k问题的利器

poll编程模型

  1. 创建pollfd结构体数组, 并且创建一个 map 映射fd和对应存储数据的string
  2. init 初始化fds[0] 并且定义一个count记录需要监视的事件数目.
  3. 循环不断的进行 r = poll();
  4. 循环遍历fds监视IO事件并且 通过判断 fds[i].revents & 各种宏的结果来判断IO事件类型进行不同的操作

epoll

搞定C10k问题的神器, 完全没有像上述的select 和 poll的随着fd的数量稍稍一大就会及其拉跨, 性能猛地就掉下来了,如下图分析

  • 正是由于epoll的稳定, 所以奠定了它在高并发服务器,解决C10k类型问题中的霸主地位.
  • 我们还是先图解分析一下epoll的函数, 然后再从底层数据结构康康为啥它相比select和poll实现了质的飞跃.

上述的epoll_wait 使用起来其实是和之前的select还有poll是一个道理, 监视IO事件触发, 通过返回值来执行触发的事件. 不过还是有差别, 它的return val == 触发的IO事件的个数, 我们遍历处理事件的时候不需要再遍历所有的监视IO事件, 只需要直接遍历它的返回值处理就行, 这个就先减小了遍历的压力

epoll_ctl先提前注册号需要监视的事件,然后调用epoll_wait阻塞获取IO触发的事件, 在使用epoll_wait的时候不需要像使用select + poll那样需要每次都重新拷贝需要监视的IO事件到内核中进行检测,而是进行了分离, 提前用epoll_ctl注册好需要监视的事件,然后epoll_wait获取触发的IO事件

小案例1:监视标准输入:

 #include <sys/epoll.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
 
#define ERR_EXIT(m)\
	do { perror(m); exit(EXIT_FAILURE); } while(0)
 
int main() {
	int epollfd = epoll_create(5);//创建epoll句柄对象 
	//注册监视事件
	struct epoll_event ev;
    ev.data.fd = 0;
	ev.events = EPOLLIN;
	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev) == -1) {
		ERR_EXIT("epoll_ctl");
	}
	while (1) {
		struct epoll_event evs[3];//存储返回的IO触发事件
		//循环获取返回的IO触发事件
		int r = epoll_wait(epollfd, evs, 3, -1);
		if (r < 0) {
			ERR_EXIT("epoll_wait");
		} else {
			//说明存在IO事件被触发了,循环遍历处理
			for (int i = 0; i < r; ++i) {
				if (evs[i].events & EPOLLIN) {
					char buff[256] = {0};
					read(0, buff, sizeof(buff) - 1);
					printf("input:%s", buff);
				}
			}
 
		}
	}
	return 0;
}  

基于epoll的服务器实现 (还是简单的将小写字符转成大写然后写回客户端就OK了)其实写起来和poll差不大多:

 
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <map>
#include <string>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <ctype.h>
#include <strings.h>
#include <fcntl.h>
#include <string.h>
using namespace std;
 
#define ERR_EXIT(m) \
	do { perror(m); close(EXIT_FAILURE); } while (0)
 
typedef struct sockaddr SA;
#define BUFSIZE 256
#define CLIENTSIZE 1000
#define SERVE_PORT 12345 
 
int CreateSocket() {
	int listenfd;
	struct sockaddr_in serveAdd;
	//创建套接字
	if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		ERR_EXIT("socket");
	}
	int reuseaddr = 1;
 
	//设置可以复用端口地址
	if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
		ERR_EXIT("setsockopt");
	}
 
	//设置协议地址簇
	bzero(&serveAdd, sizeof(serveAdd));
	serveAdd.sin_family = AF_INET;
	serveAdd.sin_port = htons(SERVE_PORT);
	serveAdd.sin_addr.s_addr = htonl(INADDR_ANY);
	//绑定本地端口地址
	if (bind(listenfd, (SA*)&serveAdd, sizeof(serveAdd)) == -1) {
		ERR_EXIT("bind");
	}
	//监听
	if (listen(listenfd, 5) == -1) {
		ERR_EXIT("listen");
	}
	return listenfd;
}
 
int setnoblock(int fd) {
	int oldfl = fcntl(fd, F_GETFL);
  fcntl(fd, F_SETFL, oldfl | O_NONBLOCK);
	return oldfl;
}
 
//添加监视IO事件                                                                                                                                                       
void addfd(int epfd, int fd) {
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = EPOLLIN | EPOLLET | EPOLLERR;//EPOLLET边缘触发
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
		ERR_EXIT("epoll_ctl");
	}
	setnoblock(fd);//设置非阻塞, 非阻塞IO + IO多路复用技术结合(提高性能)
}
//删除监视IO事件
void delfd(int epfd, int fd) {
	struct epoll_event ev;
	ev.data.fd = fd;
	if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev) == -1) {
		ERR_EXIT("epoll_ctl");
	}
}
 
int main() {
	int listenfd = CreateSocket();
	if (listenfd == -1) {
 
	}
	int epfd = epoll_create(CLIENTSIZE);//创建epoll对象句柄
	if (epfd == -1) {
		ERR_EXIT("epoll_create");
	}
	char buff[BUFSIZE];//中间存储读取到数据的容器
	map<int, string > fddata;//映射fd, data 键值对
	struct epoll_event evs[CLIENTSIZE]; //存储返回的IO事件
	addfd(epfd, listenfd);		  
	while (1) {
		int r = epoll_wait(epfd, evs, CLIENTSIZE, -1);
		for (int i = 0; i < r; ++i) {
			if (evs[i].events & EPOLLERR) {
				//出错断开删除连接删除监视
				delfd(epfd, evs[i].data.fd);
				printf("fd: %d Erron\n", evs[i].data.fd);
				close(evs[i].data.fd);
				fddata.erase(evs[i].data.fd);
			} else if ((evs[i].events & EPOLLIN) && evs[i].data.fd == listenfd) {
				//新的连接到来
				char dst[256];
				struct sockaddr_in clientAdd;
				socklen_t clientLen;
				int connfd = accept(listenfd, (SA*)&clientAdd, &clientLen);
				if (connfd == -1) ERR_EXIT("accept");
				printf("%d Create connection and %s:%d \n", connfd
					, inet_ntop(AF_INET, &clientAdd.sin_addr, dst, sizeof(dst))
					, ntohs(clientAdd.sin_port));
				addfd(epfd, connfd);
			} else if (evs[i].events & EPOLLIN) {
				int fd = evs[i].data.fd;
				//客户端来数据了
				memset(buff, 0, BUFSIZE);
				int read_size;
				if ((read_size = recv(fd, buff, BUFSIZE - 1, 0)) == -1 ) {
					ERR_EXIT("recv");
				} else if (read_size == 0) {
					printf("fd: %d delete connection\n", fd);
					delfd(epfd, fd);//客户端断开了连接了
					fddata.erase(fd);
					close(fd);
				} else {
					fddata[fd] = buff;//会自动调用对应的拷贝构造函数
					evs[i].events |= EPOLLOUT;
					evs[i].events &= (~EPOLLIN);
					if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &evs[i]) == -1) {
						ERR_EXIT("epoll_ctl");
					}					
				}
			} else if (evs[i].events & EPOLLOUT) {
				int fd = evs[i].data.fd;
				//有数据可以向客户端写入了
				memset(buff, 0, BUFSIZE);
				strcpy(buff, fddata[fd].c_str());
				//转成大写
				for (int i = 0; i < fddata[fd].size(); ++i) {
					buff[i] = toupper(buff[i]);
				}
				send(fd, buff, fddata[fd].size(), 0);
				//然后修改监视事件
				evs[i].events &= (~EPOLLOUT);
				evs[i].events |= EPOLLIN;
				if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &evs[i]) == -1) {
					ERR_EXIT("epoll_ctl");
				}
			}
		}
	}
	close(listenfd);
	close(epfd);
	return 0;
}  

Epoll小结

  • epoll在内核里面使用了红黑树来监视所有待检测的文件描述符, 把需要监视的socket全部通过epoll_ctl函数加到红黑树中, 而且这个红黑树是处在内核中的, 所有避免了每一次调用返回IO触发事件的时候的拷贝, 向select 和 poll就需要将需要监视的事件拷贝进入内核, 而且是每一次都需要拷贝, 这个代价还是比较大
  • 而且内核中监视IO事件使用红黑树存储, 增删改查 的效率都远远的超过了线性结构….select和poll都是使用的线性结构, 遍历轮询IO事件效率低下…
  • epoll使用的是事件驱动机制, 我个人觉得有点像中断机制的意思了, 内核中维护一个就绪链表存储就绪事件, 当用户调用epoll_wait的时候直接返回有IO事件发生的描述符个数就是了, 链表中的就绪事件通过我们传入的 struct epoll_event数组直接拿出, 有了这个就绪链表还避免了内核的轮询查找发生了IO的事件.
  • epoll最牛逼的还是它的稳定, 随着fd事件的增加不会像poll和select那样出现线性的急速性能下滑,.. 所以他也被称作解决C10k问题的利器

epoll编程模型

  1. 调用epoll_create创建epoll实例句柄
  2. 调用epoll_ctl注册监视事件
  3. 循环调用epoll_wait() 返回IO发生的事件
  4. 循环遍历返回的事件进行处理, 事件类型通过如下方式判断
  5. if (evs[i].events & EPOLLIN) {处理读数据事件} else if (){} else if (){}

C10K服务端代码

此次仅仅附上代码, 具体很多细节, 我暂时还没学通, 线程池 + 多路复用 + 非阻塞IO结合

 #include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<strings.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<sys/wait.h>
#include<sys/epoll.h>
 
#define SERV_PORT 8000
#define MAXLINE 80
 
#define prrexit(msg){\
    perror(msg);\
    exit(1);\
    }
 
 
typedef struct Task{
    int fd;
    struct Task *next;
}Task;
 
typedef struct Task_pool{
    Task *head;
    Task *tail;
    pthread_mutex_t lock;
    pthread_cond_t havetask;
}Task_pool;
 
Task_pool *task_pool_init(){
    Task_pool *tp=(Task_pool*)malloc(sizeof(Task_pool));
    tp->head=NULL;
    tp->tail=NULL;
    pthread_mutex_init(&tp->lock,NULL);
    pthread_cond_init(&tp->havetask,NULL);
    
    return tp;
}
 
void task_pool_push(Task_pool *tp,int fd){
    pthread_mutex_lock(&tp->lock);
 
    Task *t=(Task *)malloc(sizeof(Task));
    t->fd=fd;
    t->next=NULL;
 
    if(!tp->tail){
        tp->head=tp->tail=t;
    }else{
        tp->tail->next=t;
        tp->tail=t;
    }
 
    pthread_cond_broadcast(&tp->havetask);//广播条件可用
    pthread_mutex_unlock(&tp->lock);
}
 
Task task_pool_pop(Task_pool *tp){
    pthread_mutex_lock(&tp->lock);
 
    while(tp->head==NULL)
        pthread_cond_wait(&tp->havetask,&tp->lock);//暂时交还锁,等待条件满足
 
    Task tmp,*k;
    k=tp->head;
    tmp=*k;
    tp->head=tp->head->next;
 
    if(!tp->head)//没了
        tp->tail=NULL;
 
    free(k);
    pthread_mutex_unlock(&tp->lock);
 
    return tmp;
}
 
void task_pool_free(Task_pool *tp)
{
    pthread_mutex_lock(&tp->lock);
    Task *p=tp->head,*k;
 
    while(p){
        k=p;
        p=p->next;
        free(k);
    }
    tp->head=NULL;
 
    pthread_mutex_unlock(&tp->lock);
    pthread_mutex_destroy(&tp->lock);
    pthread_cond_destroy(&tp->havetask);
    free(tp);
    return;
}
 
void *up_server(void *arg)
{
    pthread_detach(pthread_self());
    //detech函数可以使一个线程在结束时自动释放相关资源
    char buf[80];//读取缓冲
    Task_pool *tp=arg;
    while(1)
    {
        Task tmp=task_pool_pop(tp);//取出一个线程
        int connfd=tmp.fd;
        printf("get task fd=%d\n",connfd);
        //epoll会在每一次有数据到来时唤醒线程服务,所以不需要while(1)循环等待
        /*while(1)
        {*/
        int n = read(connfd,buf,80);
        write(1,buf,n);
        for(int i=0;i<n;i++)
        {
            buf[i]=toupper(buf[i]);     
        }
        
        write(connfd,buf,n);
        printf("finish task fd=%d\n",connfd);
 
        if(!strncmp(buf,"QUIT",4))//若读到的前四个字符为quit,则断开连接
        {
            close(connfd);
        }
    }
    return (void *)0;
}
 
int main(void){
    
    struct sockaddr_in serveraddr,cliaddr;
    int listenfd,connfd,n;
    socklen_t cliaddr_len;
    char str[16];
    Task_pool *tp=task_pool_init();
    //多线程
    pthread_t tid;
    for(int i;i<3;i++){
        pthread_create(&tid,NULL,up_server,(void *)tp);
        printf("new thread is %#lx\n",tid);
    }
    
    listenfd=socket(AF_INET,SOCK_STREAM,0);//参数:IPV4,TCP,默认协议
    //监听主进程
    int epfd = epoll_create(256);//256表示监听的上限 
 
    struct epoll_event ev,events[256];//触发事件
    ev.events=EPOLLIN | EPOLLET;//有信息传入/边缘触发
    ev.data.fd=listenfd;
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);//将listenfd注册进epfd中,监听ev事件
    
    //服务器 ip地址、端口初始化
    bzero(&serveraddr,sizeof(serveraddr));
    serveraddr.sin_family=AF_INET;
    serveraddr.sin_port=htons(SERV_PORT);//hton转成网络字节序,s表示short 16字节
    serveraddr.sin_addr.s_addr=htonl(INADDR_ANY);//监听哪个IP地址,INADDR_ANY表示所有
    
    //绑定socket套接字与sockaddr相关属性,使其关联一个地址与端口号
    bind(listenfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr));
    listen(listenfd,2);//listen使一个套接字成为服务器进程,即监听其他进程的请求
    //2表示连接用户的上限
    printf("Accepting connections..\n.");
    while(1){
        //接收来自客户端的请求
        int nfds=epoll_wait(epfd,events,256,-1);//-1表示阻塞
        for(int i=0;i<nfds;i++)
        {
            if(events[i].data.fd==listenfd)//如果监听对象是主进程
            {
                cliaddr_len=sizeof(cliaddr);
                connfd=accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len);
                //从listen到的客户端中接受一个,将其信息保留于cliaddrr
 
                printf("get from %s:%d\n",\
                   inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),
                   ntohs(cliaddr.sin_port)); 
                //将客户端也加入监听队列
                ev.events=EPOLLIN | EPOLLET;
                ev.data.fd=connfd;
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
                //task_pool_push(tp,connfd);
            }else if(events[i].events & EPOLLIN){//有数据到来的情况
                int clifd=events[i].data.fd;
                if(clifd<3)
                    continue;
                task_pool_push(tp,clifd);//将其置入线程池
            }   
        }
    }  

总结

本文主要是理论结合实践的学习IO多路复用技术,又言IO多路转接技术…..

IO多路复用本质上就是在一个进程中同时处理多个IO事件, 所谓的同时处理多个IO事件的方式存在两种, 一种就是轮询处理, 本质其实还是一个一个的IO事件的进行处理, 只是一次监视多个IO事件,然后将状态改变, IO事件触发的事件进行处理…. 处理还是存在先后顺序的, 我们之所以觉得是同时处理的, 不过是因为处理单个事件的时间很短我们看着就像同时处理多个事件一个样….

IO多路复用技术非轮询的, 采取监视事件 和 返回事件结果接口分离的方式来实现, 提前注册好需要监视的事件, 不再需要像select和poll那样每一次调用接口都想内核拷贝一次监视事件, 底层存储监视事件使用的是红黑树的结构,大大提升了效率……

存储就绪事件也不是采取内核中轮询遍历的方式了, 而是单独的使用一个就绪链表存储就绪IO事件,提高了效率….

根据上述可以总结出 epoll节约了空间(避免了内核和用户空间的不断拷贝事件,IO事件的存储对应的是数据结构) 还通过线性结构改成红黑树接口分离提高了效率和稳定性…

文章来源:智云一二三科技

文章标题:网络编程实战-彻底解决面试C10k问题,高并发服务器,IO多路复用

文章地址:https://www.zhihuclub.com/100580.shtml

关于作者: 智云科技

热门文章

网站地图