- 简单阻塞版的TCP-Server
php_server.php
简单说下这个逻辑,很简单,创建一个server,然后等待客户端请求,客户端连接上之后接收数据、发送数据,结束
<?php
ini_set("memory_limit", -1);
// 创建一个tcp server
$server = stream_socket_server("tcp://127.0.0.1:8091", $errno, $errstr);
if ($errno) {
throw new Exception("server create err". $errstr);
}
echo "server start ". PHP_EOL;
// 等待请求
while ($clientConn = stream_socket_accept($server, -1)) {
$streamId = (string) $clientConn;
echo "get-new-client-connect: ". $streamId. PHP_EOL;
// 接收数据
$data = stream_socket_recvfrom($clientConn, 100);
echo "receive data :". $data. PHP_EOL;
$datas = explode(',', $data);
[$minId,$maxId] = explode('-', $datas[1]);
$start = microtime_float();
// 通过查询DB产生IO
$count = selectDB($minId, $maxId);
$end = microtime_float();
$times = $end-$start;
// 发送数据
stream_socket_sendto($clientConn, "你好客户端: ". $streamId . ' : ' . date("Y-m-d H:i:s") . ' minid: '. $minId. ' maxid: '. $maxId . ' dbCount: '. $times);
fclose($clientConn);
}
fclose($server);
function selectDB($minId, $maxId)
{
$mysqlconn = new mysqli('127.0.0.1', 'root', 'root', 'test');
$result = $mysqlconn->query("
select * from user where id between ".$minId." and ". $maxId);
$arr = [];
foreach ($result->fetch_all(MYSQLI_ASSOC) as $k=>$info) {
$arr[] = $info;
}
return $arr;
}
function microtime_float()
{
list($usec, $sec) = explode(" ", microtime());
return ((float)$usec + (float)$sec);
}
php_client.php
客户端则是连接server,发送数据,接收数据
<?php
// 连接到server
$client = stream_socket_client("tcp://127.0.0.1:8091", $errno, $errstr);
if ($errno) {
throw new Exception("client create err: ". $errstr);
}
echo "client connect success". PHP_EOL;
$minId = $argv[1];
$maxId = $argv[2];
$sleep = $argv[3];
echo "sleep: ". $sleep. PHP_EOL;
// 通过sleep产生阻塞,占用server连接
sleep($sleep);
// 写入数据
fwrite($client, "hello server,{$minId}-{$maxId}");
// 读取数据
while ($content = fread($client, 100)) {
echo "get content from server :". $content. PHP_EOL;
}
启动server
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_server.php
server start
// client1 等待10s
get-new-client-connect: Resource id #6
receive data :hello server,1-2
// client要等client1处理完才能被accept
get-new-client-connect: Resource id #8
receive data :hello server,1-10
client 1 , 我们给的参数是查询两条数据,等待10s
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_client.php 1 2 10
client connect success
sleep: 10
get content from server :你好客户端: Resource id #6 : 2020-09-08 02:31:10 minid: 1 maxid: 2 dbCount: 0.0069458484649658
client 2 , 这边呢是查询10条诗句,等待2s
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_client.php 1 10 2
client connect success
sleep: 2
get content from server :你好客户端: Resource id #8 : 2020-09-08 02:31:10 minid: 1 maxid: 10 dbCount: 0.006360054016113
get content from server :3
我们看先后接着启动client1、client2,client2必须要等到client1处理完成之后才能再处理,也就是必须要等待12s才能接受到server的处理,这就是阻塞了,只能一个个处理,server只能一个一个的处理。
- 简单select多路复用非阻塞版的TCP-Server
我们之前学习过Nginx使用epoll多路复用来处理单个Worker的多个client的处理,这里我们使用select来体验一下,虽然select某些情况下比epoll效率低,但也是可以实现的,这里利用php的系统函数stream_select来搞一下。
php_select_server.php
这里比上一个多了两个地方:
一个是设置server为非阻塞模式 stream_set_blocking ,从文档中可以见到,当启用no-block之后,reade如果没有数据会立刻返回,而不会阻塞等待。
二个是使用 stream_select 监控可读写事件的发生,发生之后进行相关处理即可。对于server来说有新的客户端连接、客户端发送数据过来都是一种可读事件。
<?php
ini_set("memory_limit", -1);
// 创建一个tcp server
$server = stream_socket_server("tcp://127.0.0.1:8091", $errno, $errstr);
// 设置非阻塞模式
stream_set_blocking($server, 0);
if ($errno) {
throw new Exception("server create err" . $errstr);
}
echo "server start ". PHP_EOL;
$wirtes = $exceps = [];
$clients[] = $server;
while (true) {
$reads = $clients;
// 调用select去轮询read事件
if (@stream_select($reads, $wirtes, $exceps, 100000) > 0) {
// 如果是主socket也就是server有可读事件,也就是客户端连接
if (in_array($server, $reads)) {
$clients[] = stream_socket_accept($server, 1000, $peername);
$serverK = array_search($server, $reads);
unset($reads[$serverK]);
}
if (count($reads) <= 0) continue;
// 剩下的都是client的消息
foreach ($reads as $ks => $_server) {
// client
$data = fread($_server, 100);
$streamId = (string)$_server;
echo "get-data-from-client :". $streamId. PHP_EOL;
$peername = stream_socket_get_name($_server, true);
echo "receive data :" . $data . PHP_EOL;
// 解析ID
$datas = explode(',', $data);
[$minId, $maxId] = explode('-', $datas[1]);
$start = microtime_float();
$count = selectDB($minId, $maxId);
$end = microtime_float();
$times = $end - $start;
echo $streamId. ": ". $peername. "-select-finish". PHP_EOL;
// 发送数据
fwrite($_server,
"你好客户端: " . $streamId . ':' . date("Y-m-d H:i:s") . ' minid: ' . $minId . ' maxid: ' . $maxId . ' dbCount: ' . $times);
fclose($_server);
}
}
}
function selectDB($minId, $maxId)
{
$mysqlconn = new mysqli('127.0.0.1', 'root', 'root', 'test');
$result = $mysqlconn->query("
select * from user where id between ".$minId." and ". $maxId);
$arr = [];
foreach ($result->fetch_all(MYSQLI_ASSOC) as $k=>$info) {
$arr[] = $info;
}
return $arr;
}
function microtime_float()
{
list($usec, $sec) = explode(" ", microtime());
return ((float)$usec + (float)$sec);
}
启动server
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_select_server.php
server start
get-data-from-client :Resource id #7
receive data :hello server,1-10
Resource id #7: 127.0.0.1:63980-select-finish
get-data-from-client :Resource id #6
receive data :hello server,1-2
Resource id #6: 127.0.0.1:63975-select-finish
client1: 这里还是查询两条数据,等待10s
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_client.php 1 2 10
client connect success
sleep: 10
get content from server :你好客户端: Resource id #6:2020-09-08 02:31:40 minid: 1 maxid: 2 dbCount: 0.0031378269195557
client2: 查询10条数据,等待2s
gaoz@nobodyMBP hyperf_study % php hyperf-skeleton/tcpip/php_client.php 1 10 2
client connect success
sleep: 2
get content from server :你好客户端: Resource id #7:2020-09-08 02:31:32 minid: 1 maxid: 10 dbCount: 0.0093188285827637
同志们注意看client2的返回时间奥,我们是先启动client1,接着启动client2, 但是明显client2先返回数据,这就是非阻塞的提现,I/O多路复用的优势就提现出来了。
stream_select最终会调用select()系统调用,
stream_select(ReadFds, WriteFds, Exceptions, timeout) ——> select(FdTotals, ReadFds, WriteFds, timeout)
select()大概是这样一种实现,他会不停的去循环检查是否有可读、可写事件发生的FD, 是否有可读、可写是socket连接对应的驱动设备提供的一种能力,所有Linux文件类型如文件、socket、devnull各种都必须实现这种通知机制。这一点大家可以看下下面的文章。
总结 :
- select通过内核监控多个fd的Read、Write事件
- select本身会阻塞
- select只是通知用户程序,数据准备好可读了,但还是需要用户程序自己去取数据
相关文档:
stream-blocking:
stream-select:
关于select的一篇文章:
具体代码: