您的位置 首页 golang

「linux」池化组件之异步请求池

推荐视频:

c/c++ linux服务器开发学习地址:

1、解决问题

当服务器操作需求请求一些资源,例如mysql, redis dns ,http等时,如果不采用任何池化组件,就是基本上都是经过 tcp 三次握手,认证 ,然后再操作,关闭认证,tcp四次挥手。这就是一个耗时的事情,如果再加上业务服务器频繁操作就更加耗时。
就引入了连接池(服务器启动后,提前创建好固定的连接,然后进行同步操作);当服务器业务处理增加量后,此时就可以在连接池中,加入连接的放缩来处理也是可以处理并发的。
也还有另外一种方案:异步请求池

2、同步和异步区别

同步:服务器发送了请求必须要等待第三方服务器返回,业务服务器对应的fd才能做其它的操作;在同一条连接上
异步:服务器只管发送,不等待返回

3、异步请求池原理

组成:四元组:提交commit, EPOLL 创建,线程回调函数,释放
流程:

工作流程:
1 commit:提交
a) 建立连接(网络)
b)组织对应的协议(tcp/udp,dns,redis等)
c)调用发送数据到对应的第三方服务器
d)发送完成后,将发送的fd设置为可读事件添加到epoll管理
2 epoll创建
初始化异步操作上下文(epfd和threadid);主要分为a)epoll_create b)开一个线程用来做接收返回
3 线程的回调函数
a)epoll_wait()检查哪些fd可读
b)遍历可读fd, recv 接收数据
c)读出的数据按照对应的协议进行解析并进行操作
4 destory销毁
a)对应fd关闭close
b)线程退出
注意问题:一般情况下是采用多个连接发送;在网络io基础上发送对应协议;发送完成后对应fd存储的位置(设置为可读事件放入epoll);发送和接受是否能做成同一个线程(我们是提供做的sdk)不能放在同一个线程中处理。

4、编程实现

1 初始化init

 struct async_context {// 异步请求池上下文结构 int epfd; // epoll句柄 pthread_t threadid; // 线程id
};  

上下文其实就是全局变量,只是 全局变量 的作用域比较大,一般采用结构体当成参数或者返回值来传递。

 // 主要工作:a 创建epoll  b 创建一个线程(专门用来接受返回的处理)
//dns_async_client_init()
//epoll init
//thread init
//参数:上下文一般的两种做法a)参数传入  b)返回值  当然也有两者都有的例如mysql_init
struct async_context *dns_async_client_init(void) { // 1 epoll初始化 int epfd = epoll_create(1); //  if (epfd < 0) return NULL; struct async_context *ctx = calloc(1, sizeof(struct async_context)); if (ctx == NULL) { close(epfd); return NULL; } ctx->epfd = epfd; // 2 线程初始化 pthread_t thread_id; int ret = pthread_create(&thread_id, NULL, dns_async_client_proc, ctx); if (ret) { perror("pthread_create"); return NULL; } usleep(1); //child go first return ctx;
}  

【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs, MongoDB ,ZK, 流媒体 CDN ,P2P,K8S, Docker ,TCP/IP,协程,DPDK, ffmpeg 等)

2 提交请求:
创建socket,并将fd设置为noblock;组织协议组数据包(可以是任何的协议);调用接口发送数据;
发送完成后将sockfd设置为可读事件抛入epoll管理(当返回的时候epoll_wait检查到可读事件后,怎么处理?所以这里需要将fd与一个回调函数绑定成结构体创建再堆上,将这结构体丢人到epoll_event下的ptr指针中)

 struct ep_arg { int sockfd; // sockfd async_result_cb cb; // 返回后fd对应的回调函数
};  

