您的位置 首页 php

laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!

这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;

思来想去,这可咋办呢?没办法,那就自己写咯!还好thinkphp-swoole扩展已经兼容了消息队列,那不就可以直接参考它的实现了嘛,嘿嘿嘿!

直接上修改的思路和代码!开干!

思路有两种:一种是增加一个单独启动的命令来消费队列;另一种是在swoole启动的时候一起启动消息队列进行消费。我这么懒的人,一个命令能解决的,绝不写两个命令,所以选择第二种。

首先重写swoole启动命令

<?php

namespace crmeb\swoole\command;

use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

/**
 * Console command that starts the Swoole HTTP server and, depending on the
 * loaded configuration, attaches a hot-reload process and queue worker
 * processes to the server manager before running it.
 */
class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}';

    /**
     * Run swoole_http_server.
     *
     * Aborts with an error message when the server is already running.
     * Otherwise prints the startup banner, registers the optional features
     * (access log, hot reload, queue workers, coroutine hooks) that are
     * switched on in the configuration, and hands control to the manager.
     *
     * @return void
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error('Failed! swoole_http_server process is already running.');

            return;
        }

        // Read every switch up front; all of them come from the same config array.
        $settings      = $this->config;
        $host          = Arr::get($settings, 'server.host');
        $port          = Arr::get($settings, 'server.port');
        $useAccessLog  = Arr::get($settings, 'server.access_log');
        $useHotReload  = Arr::get($settings, 'hot_reload.enabled');
        $useQueue      = Arr::get($settings, 'queue.enabled');
        $useCoroutines = Arr::get($settings, 'coroutine.enable');

        $this->info('Starting swoole http server...');
        $this->info("Swoole http server started: <http://{$host}:{$port}>");

        if ($this->isDaemon()) {
            $hint = '> (You can run this command to ensure the '
                . 'swoole_http_server process is running: ps aux|grep "swoole")';
            $this->info($hint);
        }

        $manager = $this->laravel->make(Manager::class);
        $server  = $this->laravel->make(Server::class);

        if ($useAccessLog) {
            $this->registerAccessLog();
        }

        // Hot reload, using this command's own watcher process.
        if ($useHotReload) {
            $hotReloadProcess = $this->getHotReloadProcessNow($server);
            $manager->addProcess($hotReloadProcess);
        }

        // Start the queue workers so jobs are consumed alongside the server.
        if ($useQueue) {
            $this->prepareQueue($manager);
        }

        if ($useCoroutines) {
            $hookFlags = Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL);
            Runtime::enableCoroutine(true, $hookFlags);
        }

        $manager->run();
    }

    /**
     * Build the process that watches files and reloads the server on change.
     *
     * The watcher is configured from the `hot_reload.include`,
     * `hot_reload.exclude` and `hot_reload.name` config entries.
     *
     * @param Server $server server instance whose reload() is invoked by the watcher callback
     * @return Process
     */
    protected function getHotReloadProcessNow($server)
    {
        $watchLoop = function () use ($server) {
            $fileWatcher = new FileWatcher(
                Arr::get($this->config, 'hot_reload.include', []),
                Arr::get($this->config, 'hot_reload.exclude', []),
                Arr::get($this->config, 'hot_reload.name', [])
            );

            $reloadServer = function () use ($server) {
                $server->reload();
            };

            $fileWatcher->watch($reloadServer);
        };

        return new Process($watchLoop, false, 0, true);
    }
}

InteractsWithQueue trait(供上面的启动命令 use)

<?php

namespace crmeb\swoole\server;

use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 *
 * Lets a console command hook the queue manager into the Swoole server
 * manager. The using class must expose `$this->laravel` (container) and
 * `$this->output`, both of which are read here.
 *
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    /**
     * Resolve the queue manager from the container and attach it to the
     * given server manager, forwarding this command's output.
     *
     * @param Manager $manager server manager that the queue manager is attached to
     */
    public function prepareQueue(Manager $manager)
    {
        $this->laravel
            ->make(QueueManager::class)
            ->attachToServer($manager, $this->output);
    }
}

Manager类

<?php

namespace crmeb\swoole\queue;

use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use function Swoole\Coroutine\run;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;

/**
 * Builds queue worker callbacks from the `queue.workers` configuration and
 * runs them either as extra processes of the Swoole HTTP server
 * (attachToServer) or in a standalone process pool (run).
 */
