您的位置 首页 golang

memcache源码解析-多线程网络模型

memcache 网络模型采用的是单进程多线程模型,内部使用 libevent 事件库来处理网络请求。

其工作模式是主线程负责 accept 新的客户端连接请求,然后把获取到的新的连接请求经过 Round Robin 方式分配各个 worker 线程,worker 线程负责处理请求。

线程 对象类型如下

  typedef   struct  {
pthread_t thread_id; /* unique ID of this thread */struct event_base *base; /* libevent handle this thread uses */struct event notify_event; /* listen event for notify pipe */int notify_receive_fd; /* receiving end of notify pipe */int notify_send_fd; /* sending end of notify pipe */struct conn_queue *new_conn_queue ; 
...

} LIBEVENT_THREAD;  

每个线程都有一个 LIB event 的实例 event_base, 各个线程都在自己的事件实例中处理触发的事件。

每个线程都有一个连接队列,当有客户端连接请求到来时,主线程把获取到的新连接节点放到线程的 new_conn_queue 队列中,线程从自己的队里中取出节点进行接收消息并处理。

每个线程都有一个 pipe,用于主线程和 worker 线程进行通信。

关于线程的 new_conn_queue 队列是一个 链表 ,存放着 CQ_ITEM 节点信息,该节点信息中保存着连接信息。

 typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};

typedef struct conn_queue CQ;

struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t  lock ;
};  

每个队列都有一个锁,保证互斥操作。

启动流程

在分析 workers 线程之前先分析下主线程整个启动流程。

主线程创建监听套接字流程

  static  int server_ socket (const char *interface, int port, enum network_transport transport,) {
int sfd;
struct linger ling = {0, 0};
...

hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

if (port == -1) {
port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
error= getaddrinfo(interface, port_buf, &hints, &ai);


for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
//创建一个套接字
if ((sfd = new_socket(next)) == -1) {

if ( errno  == EMFILE) {
/* ...unless we're out of fds */ perror ("server_socket");
exit(EX_OSERR);
}
continue;
}

//设置套接字属性
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, ( void  *)&flags, sizeof(flags));

error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));

error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

error = setsockopt(sfd, IPPROTO_TCP,  tcp _NODELAY, (void *)&flags, sizeof(flags));

}
//绑定套接字
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
if (errno != EADDRINUSE) {
...
return 1;
}
close(sfd);
continue;
} else {
success++;
//监听
 if  (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
...
return 1;
}
if (portnumber_file != NULL &&
(next->ai_addr->sa_family == AF_INET ||
next->ai_addr->sa_family == AF_INET6)) {
union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} my_sockaddr;
socklen_t len = sizeof(my_sockaddr);
if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
if (next->ai_addr->sa_family == AF_INET) {
...
} else {
...
}
}
}
}

// 创建一个主线程用于监听的 连接对象,同时把监听套接字注册到main_base事件中
if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
transport, main_base, NULL))) {
...
}

listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;

}

freeaddrinfo(ai);

/* Return  zero  iff we detected no errors in starting up connections */return success == 0;
}  

相关视频推荐

学习地址:

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括 C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs, MongoDB ZK ,流媒体, CDN ,P2P,K8S, Docker ,TCP/IP,协程,DPDK, ffmpeg 等),免费分享

主线程创建监听套接字,同时在连接数组中给对应套接字生成一个连接对象,该连接对象用于对网络操作的一种封装。

 conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;

c = conns[sfd];

if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
...
return NULL;
}
MEMCACHED_CONN_CREATE(c);
c->read = NULL;
c->sendmsg = NULL;
c->write = NULL;
c->rbuf = NULL;

c->rsize = read_buffer_size;

...

STATS_LOCK();
stats_state.conn_structs++;
STATS_UNLOCK();

c->sfd = sfd;
conns[sfd] = c;
}

c->transport = transport;
c->protocol = settings.binding_protocol;

/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for  UDP 
* mode. */if (!settings.socketpath) {
c-> request _addr_size = sizeof(c->request_addr);
} else {
c->request_addr_size = 0;
}

if (transport == tcp_transport && init_state == conn_new_cmd) {
if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
&c->request_addr_size)) {
perror("getpeername");
 memset (&c->request_addr, 0, sizeof(c->request_addr));
}
}

if (init_state == conn_new_cmd) {
LOGGER_LOG(NULL, LOG_CONNEVENTS, LOGGER_CONNECTION_NEW, NULL,
&c->request_addr, c->request_addr_size, c->transport, 0, sfd);
}

...
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
...

// 把监听套接字注册到事件中
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {
return NULL;
}

STATS_LOCK();
stats_state.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();

MEMCACHED_CONN_ALLOCATE(c->sfd);

return c;
}  

主线程创建的连接对象与监听套接字进行关联,同时把监听套接字注册到事件驱动中,当有客户端进行向主线程发起连接时,主线程触发回调 event_handler。

workers 线程的初始化创建

 void thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;

...

pthread_mutex_init(&init_lock, NULL);
 pthread _cond_init(&init_cond, NULL);


/* Want a wide lock table, but don't waste memory */if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
power = 13;
}

...

//创建nthreads个worker线程对象
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

dispatcher_thread.base = main_base;//设置主线程对象的event_base
dispatcher_thread.thread_id = pthread_self();//设置主线程对象线程id

