diff --git a/README.md b/README.md index 39b40f0..4f73e0d 100644 --- a/README.md +++ b/README.md @@ -12,4 +12,51 @@ ### 生产者 -### 消费者 \ No newline at end of file +### 消费者 +消费者执行逻辑由开发者定义,但是仍遵循如下规则: +1. 首先在config/rocket_mq.php 文件subscribe节点配置订阅 +```php +return [ + 'subscribe' => [ + //节点配置 由mq的topic.tag为标识 + '[topic].[tag]' => [ + Test:class + ] + ] +] + +``` +2. `Test:class` 需要实现 `tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface` 接口 + +```php + + +namespace example; + + +use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface; +use tm\xls\rocketMq\thinkphp\consumer\message\Message; + +class Consumer implements ListenerInterface +{ + + private $message; + + public function __construct(Message $message) + { + $this->message = $message; + } + + public function handle() + { + //处理订阅逻辑 + } + + public function enable(): bool + { + // 这边判断是否执行 + return true; + } +} + +``` \ No newline at end of file diff --git a/example/Consumer.php b/example/Consumer.php new file mode 100644 index 0000000..915d5ae --- /dev/null +++ b/example/Consumer.php @@ -0,0 +1,29 @@ +message = $message; + } + + public function handle() + { + //处理订阅逻辑 + } + + public function enable(): bool + { + // 这边判断是否执行 + return true; + } +} \ No newline at end of file diff --git a/example/Producer.php b/example/Producer.php new file mode 100644 index 0000000..b070948 --- /dev/null +++ b/example/Producer.php @@ -0,0 +1,45 @@ + [], //消息 + "tag" => "string", //tag + "topic" => "string" // 主题 + ]; + Client::producer(new Connection())->handle($data); + } + + /** + * 批量 + * + * @return void + * @throws GuzzleException + */ + public function demoBatch() + { + $data = [ + [ + "body" => [], //消息 + "tag" => "string", //tag + "topic" => "string" // 主题 + ] + ]; + Client::producer(new Connection())->handleWithBatch($data); + } +} \ No newline at end of file diff --git a/src/connection/Config.php b/src/connection/Config.php index 0eae26a..81ee372 100644 --- a/src/connection/Config.php +++ b/src/connection/Config.php @@ -16,9 +16,7 @@ class Config if(method_exists($key, $this)){ call_user_func([$this, $key], $val); } - } - } /** @@ -26,7 +24,7 @@ class Config */ public function getHost() { - return $this->host; + return $this->host ?? config('rocket_mq.host'); } /** @@ -57,4 +55,9 @@ class Config { return $this->getHost().'notifyUsing'; } + + public function getBackUrl(): string + { + return $this->getHost().config('rocket_mq.callback'); + } } \ No newline at end of file diff --git a/src/connection/Connection.php b/src/connection/Connection.php index 61cb07f..d3a6a23 100644 --- a/src/connection/Connection.php +++ b/src/connection/Connection.php @@ -48,6 +48,4 @@ class Connection } return $this->client; } - - } \ No newline at end of file diff --git a/src/consumer/Consumer.php b/src/consumer/Consumer.php index 1dddfb0..8f57328 100644 --- a/src/consumer/Consumer.php +++ b/src/consumer/Consumer.php @@ -31,9 +31,12 @@ class Consumer $subscribes = $this->getSubscribe()[$key] ?? []; foreach ($subscribes as $subscribe){ $loop->addTimer(0, function () use ($subscribe){ - call_user_func([app()->make($subscribe, [ + $handler = app()->make($subscribe, [ $this->message - ]), 'handle']); + ]); + if($handler->enable()){ + $handler->handle(); + } }); } $loop->run(); diff --git a/src/consumer/contract/ListenerInterface.php b/src/consumer/contract/ListenerInterface.php index 98222dd..5391dbc 100644 --- a/src/consumer/contract/ListenerInterface.php +++ b/src/consumer/contract/ListenerInterface.php @@ -6,5 +6,5 @@ interface ListenerInterface { public function handle(); - public function enable(); + public function enable():bool; } \ No newline at end of file diff --git a/src/producer/Producer.php b/src/producer/Producer.php index f05b5bb..c463df5 100644 --- a/src/producer/Producer.php +++ b/src/producer/Producer.php @@ -19,18 +19,24 @@ class Producer $this->connection = $connection; } - /** * 批量 * - * @param array $data + * @param array $data + * @param string|null $delay 延时 1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h */ - public function handleWithBatch(array $data) + public function handleWithBatch(array $data, ?string $delay = null) { - $requests = function ($data) { + $requests = function ($data) use ($delay) { $uri = $this->connection->config->getHost(); foreach ($data as $item) { - yield new Request('POST', $uri, [], $item); + if ($delay) { + $uri = $uri.'/rocketmq/producer/sendDelay'; + $data['delayTimes'] = $delay; + } else { + $uri = $uri.'/rocketmq/producer/send'; + } + yield new Request('POST', $uri, [], $this->buildMessage($item)); } }; //连接池 @@ -53,8 +59,23 @@ class Producer * * @throws GuzzleException */ - public function handle(array $data): ResponseInterface + public function handle(array $data, ?string $delay = null): ResponseInterface { - return $this->connection->getClient()->request('post', ['json' => $data]); + if ($delay) { + $uri = '/rocketmq/producer/sendDelay'; + $data['delayTimes'] = $delay; + } else { + $uri = '/rocketmq/producer/send'; + } + return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]); + } + + private function buildMessage(array $data): array + { + $data['backUrl'] = $this->connection->config->getBackUrl(); + $data['messageBody'] = $data['body'] ?? ''; + $data['messageBody'] = json_encode($data['messageBody']); + unset($data['body']); + return $data; } } \ No newline at end of file