这篇文章给大家分享的内容是关于Swoft 源码剖析之Swoole和Swoft的一些介绍(Task投递/定时任务篇),有一定的参考价值,有需要的朋友可以参考一下。
喜欢我的文章就关注我吧,持续更新中!
前言
Swoft 的任务功能基于 Swoole 的 Task机制 ,或者说 Swoft 的 Task 机制本质就是对 Swoole 的 Task机制 的封装和加强。
任务投递
//Swoft\Task\Task.php
class Task
{
/**
* Deliver coroutine or async task
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
*
* @return bool|array
* @throws TaskException
*/
public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
{
$data = TaskHelper::pack($taskName, $methodName, $params, $type);
if(!App::isWorkerStatus() && !App::isCoContext()){
return self:: deliver ByQueue($data);//见下文 Command 章节
}
if(!App::isWorkerStatus() && App::isCoContext()){
throw new TaskException('Please deliver task by http!');
}
$ server = App::$server->getServer();
// Delier coroutine task
if ($type == self::TYPE_CO) {
$tasks[0] = $data;
$prifleKey = 'task' . '.' . $taskName . '.' . $methodName;
App::profileStart($prifleKey);
$result = $server->taskCo($tasks, $timeout);
App::profileEnd($prifleKey);
return $result;
}
// Deliver async task
return $server->task($data);
}
}
任务投递 Task::deliver() 将调用参数打包后根据 $type 参数通过 Swoole 的 $server->taskCo() 或 $server->task() 接口投递到 Task进程 。
Task 本身始终是同步执行的, $type 仅仅影响投递这一操作的行为, Task::TYPE_ASYNC 对应的 $server->task() 是异步投递, Task::deliver() 调用后马上返回; Task::TYPE_CO 对应的 $server->taskCo() 是协程投递,投递后让出协程控制,任务完成或执行超时后 Task::deliver() 才从协程返回。
任务执行
//Swoft\Task\Bootstrap\Listeners\TaskEventListener
/**
* The listener of swoole task
* @SwooleListener({
* SwooleEvent::ON_TASK,
* SwooleEvent::ON_FINISH,
* })
*/
class TaskEventListener implements TaskInterface, FinishInterface
{
/**
* @param \Swoole\Server $server
* @param int $taskId
* @param int $ Worker Id
* @param mixed $data
* @return mixed
* @throws \InvalidArgumentException
*/
public function onTask(Server $server, int $taskId, int $workerId, $data)
{
try {
/* @var TaskExecutor $taskExecutor*/
$taskExecutor = App::getBean(TaskExecutor::class);
$result = $taskExecutor->run($data);
} catch (\Throwable $throwable) {
App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->get message (), $throwable->getFile(), $throwable->getLine()));
$result = false;
// Release system resources
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK);
}
return $result;
}
}
此处是 swoole.onTask 的事件回调,其职责仅仅是将将 Worker 进程投递来的打包后的数据转发给 TaskExecutor 。
Swoole 的 Task 机制的本质是 Worker进程 将耗时任务投递给同步的 Task进程 (又名 TaskWorker )处理,所以 swoole.onTask 的事件回调是在 Task进程 中执行的。上文说过, Worker进程 是你大部分 HTTP 服务代码执行的环境,但是从 TaskEventListener.onTask() 方法开始,代码的执行环境都是 Task进程 ,也就是说, TaskExecutor 和具体的 TaskBean 都是执行在 Task进程 中的。
//Swoft\Task\TaskExecutor
/**
* The task executor
*
* @Bean()
*/
class TaskExecutor
{
/**
* @param string $data
* @return mixed
*/
public function run(string $data)
{
$data = TaskHelper::unpack($data);
$name = $data['name'];
$type = $data['type'];
$method = $data['method'];
$params = $data['params'];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector();
if (!isset($collector['task'][$name])) {
return false;
}
list(, $coroutine) = $collector['task'][$name];
$task = App::getBean($name);
if ($coroutine) {
$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
} else {
$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
}
return $result;
}
}
任务执行思路很简单,将 Worker进程 发过来的数据解包还原成原来的调用参数,根据 $name 参数找到对应的 TaskBean 并调用其对应的 task() 方法。其中 TaskBean 使用类级别注解 @Task(name=”TaskName”) 或者 @Task(“TaskName”) 声明。
值得一提的一点是, @Task 注解除了 name 属性,还有一个 coroutine 属性,上述代码会根据该参数选择使用协程的 runCoTask() 或者同步的 runSyncTask() 执行 Task 。但是由于而且由于 Swoole 的 Task进程 的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为 true 。同样的在 TaskBean 中编写的任务代码必须的同步阻塞的或者是要能根据环境自动将异步非阻塞和协程降级为同步阻塞的
从 Process 中投递任务
前面我们提到:
换句话说, Swoole 的 $server->taskCo() 或 $server->task() 都只能在 Worker进程 中使用。
这个限制大大的限制了使用场景。 如何能够为了能够在 process 中投递任务呢? Swoft 为了绕过这个限制提供了 Task::deliverByProcess() 方法。其实现原理也很简单,通过 Swoole 的 $server-> sendMessage () 方法将调用信息从 Process 中投递到 Worker进程 中,然后由Worker进程替其投递到 Task进程 当中,相关代码如下:
//Swoft\Task\Task.php
/**
* Deliver task by process
*
* @param string $taskName
* @param string $methodName
* @param array $params
* @param string $type
* @param int $timeout
* @param int $workId
*
* @return bool
*/
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
/* @var PipeMessageInterface $pipeMessage */
$server = App::$server->getServer();
$pipeMessage = App::getBean(PipeMessage::class);
$data = [
'name' => $taskName,
'method' => $methodName,
'params' => $params,
'timeout' => $timeout,
'type' => $type,
];
$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
return $server->sendMessage($message, $workId);
}
数据打包后使用 $server->sendMessage() 投递给 Worker :
//Swoft\Bootstrap\Server\ServerTrait.php
/**
* onPipeMessage event callback
*
* @param \Swoole\Server $server
* @param int $srcWorkerId
* @param string $message
* @return void
* @throws \InvalidArgumentException
*/
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
/* @var PipeMessageInterface $pipeMessage */
$pipeMessage = App::getBean(PipeMessage::class);
list($type, $data) = $pipeMessage->unpack($message);
App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
$server->sendMessage 后, Worker进程 收到数据时会触发一个 swoole.pipeMessage 事件的回调, Swoft 会将其转换成自己的 swoft.pipeMessage 事件并触发.
//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
* The pipe message listener
*
* @Listener(event=AppEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param \Swoft\Event\EventInterface $event
*/
public function handle(EventInterface $event)
{
$params = $event->getParams();
if (count($params) < 3) {
return;
}
list($type, $data, $srcWorkerId) = $params;
if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
return;
}
$type = $data['type'];
$taskName = $data['name'];
$params = $data['params'];
$timeout = $data['timeout'];
$methodName = $data['method'];
// delever task
Task::deliver($taskName, $methodName, $params, $type, $timeout);
}
}
swoft.pipeMessage 事件最终由 PipeMessageListener 处理。在相关的监听其中,如果发现 swoft.pipeMessage 事件由 Task::deliverByProcess() 产生的, Worker进程 会替其执行一次 Task::deliver() ,最终将任务数据投递到 TaskWorker进程 中。
一道简单的回顾练习:从 Task::deliverByProcess() 到某 TaskBean 最终执行任务,经历了哪些进程,而调用链的哪些部分又分别是在哪些进程中执行?
从Command进程或其子进程中投递任务
//Swoft\Task\QueueTask.php
/**
* @param string $data
* @param int $taskWorkerId
* @param int $srcWorkerId
*
* @return bool
*/
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
if ($taskWorkerId === null) {
$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
}
if ($srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this->workerNum - 1);
}
$this->check();
$data = $this->pack($data, $srcWorkerId);
$result = \msg_send($this->queueId, $taskWorkerId, $data, false);
if (!$result) {
return false;
}
return true;
}
对于 Command 进程的任务投递,情况会更复杂一点。
上文提到的 Process ,其往往衍生于 Http/Rpc 服务,作为同一个 Manager 的子孙进程,他们能够拿到 Swoole\Server 的句柄变量,从而通过 $server->sendMessage() , $server->task() 等方法进行任务投递。
但在 Swoft 的体系中,还有一个十分路人的角色: Command 。
Command 的进程从 shell 或 cronb 独立启动,和 Http/Rpc 服务相关的进程没有亲缘关系。因此 Command 进程以及从 Command 中启动的 Process 进程是没有办法拿到 Swoole\Server 的调用句柄直接通过 UnixSocket 进行任务投递的。
为了为这种进程提供任务投递支持, Swoft 利用了 Swoole 的 Task进程 的一个特殊功能—- 消息队列 。
同一个项目中 Command 和 Http\RpcServer 通过约定一个 message_queue_key 获取到系统内核中的同一条消息队列,然后 Comand 进程就可以通过该消息队列向 Task进程 投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在 Task::deliver() 方法中, Swoft 会根据当前环境隐式切换投递方式。但该消息队列的实现依赖 Semaphore 拓展,如果你想使用,需要在编译 PHP 时加上 –enable-sysvmsg 参数。
定时任务
除了手动执行的普通任务, Swoft 还提供了精度为秒的定时任务功能用来在项目中替代Linux的 Crontab 功能.
Swoft 用两个前置 Process —任务计划进程: CronTimerProcess 和任务执行进程 CronExecProcess
,和两张内存数据表—– RunTimeTable (任务(配置)表) OriginTable ((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:
\\Swoft\Task\Crontab\TableCrontab.php
/**
* 任务表,记录用户配置的任务信息
* 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
* @var array $originStruct
*/
private $originStruct = [
'rule' => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
];
/**
* 执行表,记录短时间内要执行的任务列表及其执行状态
* 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
* @var array $runTimeStruct
*/
private $runTimeStruct = [
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。
//注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
];
此处为何要使用Swoole的内存Table?
Swoft 的的定时任务管理是分别由 任务计划进程 和 任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的 array() 等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的 Table (本文的 Table ,除非特别说明,都指 Swoole 的 Swoole\Table 结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。
为了 Table 能够在两个进程间共同使用, Table 必须在 Swoole Server 启动前创建并分配内存。具体代码在 Swoft\Task\Bootstrap\Listeners->onBeforeStart() 中,比较简单,有兴趣的可以自行阅读。
背景介绍完了,我们来看看这两个定时任务进程的行为
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
* Crontab timer process
*
* @Process(name="cronTimer", boot=true)
*/
class CronTimerProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
//code....
/* @var \Swoft\Task\Crontab\Crontab $cron*/
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$time = (60 - date('s')) * 1000;
$server->after($time, function () use ($server, $cron) {
// Every minute check all tasks, and prepare the tasks that next execution point needs
$cron->checkTask();
$server->tick(60 * 1000, function () use ($cron) {
$cron->checkTask();
});
});
}
}
//Swoft\Task\Crontab\Crontab.php
/**
* 初始化runTimeTable数据
*
* @param array $task 任务
* @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
* @return bool
*/
private function initRunTimeTableData(array $task, array $parseResult): bool
{
$runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi');
$sec = strtotime(date('Y-m-d H:i'));
foreach ($parseResult as $time) {
$this->checkTaskQueue(false);
$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
$runTimeTableTasks->set($key, [
'taskClass' => $task['taskClass'],
'taskMethod' => $task['taskMethod'],
'minute' => $min,
'sec' => $time + $sec,
'runStatus' => self::NORMAL
]);
}
return true;
}
CronTimerProcess 是 Swoft 的定时任务调度进程,其核心方法是 Crontab->initRunTimeTableData() 。
该进程使用了 Swoole 的定时器功能,通过 Swoole\Timer 在每分钟首秒时执行的回调, CronTimerProcess 每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。
//Swoft\Task\Bootstrap\Process
/**
* Crontab process
*
* @Process(name="cronExec", boot=true)
*/
class CronExecProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
$pname = App::$server->getPname();
$process->name(sprintf('%s cronexec process', $pname));
/** @var \Swoft\Task\Crontab\Crontab $cron */
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) {
$tasks = $cron->getExecTasks();
if (!empty($tasks)) {
foreach ($tasks as $task) {
// Diliver task
Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
$cron->finishTask($task['key']);
}
}
});
}
}
CronExecProcess 作为定时任务的执行者,通过 Swoole\Timer 每 0.5s 唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过 sendMessage() 投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在 Task进程 中由 TaskExecutor 处理。