Compare commits

...

4 Commits

Author SHA1 Message Date
a7da13c585 feat:新增消费端类型 2023-08-03 09:52:25 +08:00
bb97aa14fb 完善使用说明 2023-07-13 15:58:30 +08:00
dc2fccbc55 op:多节点监听异常抑制 2023-07-11 15:23:59 +08:00
4710f9c6ea doc:更新文档 2023-07-11 14:26:17 +08:00
5 changed files with 28 additions and 7 deletions

View File

@ -70,6 +70,11 @@ class Consumer implements ListenerInterface
private $message;
/**
* 所有订阅监听会自动注入 tm\xls\rocketMq\thinkphp\consumer\message\Message 对象
*
* @param Message $message
*/
public function __construct(Message $message)
{
$this->message = $message;
@ -77,7 +82,10 @@ class Consumer implements ListenerInterface
public function handle()
{
//队列消息
$this->message;
//处理订阅逻辑
// code ...
}
public function enable(): bool

View File

@ -4,6 +4,10 @@ return [
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),
'callback' => '/rocketmq/producer/notify',
'subscribe' => [
//节点配置 由mq的topic.tag为标识
// '[topic].[tag]' => [
// Test:class
// ]
]
];

View File

@ -38,7 +38,7 @@ class Config
/**
* @return int
*/
public function getTimeOut()
public function getTimeOut(): int
{
return $this->timeOut ?? 60;
}

View File

@ -2,6 +2,7 @@
namespace tm\xls\rocketMq\thinkphp\consumer;
use Exception;
use React\EventLoop\Loop;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
@ -29,7 +30,7 @@ class Consumer
{
//事件循环
$loop = Loop::get();
$key = $this->message->getTopic().$this->message->getTag();
$key = $this->message->getTopic().'.'.$this->message->getTag();
$subscribes = $this->getSubscribe()[$key] ?? [];
foreach ($subscribes as $subscribe) {
$loop->addTimer(0, function () use ($subscribe) {
@ -38,7 +39,12 @@ class Consumer
]);
if ($handler instanceof ListenerInterface) {
if ($handler->enable()) {
$handler->handle();
try {
$handler->handle();
} catch (Exception $exception) {
//todo 记录日志
return;
}
}
} else {
throw new ConsumerException('Consumer needs to implement ListenerInterface');

View File

@ -12,6 +12,8 @@ use tm\xls\rocketMq\thinkphp\connection\Connection;
class Producer
{
const TYPE = 1;
private $connection;
public function __construct(Connection $connection)
@ -40,7 +42,7 @@ class Producer
}
};
//连接池
$pool = new Pool($this->connection->getClient(), $requests($data), [
$pool = new Pool($this->connection->getClient(), $requests($data), [
'concurrency' => 5,
'fulfilled' => function (Response $response, $index) {
//todo 成功
@ -71,9 +73,10 @@ class Producer
private function buildMessage(array $data): array
{
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
$data['consumerType'] = static::TYPE;
unset($data['body']);
return $data;
}