//为每个worker线程创建与主线程通信的管道
for (i = 0; i < nthreads; i++) {

int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}

threads[i].notify_receive_fd = fds[0]; //worker线程管道接收fd
threads[i].notify_send_fd = fds[1]; //worker线程管道写入fd
//设置worker线程的属性信息
setup_thread(&threads[i]);

/* Reserve three fds for the libevent base, and two for the pipe */stats_state.reserved_fds += 5;
}

/* Create threads after we've done all the libevent setup. */for (i = 0; i < nthreads; i++) {
//创建线程并启动
create_worker(worker_libevent, &threads[i]);
}

/* Wait for all the threads to set themselves up before returning. */pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);//等待所有worker线程启动完毕
pthread_mutex_unlock(&init_lock);
}  

主线程创建 worker 线程池 并启动线程,并为每个线程创建一个与主进程进行通信的管道,同时为每个线程创建一个连接队列。

每个线程启动后都在自己的事件驱动中进行循环,当有事件发生时,触发回调

 thread_libevent_process。

static void setup_thread(LIBEVENT_THREAD *me) {

me->base = event_init();
// 把管道读注册到线程的事件驱动中
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
event_add(&me->notify_event, 0)
//创建一个连接队列,并进行初始化
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->ev_queue);

pthread_mutex_init(&me->stats.mutex, NULL)

me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char *), NULL, NULL);

}  

到此,主线程和 worker 线程都在自己的事件循环中。

客户端发起连接

当客户端向主线程发起连接请求时,主线程触发回调 event_handler。

 void event_handler(const evutil_socket_t fd, const  short  which, void *arg) {
conn *c;

c = (conn *)arg; 
c->which = which;

/* sanity */if (fd != c->sfd) {
if (settings.verbose > 0)
 fprintf (stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//调用drive_machine进行业务逻辑处理
drive_machine(c);

/* wait for next event */return;
}  

主线程监听时,conn 对象状态为 conn_listening,因此主线程调用 accept 用于接收客户端端连接,只分析 TCP 部分。

 static void drive_machine(conn *c) {

...

while (!stop) {

switch(c->state) {

case conn_listening:
addrlen = sizeof(addr);
//接收一个新的连接
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);

if (!use_accept4) {
//设置套接字为非阻塞
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
...
}
}

 bool  reject;
if (settings.maxconns_fast) {
reject = sfd >= settings.maxconns - 1;
if (reject) {
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
}
} else {
reject = false;
}

if (reject) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str,  strlen (str));
close(sfd);
} else {
void *ssl_v = NULL;

//把接收到新的连接分配给worker线程
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
READ_BUFFER_SIZE, tcp_transport);
}

stop = true;
break;

case conn_waiting:
...
break;

case conn_read:

res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory *//* State already set by try_read_network */break;
}
break;

case conn_parse_cmd:
... 
break;

case conn_new_cmd:
...
break;

case conn_nread:
...
break;

case conn_swallow:
...
break;

case conn_write:
case conn_mwrite:
...
break;

case conn_closing:
... 
break;

case conn_closed:
... 
break;

case conn_watch:
... 
break;
case conn_io_queue:
... 
break;
case conn_max_state:
assert(false);
break;
}
}

return;
}  

主线程接收到客户端连接后,通过 dispatch_conn_new 把连接分配到worker 线程池中。

 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {

CQ_ITEM *item = cqi_new(thread->ev_queue);
char buf[1];
//以 轮询 的方式找到待分配给的worker线程
int tid = (last_thread +1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;
//初始化存放到消息队列中的节点信息
item->sfd = sfd;
//此时init_state 为 conn_new_cmd
item->init_state = init_state; 
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//把节点放到worker线程的连接队列中
cq_push(thread->new_conn_queue, item);

MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
buf[0] = 'c';
//发worker线程的管道中发送一个‘c’字符,让work线程处理连接请求。
write(thread->notify_send_fd, buf, 1); 
}  

worker线程触发事件

主线程通过相关到写入一个字符,触发 worker 线程读事件,调用回调thread_libevent_process。

 static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];

read(fd, buf, 1);

switch (buf[0]) {
case 'c':
//从队列中获取一个节点
item = cq_pop(me->ev_queue);
if (item != NULL) {
//为该连接创建一个连接对象
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
...
} else {
c->thread = me;
} 
//释放节点
cqi_free(item);
}
break;

case 'l':
...
break;

case 'g':
...
break;
}

}  

该方法的主要功能就是从 worker 线程从自己的队列中获取一个节点项,从该节点项中获取连接信息,然后为该连接信息生成一个 conn 连接对象,其中状态为 conn_new_cmd。

把连接对象与客户端连接进行关联,同时把客户端连接注册到 worker 线程自己的 event 事件中。当客户端向worker线程发送命令时,触发 worker 线程的回调函数 event_handler。

以上流程可见如下图

后续客户端根据 conn->state 状态的转变进行不同的业务处理。

简单的 main_thread 和 worker_thread 示意图如下

文章来源:智云一二三科技

文章标题:memcache源码解析-多线程网络模型

文章地址:https://www.zhihuclub.com/98414.shtml

关于作者: 智云科技

热门文章

网站地图