thinkphp6.0消息队列配置database驱动 作者:七棵菜 日期:2022-11-30 栏目:计算机分类:1 人气:234 ### 安装 thinkphp6.0消息队列安装 ``` composer install topthink/think-queue ``` 最新版本为`^3.0`,安装好之后自动生成配置文件`config/queue.php` ``` return [ 'default' => 'database', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ]; ``` 但是如果把默认驱动改成`database`的话,源码有bug,主要问题有两个 - 构造函数参数$db的类型错误 - 数据操作对象单例模式造成`where`条件不断累加 自己尝试修复这一些bug,可以跑通,修复后的文件如下 ``` <?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- namespace think\queue\connector; use Carbon\Carbon; use stdClass; use think\Db; use think\db\ConnectionInterface; use think\db\Query; use think\queue\Connector; use think\queue\InteractsWithTime; use think\queue\job\Database as DatabaseJob; class Database extends Connector { use InteractsWithTime; protected $db; /** * The database table that holds the jobs. * * @var string */ protected $table; /** * The name of the default queue. * * @var string */ protected $default; /** * The expiration time of a job. * * @var int|null */ protected $retryAfter = 60; public function __construct(Query $db, $table, $default = 'default', $retryAfter = 60) { $this->db = $db; $this->table = $table; $this->default = $default; $this->retryAfter = $retryAfter; } public static function __make(Db $db, $config) { $connection = $db->connect($config['connection'] ?? null); return new self($connection, $config['table'], $config['queue'], $config['retry_after'] ?? 60); } public function size($queue = null) { $this->db->name($this->table) ->where('queue', $this->getQueue($queue)) ->count(); } public function push($job, $data = '', $queue = null) { return $this->pushToDatabase($queue, $this->createPayload($job, $data)); } public function pushRaw($payload, $queue = null, array $options = []) { return $this->pushToDatabase($queue, $payload); } public function later($delay, $job, $data = '', $queue = null) { return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay); } public function bulk($jobs, $data = '', $queue = null) { $queue = $this->getQueue($queue); $availableAt = $this->availableAt(); return $this->db->name($this->table)->insertAll(collect((array) $jobs)->map( function ($job) use ($queue, $data, $availableAt) { return [ 'queue' => $queue, 'attempts' => 0, 'reserve_time' => null, 'available_time' => $availableAt, 'create_time' => $this->currentTime(), 'payload' => $this->createPayload($job, $data), ]; } )->all()); } /** * 重新发布任务 * * @param string $queue * @param StdClass $job * @param int $delay * @return mixed */ public function release($queue, $job, $delay) { return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts); } /** * Push a raw payload to the database with a given delay. * * @param \DateTime|int $delay * @param string|null $queue * @param string $payload * @param int $attempts * @return mixed */ protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) { return $this->db->name($this->table)->insertGetId([ 'queue' => $this->getQueue($queue), 'attempts' => $attempts, 'reserve_time' => null, 'available_time' => $this->availableAt($delay), 'create_time' => $this->currentTime(), 'payload' => $payload, ]); } public function pop($queue = null) { $queue = $this->getQueue($queue); return $this->db->transaction(function () use ($queue) { if ($job = $this->getNextAvailableJob($queue)) { $job = $this->markJobAsReserved($job); return new DatabaseJob($this->app, $this, $job, $this->connection, $queue); } }); } /** * 获取下个有效任务 * * @param string|null $queue * @return StdClass|null */ protected function getNextAvailableJob($queue) { $db = clone $this->db; $job = $db->name($this->table) ->lock(true) ->where('queue', $this->getQueue($queue)) ->where(function (Query $query) { $query->where(function (Query $query) { $query->whereNull('reserve_time') ->where('available_time', '<=', $this->currentTime()); }); //超时任务重试 $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp(); $query->whereOr(function (Query $query) use ($expiration) { $query->where('reserve_time', '<=', $expiration); }); }) ->order('id', 'asc') ->find(); return $job ? (object) $job : null; } /** * 标记任务正在执行. * * @param stdClass $job * @return stdClass */ protected function markJobAsReserved($job) { $db = clone $this->db; $db->name($this->table) ->where('id', $job->id) ->update([ 'reserve_time' => $job->reserve_time = $this->currentTime(), 'attempts' => ++$job->attempts, ]); return $job; } /** * 删除任务 * * @param string $id * @return void */ public function deleteReserved($id) { $this->db->transaction(function () use ($id) { $db = clone $this->db; $db->name($this->table)->where('id', $id)->delete(); }); } protected function getQueue($queue) { return $queue ?: $this->default; } } ``` ### 鸣谢 - [github](https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md) 标签: thinkphp6.0 消息队列 think-queue 上一篇:如何使用mvn命令导入依赖 下一篇:apache 清除并且重新配置 X-FRAME-OPTIONS 随便看看 2024-02-19 PHP7 运算符“??” 和“?:”的区别 2022-11-30 Linux 后台运行命令 2022-11-25 关于我们 2022-11-30 centos一键系统安装lnmp集成环境 2022-11-30 linux 生成 ssh 公钥 留言