class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * Console output used to report job status lines.
     *
     * @var OutputStyle
     */
    protected $output;

    /**
     * Worker callbacks, one per configured queue. Each receives the
     * Swoole\Process it runs in.
     *
     * @var \Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     *
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * Register every configured queue worker as an additional process of the
     * given server manager.
     *
     * @param ServerManager $server server manager that receives the worker processes
     * @param OutputStyle   $output console output used for status lines
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();

        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * Run the queue workers in a standalone Swoole process pool, one pool
     * worker per configured queue.
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            // Execute the worker callback inside a coroutine scheduler.
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * Build one worker callback per entry of the `queue.workers` config.
     *
     * Each config key is a queue name, optionally written as
     * "queue@connection"; without the "@" part the connection is null.
     * Each value is an options array supporting `sleep` (default 3),
     * `tries` (default 0) and `timeout` in seconds (default 60).
     */
    protected function createWorkers()
    {
        // NOTE(review): getConfig() is not defined in this class; presumably it is
        // provided by the WithContainer trait - confirm.
        $workers = $this->getConfig('queue.workers', []);

        foreach ($workers as $queue => $options) {
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {
                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make('queue.worker');

                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);
                $option->sleep = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout = Arr::get($options, "timeout", 60);

                // Watchdog: make the process exit if the job is still running
                // once the configured timeout has elapsed.
                // NOTE(review): a timeout of 0 is passed straight to Timer::after -
                // confirm that this is acceptable for the Swoole version in use.
                $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                    $process->exit();
                });

                // A single runNextJob() call is made per invocation of this callback.
                // NOTE(review): continuous consumption presumably relies on the
                // server/pool restarting the process after it returns - verify.
                $worker->runNextJob($connection, $queue, $option);

                Timer::clear($timer);
            };
        }
    }

    /**
     * Register the JobFailed listener that prints a status line and stores
     * the failed job.
     */
    protected function listenForEvents()
    {
        $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) {
            // writeOutput() requires the status argument; omitting it raised an
            // ArgumentCountError and prevented the failed job from being logged.
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    /**
     * Store a failed job through the `queue.failer` service.
     *
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container['queue.failer']->log(
            $event->connection,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * Recognised statuses are 'starting', 'success' and 'failed'; any other
     * value produces no output.
     *
     * @param Job    $job
     * @param string $status
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                $this->writeStatus($job, 'Processing', 'comment');
                break;
            case 'success':
                $this->writeStatus($job, 'Processed', 'info');
                break;
            case 'failed':
                $this->writeStatus($job, 'Failed', 'error');
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * Writes "[timestamp][job id] Status: job name", with the bracketed part
     * and status wrapped in the given console style tag.
     *
     * @param Job    $job
     * @param string $status
     * @param string $type   console style tag name (e.g. comment, info, error)
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11), $job->getName()
        ));
    }
}

增加CrmebServiceProvider类

<?php

namespace crmeb\swoole;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 *
 * Service provider that extends the laravel-swoole provider: it registers
 * the server manager and a `queue.worker` singleton, and swaps in this
 * package's HttpServerCommand.
 *
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{
    /**
     * Register manager.
     *
     * Binds the Swoole server manager (aliased as `swoole.manager`) and a
     * `queue.worker` singleton built from the container's queue, events and
     * exception handler services.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, 'laravel');
        });

        $this->app->alias(Manager::class, 'swoole.manager');

        $this->app->singleton('queue.worker', function ($app) {
            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                function () {
                    return $this->app->isDownForMaintenance();
                }
            );
        });
    }

    /**
     * Boot websocket routes.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        $packageRoot = base_path('vendor/swooletw/laravel-swoole');

        require $packageRoot . '/routes/laravel_routes.php';
    }

    /**
     * Register access log middleware to container.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $httpKernel = $this->app->make(Kernel::class);
        $httpKernel->pushMiddleware(AccessLog::class);
    }

    /**
     * Register commands.
     */
    protected function registerCommands()
    {
        $commandClasses = [
            HttpServerCommand::class,
        ];

        $this->commands($commandClasses);
    }

    /**
     * Merge configurations.
     */
    protected function mergeConfigs()
    {
        $packageRoot = base_path('vendor/swooletw/laravel-swoole');

        $this->mergeConfigFrom($packageRoot . '/config/swoole_http.php', 'swoole_http');
        $this->mergeConfigFrom($packageRoot . '/config/swoole_websocket.php', 'swoole_websocket');
    }

    /**
     * Publish files of this package.
     */
    protected function publishFiles()
    {
        $packageRoot = base_path('vendor/swooletw/laravel-swoole');

        $publishMap = [
            $packageRoot . '/config/swoole_http.php' => base_path('config/swoole_http.php'),
            $packageRoot . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'),
            $packageRoot . '/routes/websocket.php' => base_path('routes/websocket.php'),
        ];

        $this->publishes($publishMap, 'laravel-swoole');
    }
}

然后再把 \crmeb\swoole\CrmebServiceProvider::class 加入 config/app.php 的 providers 数组中,这样框架就会加载重写后的swoole启动命令。

配置config/swoole_http.php

return [
    'queue'        => [
        // Whether queue jobs are consumed automatically when the server starts
        'enabled' => true,
        'workers' => [
            // Queue name => worker options (an empty array uses the defaults)
            'CRMEB' => []
        ]
    ],
];

输入命令:
php artisan crmeb:http restart

swoole启动后就可以自动消费队列了。

相关推荐:最新的五个Laravel视频教程

文章来源:智云一二三科技

文章标题:laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

文章地址:https://www.zhihuclub.com/25134.shtml

关于作者: 智云科技

热门文章

网站地图