think-queue消息队列ThinkPHP5.0

  • 3,588

小白强烈建议参考

一直使用swoole workman 但是一直纠结think-queue在实现消息队列上面有何区别

https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
https://www.jianshu.com/p/d5e95c7ae526

安装think-queue tp5.0安装v2.*

composer require topthink/think-queue=2.0.*

配置消息队列的驱动

根据选择的存储方式,在 \application\config\queue.php 这个配置文件中,添加消息队列对应的驱动配置

<?php
return [
       // 'connector'  => 'Redis',      // Redis 驱动
       // 'expire'     => 60,       // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
       // 'default'    => 'default',        // 默认的队列名称
       // 'host'       => '127.0.0.1',  // redis 主机ip
       // 'port'       => 6379,     // redis 端口
       // 'password'   => '',       // redis 密码
       // 'select'     => 0,        // 使用哪一个 db,默认为 db0
       // 'timeout'    => 0,        // redis连接的超时时间
       // 'persistent' => false,        // 是否是长连接

      'connector' => 'Database',   // 数据库驱动
      'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
      'default'   => 'default',    // 默认的队列名称
      'table'     => 'prefix_jobs',       // 存储消息的表名,不带前缀
      'dsn'       => [],

   //    'connector'   => 'Topthink',   // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
   //    'token'       => '',
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',

   //    'connector'   => 'Sync',       // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
   ];


数据库[不推荐] 使用Redis[推荐]

CREATE TABLE `prefix_jobs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) NOT NULL,
  `payload` longtext NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

app\index\controller\JobTest

在 \application\index\controller\JobTest.php 控制器中,添加 multiTask()方法:

<?php
namespace app\index\controller;
use think\Exception;
use think\Queue;
class Jobtest
{
    public function actionWithMultiTask(){

  $taskType = $_GET['taskType'];
    switch ($taskType) {
       case 'taskA':
           $jobHandlerClassName  = 'app\index\job\MultiTask@taskA';
           $jobDataArr = ['a'   => '1'];
           $jobQueueName = "multiTaskJobQueue"; 
           break;
       case 'taskB':
           $jobHandlerClassName  = 'app\index\job\MultiTask@taskB';
           $jobDataArr = ['b'   => '2'];
           $jobQueueName = "multiTaskJobQueue";     
           break;
        default:
           break;
   }

  $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
  if ($isPushed !== false) {
    echo(time()."the $taskType 任务加入到 ".$jobQueueName ."<br>");
  }else{
    throw new Exception("push a new $taskType of MultiTask Job Failed!");
  }
}
}

消息的消费与删除

新增 \application\index\job\MultiTask.php 消费者类

<?php
/**
 * 文件路径: \application\index\job\MultiTask.php
 * 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务
 */
namespace app\index\job;

use think\queue\Job;

class MultiTask {

    public function taskA(Job $job,$data){

        $isJobDone = $this->_doTaskA($data);

        if ($isJobDone) {
            $job->delete();
            print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
        }else{
            if ($job->attempts() > 3) {
                $job->delete();     
            }
        }
    }

    public function taskB(Job $job,$data){

        $isJobDone = $this->_doTaskA($data);

        if ($isJobDone) {
            $job->delete();
            print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
        }else{
            if ($job->attempts() > 2) {
                $job->release();    
            }
        }
    }

    private function _doTaskA($data) {
        print("Info: doing TaskA of Job MultiTask "."\n");
        return true;
    }

    private function _doTaskB($data) {
        print("Info: doing TaskB of Job MultiTask "."\n");
        return true;
    }
}

php think queue:work --daemon --queue multiTaskJobQueue

代码出问题也是不会执行的监听模式查看是否报错:php think queue:listen --daemon --queue multiTaskJobQueue

tp5 
// 开启断线重连
'break_reconnect' => true,
// TP6是否需要断线重连
 'break_reconnect'   => false, //多任务防止数据集断开报错【筋斗云网络】
如果发现任务重复执行结果任务始终不删除  请检查任务类是否一致

weinxin
我的微信
这是我的微信扫一扫
开拓者博主
  • 本文由 发表于 2020年3月18日17:09:52
  • 转载请务必保留本文链接:https://www.150643.com/543.html
匿名

发表评论

匿名网友 填写信息

评论:1   其中:访客  0   博主  1
    • 备忘 备忘

      tp5
      // 开启断线重连
      ‘break_reconnect’ => true,
      // TP6是否需要断线重连
      ‘break_reconnect’ => false, //多任务防止数据集断开报错【筋斗云网络】
      如果发现任务重复执行结果任务始终不删除 请检查任务类是否一致