图/文:迷神
之前有需要使用tp开发一个消息队列功能,用来异步处理订单,发送一些消息等。因为是使用的是Thinkphp6,消息队列我用的thinkphp官方的think-queue消息队列,结合 supervisor 进程管理使队列进程常驻。记录一下,顺便分享给大家。
安装 thinkphp-queue
composer install thinkphp-queue
存储消息环境
thinkphp-queue的消息存储环境,可以有多种形式, Redis,Database,Topthink ,Sync这四种驱动。为了速度,我们一般都使用redis,当然你也可以数据库,创建提供的表结构就行了。
我们这里使用了Redis,具体安装,大家可以使用集成系统,省事一些,具体不再多做赘述啦。
在config/queue.php中,将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动。
添加生产者
修改app/controller/Index.php里使用Queue::later或者Queue::push发布任务。
namespace app\controller;
use app\BaseController;
use think\facade\Queue;
class Index extends BaseController
{
public function index()
{
//当前任务将由哪个类来负责处理
$jobHandlerClassName = 'app\job\Job1';
//业务数据 对象需要手动转序列化
$jobData = ['ts' => time()];
//队列名称
$jobQueueName = "OrderJob";
//入队列,later延时发送,单位秒。push立即发送
$isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
//$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ){
echo '执行成功';
}else{
echo '执行失败';
}
//php think queue:listen --queue OrderJob 执行队列
}
}
创建消费者
编写 消费者类,用于处理 OrderJob 队列中的任务,并编写其 fire() 方法
class Job1
{
public function fire(Job $job, $data)
{
//业务处理代码,具体不贴出来了
$isJobDone = $this->jobDone($data);
//执行成功删除
if($isJobDone){
$job->delete();
print("任务已经被执行成功并且删除");
}else{
$job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
print("任务3s后再次被执行");
}
//通过这个方法可以检查任务重试了几次
if ($job->attempts() > 3) {
print("Job has been retried more than 3 times!");
$job->delete();
}
}
public function failed($data)
{
// ...任务达到最大重试次数后,失败了
}
//job
private function jobDone($data){
Log::write('这是数据 ' . json_encode($data));
return true;
}
出队列(消费任务)
在项目根目录执行命令
php think queue:work --queue OrderJob
supervisor的安装和配置
yum安装
# yum install epel-release
# yum install supervisor
//设置成开机自动启动
# systemctl enable supervisord
在 /var/supervisor/conf 创建一个 conf配置文件
[program:queue_worker] ;项目名称
directory = /www/wwwroot/tp6 ; 程序的启动目录,项目根目录的上一级
command = php think queue:work --queue OrderJob --daemon ; 启动命令 queueName就是队列名
process_name=%(program_name)s_%(process_num)02d
numprocs = 3 ; 开启的进程数量
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 50MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/supervisor/log/queue_worker.log
loglevel=info
重启下
# systemctl restart supervisord
好了,大概就这么多了。 有问题,欢迎留言。觉得不错,记得关注迷神笔记哦。