您的位置 首页 php

ThinkPHP6+Supervisor实现进程常驻消息队列

图/文:迷神

之前有需要使用tp开发一个消息队列功能,用来异步处理订单,发送一些消息等。因为是使用的是Thinkphp6,消息队列我用的thinkphp官方的think-queue消息队列,结合 supervisor 进程管理使队列进程常驻。记录一下,顺便分享给大家。

安装 thinkphp-queue

 composer install thinkphp-queue  

存储消息环境

thinkphp-queue的消息存储环境,可以有多种形式, Redis,Database,Topthink ,Sync这四种驱动。为了速度,我们一般都使用redis,当然你也可以数据库,创建提供的表结构就行了。

我们这里使用了Redis,具体安装,大家可以使用集成系统,省事一些,具体不再多做赘述啦。

在config/queue.php中,将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动。

配置如下

添加生产者

修改app/controller/Index.php里使用Queue::later或者Queue::push发布任务。

 namespace app\controller;

use app\BaseController;
use think\facade\Queue;

class Index extends BaseController
{
    public function index()
    {
        //当前任务将由哪个类来负责处理
        $jobHandlerClassName = 'app\job\Job1';
        //业务数据 对象需要手动转序列化
        $jobData = ['ts' => time()];
        //队列名称
        $jobQueueName = "OrderJob";
        //入队列,later延时发送,单位秒。push立即发送
        $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
        //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo '执行成功';
        }else{
            echo '执行失败';
        }
        //php think queue:listen --queue OrderJob  执行队列
    }
}  

创建消费者

编写 消费者类,用于处理 OrderJob 队列中的任务,并编写其 fire() 方法

 class Job1
{
    public function fire(Job $job, $data)
    {
        //业务处理代码,具体不贴出来了
        $isJobDone = $this->jobDone($data);
        //执行成功删除
        if($isJobDone){
            $job->delete();
            print("任务已经被执行成功并且删除");
        }else{
            $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
            print("任务3s后再次被执行");
        }
        //通过这个方法可以检查任务重试了几次
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
    }
    //job
    private function jobDone($data){
        Log::write('这是数据 ' . json_encode($data));
        return true;
    }  

出队列(消费任务)

在项目根目录执行命令

 php think queue:work --queue OrderJob  

supervisor的安装和配置

yum安装

 # yum install epel-release
# yum install supervisor
//设置成开机自动启动
# systemctl enable supervisord  

/var/supervisor/conf 创建一个 conf配置文件

 [program:queue_worker] ;项目名称
directory = /www/wwwroot/tp6 ; 程序的启动目录,项目根目录的上一级
command = php think queue:work --queue OrderJob --daemon ; 启动命令 queueName就是队列名
process_name=%(program_name)s_%(process_num)02d
numprocs = 3         ; 开启的进程数量
autostart = true     ; 在 supervisord 启动的时候也自动启动
startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true   ; 程序异常退出后自动重启
startretries = 3     ; 启动失败自动重试次数,默认是 3
user = root          ; 用哪个用户启动
redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20     ; stdout 日志文件备份数
; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/supervisor/log/queue_worker.log
loglevel=info  

重启下

 # systemctl restart supervisord  

好了,大概就这么多了。 有问题,欢迎留言。觉得不错,记得关注迷神笔记哦。

文章来源:智云一二三科技

文章标题:ThinkPHP6+Supervisor实现进程常驻消息队列

文章地址:https://www.zhihuclub.com/151806.shtml

关于作者: 智云科技

热门文章

网站地图