Deprecated: htmlspecialchars(): Passing null to parameter #1 ($string) of type string is deprecated in /webData/hua/www.ascwh.com/var/Typecho/Feed.php on line 221
ASCWH - 队列 https://www.ascwh.com/tag/%E9%98%9F%E5%88%97/ 轻量级队列 Beanstalkd https://www.ascwh.com/404.html 2023-06-19T00:25:33+08:00 一:介绍Beanstalkd 是一个个简单、快速、轻量级的内存型队列。它是典型的类Memcached设计,协议和使用方式都是同样风格。github:https://github.com/beanstalkd官网:https://beanstalkd.github.io/安装在 Centos7 上通过命令 yum -y install beanstalkd --enablerepo=epel;其他系统的安装在 官网 上查看查看当前版本号 beanstalkd -v 启动常见启动如下:beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs启动后对 beanstalkd 的操作可以使用 telnet,比如 telnet 127.0.0.1 11300。然后便可以执行 beanstalkd 的各命令,如 stats 查看信息,use, put, watch 等等。二:功能特性2.1 优先级任务job可以有0~2^32 个优先级, 0 代表最高优先级,默认优先级为1024。2.2 延迟 delay比如说多长时间后执行某个任务2.3 持久化可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。2.4 超时控制为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。2.5 分布式容错因为它是类Memcached设计,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。三:使用场景用作延时队列:比如可以用于如果用户30分钟内不操作,任务关闭。用作定时任务:比如可以用于专门的后台任务。用作异步操作:这是所有消息队列都最常用的,先将任务仍进去,顺序执行。用作循环队列:用release命令可以循环执行任务,比如可以做负载均衡任务分发。用作兜底机制:比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。其实我们最重要的使用场景是把它作为延迟队列类使用,比如:下订单后多长时间没有付款,要取消订单,并退库存用户注册成功后,发送一封邮件定期检查退款状态的订单是否退款成功四:Beanstalkd设计基本概念4.1 核心概念#job:一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中tube:一个有名的队列,用来存储统一类型的job,是producer和consumer操作的对象。tube可以称为管道producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态4.2 job的生命周期#生产者生成任务,并根据业务需求将任务放到不同的管道中。比如与注册有关的任务放到注册管道中,和订单有关的任务放到订单管道中。任务进入管道到离开管道一共有5个状态 :(ready,delayed,reserved,buried,delete)READY - 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;DELAYED - 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;RESERVED - 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;BURIED - 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;DELETED - 消息被彻底删除。Beanstalkd 不再维持这些消息。状态流程#1.生产任务:当producer生产者put一个job到tube时,这个job就处于 ready 状态,等待consumer来消费处理; producer生产者也可以选择延迟put一个job,这时job就先达到 delayed 状态(比如设置一个5秒延迟的job,那么5秒之后,这个job才会变成 ready 状态,才可以被consumer消费) 消费任务:consumer获取了当前 ready 的job后,该job就会迁移到 reserved 状态,这样其他的consumer就不能在操作该job消费完任务后:当consumer消费完该job后,可以选择delete, release 或者 bury 3种操作。delete操作:job从系统消亡,之后不能在获取;release操作:可以重新把该job状态迁移回 ready (也可以延迟状态 delayed 操作),使其他的consumer可以继 续获取和执行该job;bury操作: 把job置为 buried 状态,及是把该job休眠,等到需要的时候,还可以将休眠的 job 重新置为 ready 状态, 也可以delete掉 buried 状态的job。也就是说:当消费者处理完任务后,任务的状态可能是delete(删除,处理成功),可能是buried(预留,意味着先把任务放一边,等待条件成熟还要用),可能是ready,也可能是delayed,需要根据具体业务场景自己进行判断定义五:例子使用的PHP库:https://github.com/pheanstalk/pheanstalk直接用composer进行安装composer require pda/pheanstalk5.1 生产者和消费者例子**生产者: producer.php**require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = Pheanstalk::create('192.168.1.100', 11301); $tubeName = 'use_email_list'; $jobData = array( 'uid' => time(), 'email' => '12917@qq.com', 'message' => 'hello beanstalkd', 'dtime' => date('Y-m-d H:i:s'), ); $pheanstalk->useTube($tubeName)->put(json_encode($jobData)); echo json_encode($jobData);运行程序: php ./producer.php消费者:consumer.php<?php require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = Pheanstalk::create('192.168.1.109', 11301); $tubeName = 'use_email_list'; while(true) { //获取队列信息,reserve 阻塞获取 $job = $pheanstalk->watch($tubeName)->ignore('default')->reserve(); $data = $job->getData(); //执行相关逻辑 $result = file_put_contents('./send_email.log', $data, FILE_APPEND | LOCK_EX); if ($result) { echo 'success :'.$data.PHP_EOF; $pheanstalk->delete($job); } //暂停(不可能是百分百的准确,跟系统的调度、CPU时钟周期等有一定关系) usleep(500000); } echo 'Success !';运行程序:php ./consumer.php5.2 监控例子monitor.php//监控服务状态 require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = Pheanstalk::create('192.168.1.109', 11301); //可以开发监控面板,监控数据的,有多少tube,多少队列,多少延迟等等 //查看beanstalkd状态 var_export($pheanstalk->stats()); //查看有多少个tube var_export($pheanstalk->listTubes()); //设置要监听的tube $pheanstalk->watch('use_email_list'); //取消对默认tube的监听,可以省略 $pheanstalk->ignore('default'); //查看监听的tube列表 var_export($pheanstalk->listTubesWatched()); //查看use_email_list的tube当前的状态 var_export($pheanstalk->statsTube('use_email_list')); //单个job信息 // var_export($pheanstalk->statsJob($job));六: pheanstalk一些方法说明6.1 整个 beanstalkd 当前状态信息#var_export($pheanstalk->stats()) //beanstalkd 当前状态信息 //output: Pheanstalk\Response\ArrayResponse::__set_state(array( 'current-jobs-urgent' => '0', // 优先级小于1024状态为ready的job数量 'current-jobs-ready' => '0', // 状态为ready的job数量 'current-jobs-reserved' => '0', // 状态为reserved的job数量 'current-jobs-delayed' => '0', // 状态为delayed的job数量 'current-jobs-buried' => '0', // 状态为buried的job数量 'cmd-put' => '0', // 总共执行put指令的次数 'cmd-peek' => '0', // 总共执行peek指令的次数 'cmd-peek-ready' => '0', // 总共执行peek-ready指令的次数 'cmd-peek-delayed' => '0', // 总共执行peek-delayed指令的次数 'cmd-peek-buried' => '0', // 总共执行peek-buried指令的次数 'cmd-reserve' => '0', // 总共执行reserve指令的次数 'cmd-reserve-with-timeout' => '0', 'cmd-delete' => '0', 'cmd-release' => '0', 'cmd-use' => '0', // 总共执行use指令的次数 'cmd-watch' => '0', // 总共执行watch指令的次数 'cmd-ignore' => '0', 'cmd-bury' => '0', 'cmd-kick' => '0', 'cmd-touch' => '0', 'cmd-stats' => '2', 'cmd-stats-job' => '0', 'cmd-stats-tube' => '0', 'cmd-list-tubes' => '0', 'cmd-list-tube-used' => '0', 'cmd-list-tubes-watched' => '0', 'cmd-pause-tube' => '0', 'job-timeouts' => '0', // 所有超时的job的总共数量 'total-jobs' => '0', // 创建的所有job数量 'max-job-size' => '65535', // job的数据部分最大长度 'current-tubes' => '1', // 当前存在的tube数量 'current-connections' => '1', // 当前打开的连接数 'current-producers' => '0', // 当前所有的打开的连接中至少执行一次put指令的连接数量 'current-workers' => '0', // 当前所有的打开的连接中至少执行一次reserve指令的连接数量 'current-waiting' => '0', // 当前所有的打开的连接中执行reserve指令但是未响应的连接数量 'total-connections' => '11', // 总共处理的连接数 'pid' => '4839', // 服务器进程的id 'version' => '1.10', // 服务器版本号 'rusage-utime' => '0.000000', // 进程总共占用的用户CPU时间 'rusage-stime' => '0.001478', // 进程总共占用的系统CPU时间 'uptime' => '12031', // 服务器进程运行的秒数 'binlog-oldest-index' => '2', // 开始储存jobs的binlog索引号 'binlog-current-index' => '2', // 当前储存jobs的binlog索引号 'binlog-records-migrated' => '0', 'binlog-records-written' => '0', // 累积写入的记录数 'binlog-max-size' => '10485760', // binlog的最大容量 'id' => '4b005307e8af5b37', // 一个随机字符串,在beanstalkd进程启动时产生 'hostname' => 'xing', ))6.2 单个job任务的信息:#//单个job信息 var_export($pheanstalk->statsJob($job1)); 'id' => '1', // job id 'tube' => 'use_email_list', // job 所在的管道 'state' => 'reserved', // job 当前的状态 'pri' => '1024', // job 的优先级 'age' => '5222', // 自 job 创建时间为止 单位:秒 'delay' => '0', 'ttr' => '60', // time to run 'time-left' => '58', // 仅在job状态为reserved或者delayed时有意义,当job状态为reserved时表示剩余的超时时间 'file' => '2', // 表示包含此job的binlog序号,如果没有开启它将为0 'reserves' => '10', // 表示job被reserved的次数 'timeouts' => '0', // 表示job处理的超时时间 'releases' => '1', // 表示job被released的次数 'buries' => '0', // 表示job被buried的次数 'kicks' => '0', // 表示job被kiced的次数6.3 tube 管道的信息:#//查看有多少个tube var_export($pheanstalk->listTubes()); //设置要监听的tube $pheanstalk->watch('use_email_list'); //取消对默认tube的监听,可以省略 $pheanstalk->ignore('default'); //查看监听的tube列表 var_export($pheanstalk->listTubesWatched()); //查看use_email_list的tube当前的状态 var_export($pheanstalk->statsTube('use_email_list'));6.4 生产者调用方法#// put 任务 方式一; 返回新 job 的任务标识,整型值; $pheanstalk->useTube('use_email_list')->put( 'hello, world', // 任务内容 23, // 任务的优先级, 默认为 1024 0, // 不等待直接放到ready队列中. 60 // 处理任务的时间(单位为秒) ); // put 任务 方式二; 返回新 job 的任务标识,整型值; $pheanstalk->putInTube( 'use_email_list', // 管道名称 'hello, world', // 任务内容 23, // 任务的优先级, 默认为 1024 0, // 不等待直接放到ready队列中. 如值为 60 表示 60秒; 60 // 处理任务的时间(单位为秒) ); // 给管道里所有新任务设置延迟 $pheanstalk->pauseTube('use_email_list', 30); // 取消管道延迟 $pheanstalk->resumeTube('use_email_list');概念说明:任务优先级任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).ttr(time to run, 预设的执行时间)消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release / bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,状态改为 ready,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 重置该任务的 time-left 剩余执行时间.6.5 消费者调用方法正常获取和执行job流程// 获取 use_email_list 管道的 job $job = $pheanstalk->watch('use_email_list')->ignore('default')->reserve(); $job_2 = $pheanstalk->reserveFromTube('use_email_list'); $job_3 = $pheanstalk->peekReady('use_email_list'); // 如果知道 job 的 id, 也可以 $job_4 = $pheanstalk->peek($id); // var_export($pheanstalk->statsJob($job_4)); // 获取下一个延迟时间最短 的 job $job_5 = $pheanstalk->peekDelayed('use_email_list'); // do job .... 这里省略异常的考虑 // 释放任务 让别人执行 $pheanstalk->release($job); // 或成功执行完,则删除任务 //$pheanstalk->delete($job); // 将任务埋起来,预留 //$pheanstalk->bury($job); 处理 buried 状态的 Job// 获取下一个被埋藏的 job $job = $pheanstalk->peekBuried('use_email_list'); // 将任务状态从 buried 改为 ready //$pheanstalk->kickJob($job); // 批量将指定数目的任务从 buried 改为 ready $pheanstalk->kick(10);处理 buried 状态的 Job// 获取下一个被埋藏的 job $job = $pheanstalk->peekBuried('use_email_list'); // 将任务状态从 buried 改为 ready //$pheanstalk->kickJob($job); // 批量将指定数目的任务从 buried 改为 ready $pheanstalk->kick(10); Thinkphp 队列 think-queue 官方文档 https://www.ascwh.com/383.html 2021-01-26T22:05:00+08:00 官方文档安装composer require topthink/think-queue配置配置文件位于 config/queue.php公共配置[ 'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名 ]创建任务类单模块项目推荐使用 app\job 作为任务类的命名空间多模块项目可用使用 app\module\job 作为任务类的命名空间也可以放在任意可以自动加载到的地方任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别 每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)还有个可选的任务失败执行的方法 failed 传入的参数为$data(发布任务时自定义的数据)下面写两个例子namespace app\job; use think\queue\Job; class Job1{ public function fire(Job $job, $data){ //....这里执行具体的任务 if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 } //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法 $job->delete(); // 也可以重新发布这个任务 $job->release($delay); //$delay为延迟时间 } public function failed($data){ // ...任务达到最大重试次数后,失败了 } } namespace app\lib\job; use think\queue\Job; class Job2{ public function task1(Job $job, $data){ } public function task2(Job $job, $data){ } public function failed($data){ } }发布任务think\facade\Queue::push($job, $data = '', $queue = null) 和 think\facade\Queue::later($delay, $job, $data = '', $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行$job 是任务名 单模块的,且命名空间是app\job的,比如上面的例子一,写Job1类名即可 多模块的,且命名空间是app\module\job的,写model/Job1即可 其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2 如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1、app\lib\job\Job2@task2$data 是你要传到任务里的参数$queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填监听任务并执行php think queue:listenphp think queue:work两种,具体的可选参数可以输入命令加 --help 查看可配合supervisor使用,保证进程常驻