发送完数据包后,会将fd设置为可读,并且开辟一个ep_arg结构体,将与epoll_event绑定赋值给ptr指针,用于当response后,拿到fd可以做对应的处理。

 int dns_async_client_commit(struct async_context* ctx, const char *domain, async_result_cb cb) { // 1 创建socket int sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) { perror("create socket failed\n"); exit(-1); } printf("url:%s\n", domain); set_block(sockfd, 0); //nonblock 设置非阻塞 struct sockaddr_in dest; bzero(&dest, sizeof(dest)); dest.sin_family = AF_INET; dest.sin_port = htons(53); dest.sin_addr.s_addr = inet_addr(DNS_SVR); int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest)); //printf("connect :%d\n", ret); // 2 组织协议数据包 struct dns_header header = {0}; dns_create_header(&header); struct dns_question question = {0}; dns_create_question(&question, domain); // 3 调用发送接口发送数据 char request[1024] = {0}; int req_len = dns_build_request(&header, &question, request); int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr)); // 4 发送成功后,将sockfd设置为可读事件抛入epoll管理 struct ep_arg *eparg = (struct ep_arg*)calloc(1, sizeof(struct ep_arg));  // 将fd与一个回调函数绑定一一绑定,创建一个结构体再堆上,fd与结构体绑定丢人epoll中 if (eparg == NULL) return -1; eparg->sockfd = sockfd; eparg->cb = cb; struct epoll_event ev; ev.data.ptr = eparg; ev.events = EPOLLIN; ret = epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev); //printf(" epoll_ctl ADD: sockfd->%d, ret:%d\n", sockfd, ret); return ret;
}  

3 线程初始回调函数
主要是死循环,通过epoll_wait去检查那些sockfd的可读事件就绪,就绪后读出对应的buffer,做相应的解析,做对应的回调操作

 // 线程回调函数
//dns_async_client_proc()
//epoll_wait
//result callback
static void* dns_async_client_proc(void *arg) { struct async_context *ctx = (struct async_context*)arg; int epfd = ctx->epfd; while (1) { struct epoll_event events[ASYNC_CLIENT_NUM] = {0}; // 1 检查那些sockfd的可读事件 int nready = epoll_wait(epfd, events, ASYNC_CLIENT_NUM, -1); if (nready < 0) { if (errno == EINTR || errno == EAGAIN) {continue;} else {break;} } else if (nready == 0) {continue;} // 2 遍历检查出来的那些fd可读事件已经就绪 printf("nready:%d\n", nready); int i = 0; for (i = 0;i < nready;i ++) { struct ep_arg *data = (struct ep_arg*)events[i].data.ptr; int sockfd = data->sockfd; // 3 读取出返回的buffer数据 char buffer[1024] = {0}; struct sockaddr_in addr; size_t addr_len = sizeof(struct sockaddr_in); int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len); // 4 按照对应的协议解析读出出来的数据 struct dns_item *domain_list = NULL; int count = dns_parse_response(buffer, &domain_list); // 5 调用commit前设置的回调函数对应操作 data->cb(domain_list, count); //call cb  可以自定义函数操作 // 6 sockfd移除epoll(目前这里有几种作法:a直接删除  b可以不删除重复利用,fd设置为MODIFY事件)  int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL); //printf("epoll_ctl DEL --> sockfd:%d\n", sockfd); close(sockfd); / dns_async_client_free_domains(domain_list, count); free(data); } }
}  

4 销毁destory
a:response后,对应的fd关闭close
b:线程退出

5 使用异步请求池sdk

 int main(int argc, char *argv[]) {
#if 0 dns_client_commit(argv[1]);
#else struct async_context *ctx = dns_async_client_init(); if (ctx == NULL) return -2; int count = sizeof(domain) / sizeof(domain[0]); int i = 0; for (i = 0;i < count;i ++) { dns_async_client_commit(ctx, domain[i], dns_async_client_result_callback); //sleep(2); } getchar();
#endif
}  

#5 设计异步请求池的注意几个问题:
一般情况下是基于网络io的;发送完后将对应的fd设置为可读抛入epoll管理;发送和接受不能再同一个线程,主要是我们设计这个组件sdk是提供给别人用,发送提供接口,响应框架处理。

文章来源:智云一二三科技

文章标题:「linux」池化组件之异步请求池

文章地址:https://www.zhihuclub.com/99356.shtml

关于作者: 智云科技

热门文章

网站地图