Compare commits
4 Commits
9306016fbe
...
a7da13c585
Author | SHA1 | Date | |
---|---|---|---|
a7da13c585 | |||
bb97aa14fb | |||
dc2fccbc55 | |||
4710f9c6ea |
|
@ -70,6 +70,11 @@ class Consumer implements ListenerInterface
|
|||
|
||||
private $message;
|
||||
|
||||
/**
|
||||
* 所有订阅监听会自动注入 tm\xls\rocketMq\thinkphp\consumer\message\Message 对象
|
||||
*
|
||||
* @param Message $message
|
||||
*/
|
||||
public function __construct(Message $message)
|
||||
{
|
||||
$this->message = $message;
|
||||
|
@ -77,7 +82,10 @@ class Consumer implements ListenerInterface
|
|||
|
||||
public function handle()
|
||||
{
|
||||
//队列消息
|
||||
$this->message;
|
||||
//处理订阅逻辑
|
||||
// code ...
|
||||
}
|
||||
|
||||
public function enable(): bool
|
||||
|
|
|
@ -4,6 +4,10 @@ return [
|
|||
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),
|
||||
'callback' => '/rocketmq/producer/notify',
|
||||
'subscribe' => [
|
||||
//节点配置 由mq的topic.tag为标识
|
||||
// '[topic].[tag]' => [
|
||||
// Test:class
|
||||
// ]
|
||||
|
||||
]
|
||||
];
|
||||
|
|
|
@ -38,7 +38,7 @@ class Config
|
|||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getTimeOut()
|
||||
public function getTimeOut(): int
|
||||
{
|
||||
return $this->timeOut ?? 60;
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
namespace tm\xls\rocketMq\thinkphp\consumer;
|
||||
|
||||
use Exception;
|
||||
use React\EventLoop\Loop;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
|
||||
|
@ -29,7 +30,7 @@ class Consumer
|
|||
{
|
||||
//事件循环
|
||||
$loop = Loop::get();
|
||||
$key = $this->message->getTopic().$this->message->getTag();
|
||||
$key = $this->message->getTopic().'.'.$this->message->getTag();
|
||||
$subscribes = $this->getSubscribe()[$key] ?? [];
|
||||
foreach ($subscribes as $subscribe) {
|
||||
$loop->addTimer(0, function () use ($subscribe) {
|
||||
|
@ -38,7 +39,12 @@ class Consumer
|
|||
]);
|
||||
if ($handler instanceof ListenerInterface) {
|
||||
if ($handler->enable()) {
|
||||
$handler->handle();
|
||||
try {
|
||||
$handler->handle();
|
||||
} catch (Exception $exception) {
|
||||
//todo 记录日志
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ConsumerException('Consumer needs to implement ListenerInterface');
|
||||
|
|
|
@ -12,6 +12,8 @@ use tm\xls\rocketMq\thinkphp\connection\Connection;
|
|||
|
||||
class Producer
|
||||
{
|
||||
const TYPE = 1;
|
||||
|
||||
private $connection;
|
||||
|
||||
public function __construct(Connection $connection)
|
||||
|
@ -40,7 +42,7 @@ class Producer
|
|||
}
|
||||
};
|
||||
//连接池
|
||||
$pool = new Pool($this->connection->getClient(), $requests($data), [
|
||||
$pool = new Pool($this->connection->getClient(), $requests($data), [
|
||||
'concurrency' => 5,
|
||||
'fulfilled' => function (Response $response, $index) {
|
||||
//todo 成功
|
||||
|
@ -71,9 +73,10 @@ class Producer
|
|||
|
||||
private function buildMessage(array $data): array
|
||||
{
|
||||
$data['backUrl'] = $this->connection->config->getBackUrl();
|
||||
$data['messageBody'] = $data['body'] ?? '';
|
||||
$data['messageBody'] = json_encode($data['messageBody']);
|
||||
$data['backUrl'] = $this->connection->config->getBackUrl();
|
||||
$data['messageBody'] = $data['body'] ?? '';
|
||||
$data['messageBody'] = json_encode($data['messageBody']);
|
||||
$data['consumerType'] = static::TYPE;
|
||||
unset($data['body']);
|
||||
return $data;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user