本文工具: ThinkPHP 5.1, RabbitMQ
1.安装 Erlang 环境
wget
yum install erlang-22.2-1.el7.x86_64.rpm
2.下载rabbitMQ的rpm包
wget
3.安装RabbitMQ
yum install rabbitmq-server-3.8.1-1.el7.noarch.rpm
4.启动RabbitMQ
chkconfig rabbitmq-server on #设置rabbitmq 服务为开机启动
/sbin/service rabbitmq-server start #启动
/sbin/service rabbitmq -server stop #关闭
或者
/bin/systemctl start rabbitmq-server.service #启动
/bin/systemctl start rabbitmq-server.service #关闭
5.启动UI插件
创建登录用户 rabbit mq -plugins enable rabbitmq_management #启动管理插件,下次无需再手动启动该插件
PHP
composer “yarayzw/rabbitmq” : “dev-master”,
创建worker配置文件
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------
use core\base\BaseMqService;
use core\helper\SysHelper;
// +----------------------------------------------------------------------
// | Workerman设置 仅对 php think worker 指令有效
// +----------------------------------------------------------------------
return [
// 扩展自身需要的配置
//自动化 处理redis 3677
//sendplan 11001 - 11010
//sendmsg 11020 - 11030
'host' => '0.0.0.0', // 监听地址
'port' => 11020, // 监听端口
' root ' => '', // WEB 根目录 默认会定位public目录
'app_path' => '', // 应用目录 守护进程模式必须设置(绝对路径)
' File _monitor' => false, // 是否开启PHP文件更改监控(调试模式下自动开启)
'file_monitor_interval' => 2, // 文件监控检测时间间隔(秒)
'file_monitor_path' => [], // 文件监控目录 默认监控application和config目录
// 支持workerman的所有配置参数
'name' => 'xx-tj',
'count' => 2,
'daemonize' => false,
// 'pidFile' => App::getRuntimePath().'worker/' . 'worker.pid',
'onWorkerStart' => static function ($worker) {
switch ($worker->id) {
case 0:
//mq消费监听
BaseMqService::setConfig(SysHelper::getConf('rabbitmq.'));
BaseMqService::receive();
break;
case 1:
break;
case 2:
break;
case 3:
break;
default:
throw new Exception ('无效进程');
}
},
];
启动worker
php think worker
创建mq基础类
<?php
namespace core\base;
use app\admin\service\UserPromulgatingService;
use core\common\BaseException;
use Exception;
use hq\mq\MqSendDataStruct;
use hq\mq\MqService;
class BaseMqService extends MqService
{
protected static $appName = 'xx-tj';
protected static $config = [
'host' => ' 127.0.0.1 ',
'port' => '5672',
'user' => 'yara',
'password' => 'qwer1234',
'vhost' => 'xx',
'is_delay' => true, //是否需要开启延迟队列
'pre_exchange' => 'xx', //交换机前缀
'exchange_type' => 'topic', //默认topic类型
'exchange_key' => '',
'passive' => false, //查询某一个队列是否已存在,如果不存在,不想建立该队列
'durable' => true, //是否持久化
'auto_delete' => false, //是否自动删除
'exclusive' => false, //队列的排他性
'no_local' => false,
'no_ack' => false, //是否需不需要应答
'nowait' => false, //该方法需要应答确认
'consumer_tag' => ''
];
protected static $consumer = [
'yara' => [
'name' => 'yara',
'exchange' => 'xx.yara',
'route' => 'xx.yara.*',
'queue' => 'xx-tj.yara',
'operations' => [
['name' => 'yara', 'title' => '测试', 'queue' => 'xx-tj.yara.yaratest', 'route' => 'xx.yara.yaratest', 'class' => UserPromulgatingService::class, 'method' => 'yarayzwtest'],
]
],
];
//延迟队列
protected static $delays = [
// 'order_timeout_cancel' => ['name' => 'order_timeout_cancel', 'title' => '订单超时取消订单', 'expiry' => 30 * 60, 'class' => MqDelayOrderService::class, 'method' => 'cancel'],
// 'order_assemble_timeout_cancel' => ['name' => 'order_assemble_timeout_cancel', 'title' => '拼团订单超时退订单', 'expiry' => 24 * 60 * 60, 'class' => MqDelayOrderService::class, 'method' => 'refundAssemble'],
];
/**
* @param MqSendDataStruct $data
* @param $routeKey
* @throws Exception
*/ public static function sendEventMq(Mq send DataStruct $data, string $routeKey): void
{
if (empty($routeKey)) {
throw new BaseException('mq路由key必须填写!');
}
if (empty($data->getEvent())) {
throw new BaseException('请在事件类中定义event属性,并设置默认event_code值,参照Event类中事件对应的code!');
}
if (empty($data->getUserId())) {
throw new BaseException('请在事件类中定义user_id属性');
}
$ EventLog = [
'mq_router_key' => $routeKey,
# 'user_type' => 0,
'user_id' => $data->getUserId(),
'user_name' => $data->getUserName(),
'event_code' => $data->getEvent(),
'send_data' => json_encode($data-> toArray ()),
'create_at' => date('Y-m-d H:i:s')
];
// EventLogService::insert($eventLog);
self::send($data, $routeKey);
}
}
创建字段约定类
<?php
namespace entity\mq;
use core\base\BaseEntity;
use hq\mq\MqSendDataStruct;
class GetOrder extends MqSendDataStruct
{
use BaseEntity;
private $thirdparty;
private $channel_id;
private $start_time;
private $end_time;
/**
* @return mixed
*/ public function getThirdparty()
{
return $this->thirdparty;
}
/**
* @param mixed $thirdparty
*/ public function setThirdparty($thirdparty): void
{
$this->thirdparty = $thirdparty;
}
/**
* @return mixed
*/ public function getChannelId()
{
return $this->channel_id;
}
/**
* @param mixed $channel_id
*/ public function setChannelId($channel_id): void
{
$this->channel_id = $channel_id;
}
/**
* @return mixed
*/ public function getStartTime()
{
return $this->start_time;
}
/**
* @param mixed $start_time
*/ public function setStartTime($start_time): void
{
$this->start_time = $start_time;
}
/**
* @return mixed
*/ public function getEndTime()
{
return $this->end_time;
}
/**
* @param mixed $end_time
*/ public function setEndTime($end_time): void
{
$this->end_time = $end_time;
}
}
发送mq消息
public function setDataBy Mysql ()
{
$sendData = new GetOrder([
'channel_id' => 0,
'start_time' => 'T00:00:00+08:00',
'end_time' => 'T23:59:59+08:00',
'thirdparty' => 1,
]);
try {
BaseMqService::send($sendData, 'xx.yara.yaratest');
} catch (\Exception $e) {
FilterHelper::writeLog('dsrw_error','推广链接数据采集-error',$e->getMessage());
}
}