小白强烈建议参考
一直使用swoole workman 但是一直纠结think-queue在实现消息队列上面有何区别
1 2 3 |
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md https://www.jianshu.com/p/d5e95c7ae526 |
安装think-queue tp5.0安装v2.*
1 2 |
composer require topthink/think-queue=2.0.* |
配置消息队列的驱动
根据选择的存储方式,在 \application\config\queue.php 这个配置文件中,添加消息队列对应的驱动配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
<?php return [ // 'connector' => 'Redis', // Redis 驱动 // 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null // 'default' => 'default', // 默认的队列名称 // 'host' => '127.0.0.1', // redis 主机ip // 'port' => 6379, // redis 端口 // 'password' => '', // redis 密码 // 'select' => 0, // 使用哪一个 db,默认为 db0 // 'timeout' => 0, // redis连接的超时时间 // 'persistent' => false, // 是否是长连接 'connector' => 'Database', // 数据库驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'table' => 'prefix_jobs', // 存储消息的表名,不带前缀 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP内部的队列通知服务平台 ,本文不作介绍 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行 ]; |
数据库[不推荐] 使用Redis[推荐]
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; |
app\index\controller\JobTest
在 \application\index\controller\JobTest.php 控制器中,添加 multiTask()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
<?php namespace app\index\controller; use think\Exception; use think\Queue; class Jobtest { public function actionWithMultiTask(){ $taskType = $_GET['taskType']; switch ($taskType) { case 'taskA': $jobHandlerClassName = 'app\index\job\MultiTask@taskA'; $jobDataArr = ['a' => '1']; $jobQueueName = "multiTaskJobQueue"; break; case 'taskB': $jobHandlerClassName = 'app\index\job\MultiTask@taskB'; $jobDataArr = ['b' => '2']; $jobQueueName = "multiTaskJobQueue"; break; default: break; } $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); if ($isPushed !== false) { echo(time()."the $taskType 任务加入到 ".$jobQueueName ."<br>"); }else{ throw new Exception("push a new $taskType of MultiTask Job Failed!"); } } } |
消息的消费与删除
新增 \application\index\job\MultiTask.php 消费者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
<?php /** * 文件路径: \application\index\job\MultiTask.php * 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务 */ namespace app\index\job; use think\queue\Job; class MultiTask { public function taskA(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskA of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 3) { $job->delete(); } } } public function taskB(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskB of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 2) { $job->release(); } } } private function _doTaskA($data) { print("Info: doing TaskA of Job MultiTask "."\n"); return true; } private function _doTaskB($data) { print("Info: doing TaskB of Job MultiTask "."\n"); return true; } } |
php think queue:work --daemon --queue multiTaskJobQueue
代码出问题也是不会执行的监听模式查看是否报错:php think queue:listen --daemon --queue multiTaskJobQueue
1 2 3 4 5 6 7 8 |
tp5 // 开启断线重连 'break_reconnect' => true, // TP6是否需要断线重连 'break_reconnect' => false, //多任务防止数据集断开报错【筋斗云网络】 如果发现任务重复执行结果任务始终不删除 请检查任务类是否一致 |

我的微信
把最实用的经验,分享给最需要的读者,希望每一位来访的朋友都能有所收获!
2020年4月16日 上午11:16 1F
tp5
// 开启断线重连
‘break_reconnect’ => true,
// TP6是否需要断线重连
‘break_reconnect’ => false, //多任务防止数据集断开报错【筋斗云网络】
如果发现任务重复执行结果任务始终不删除 请检查任务类是否一致