From c3c6bfd3b7a8f4620f81437efbb05dca88be05b9 Mon Sep 17 00:00:00 2001 From: zyimm Date: Mon, 10 Jul 2023 09:52:03 +0800 Subject: [PATCH] =?UTF-8?q?doc:=E6=96=B0=E5=A2=9E=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/consumer/Consumer.php | 22 ++++++++++++++-------- src/exception/ConsumerException.php | 10 ++++++++++ src/producer/Producer.php | 1 - 3 files changed, 24 insertions(+), 9 deletions(-) create mode 100644 src/exception/ConsumerException.php diff --git a/src/consumer/Consumer.php b/src/consumer/Consumer.php index 8f57328..6e3075b 100644 --- a/src/consumer/Consumer.php +++ b/src/consumer/Consumer.php @@ -3,7 +3,9 @@ namespace tm\xls\rocketMq\thinkphp\consumer; use React\EventLoop\Loop; +use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface; use tm\xls\rocketMq\thinkphp\consumer\message\Message; +use tm\xls\rocketMq\thinkphp\exception\ConsumerException; class Consumer { @@ -26,16 +28,20 @@ class Consumer public function distribute() { //事件循环 - $loop = Loop::get(); - $key = $this->message->getTopic().$this->message->getTag(); + $loop = Loop::get(); + $key = $this->message->getTopic().$this->message->getTag(); $subscribes = $this->getSubscribe()[$key] ?? []; - foreach ($subscribes as $subscribe){ - $loop->addTimer(0, function () use ($subscribe){ + foreach ($subscribes as $subscribe) { + $loop->addTimer(0, function () use ($subscribe) { $handler = app()->make($subscribe, [ $this->message ]); - if($handler->enable()){ - $handler->handle(); + if ($handler instanceof ListenerInterface) { + if ($handler->enable()) { + $handler->handle(); + } + } else { + throw new ConsumerException('Consumer needs to implement ListenerInterface'); } }); } @@ -47,8 +53,8 @@ class Consumer * * @return array */ - private function getSubscribe():array + private function getSubscribe(): array { - return config('rocket_mq.subscribe'); + return config('rocket_mq.subscribe'); } } \ No newline at end of file diff --git a/src/exception/ConsumerException.php b/src/exception/ConsumerException.php new file mode 100644 index 0000000..d9ec6dc --- /dev/null +++ b/src/exception/ConsumerException.php @@ -0,0 +1,10 @@ +promise(); $promise->wait(); }