doc:新增异常说明

This commit is contained in:
zyimm 2023-07-10 09:52:03 +08:00
parent 8630b53f8d
commit c3c6bfd3b7
3 changed files with 24 additions and 9 deletions

View File

@ -3,7 +3,9 @@
namespace tm\xls\rocketMq\thinkphp\consumer; namespace tm\xls\rocketMq\thinkphp\consumer;
use React\EventLoop\Loop; use React\EventLoop\Loop;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message; use tm\xls\rocketMq\thinkphp\consumer\message\Message;
use tm\xls\rocketMq\thinkphp\exception\ConsumerException;
class Consumer class Consumer
{ {
@ -26,16 +28,20 @@ class Consumer
public function distribute() public function distribute()
{ {
//事件循环 //事件循环
$loop = Loop::get(); $loop = Loop::get();
$key = $this->message->getTopic().$this->message->getTag(); $key = $this->message->getTopic().$this->message->getTag();
$subscribes = $this->getSubscribe()[$key] ?? []; $subscribes = $this->getSubscribe()[$key] ?? [];
foreach ($subscribes as $subscribe){ foreach ($subscribes as $subscribe) {
$loop->addTimer(0, function () use ($subscribe){ $loop->addTimer(0, function () use ($subscribe) {
$handler = app()->make($subscribe, [ $handler = app()->make($subscribe, [
$this->message $this->message
]); ]);
if($handler->enable()){ if ($handler instanceof ListenerInterface) {
$handler->handle(); if ($handler->enable()) {
$handler->handle();
}
} else {
throw new ConsumerException('Consumer needs to implement ListenerInterface');
} }
}); });
} }
@ -47,8 +53,8 @@ class Consumer
* *
* @return array * @return array
*/ */
private function getSubscribe():array private function getSubscribe(): array
{ {
return config('rocket_mq.subscribe'); return config('rocket_mq.subscribe');
} }
} }

View File

@ -0,0 +1,10 @@
<?php
namespace tm\xls\rocketMq\thinkphp\exception;
use Exception;
class ConsumerException extends Exception
{
}

View File

@ -49,7 +49,6 @@ class Producer
//todo 失败 //todo 失败
}, },
]); ]);
$promise = $pool->promise(); $promise = $pool->promise();
$promise->wait(); $promise->wait();
} }