Compare commits

...

8 Commits

Author SHA1 Message Date
9306016fbe feat:注册新的回调路由 2023-07-10 09:54:16 +08:00
c3c6bfd3b7 doc:新增异常说明 2023-07-10 09:52:03 +08:00
8630b53f8d doc:更新文档说明 2023-07-09 19:00:29 +08:00
5519531c06 doc:更新文档说明 2023-07-09 18:59:57 +08:00
f915a23bbd style:类型约束 2023-07-09 18:54:29 +08:00
6853fd6699 doc:支持环境变量 2023-07-09 18:29:44 +08:00
ac79df8bf5 doc:文件说明 2023-07-09 18:19:57 +08:00
f5fcf13b35 composer update 2023-07-09 18:12:14 +08:00
11 changed files with 224 additions and 42 deletions

View File

@ -1,15 +1,90 @@
# rocket-mq-thinkphp 适配客户端
## 使用说明
本客户端对接公司mq实现mq消息推送与订阅目前支持一对一与一对多订阅
### 安装
1. 添加镜像地址
```shell
{
"repositories": [{
"type": "composer",
"url": "http://composer.saas.test.tianmagroup.com"
}]
}
```
2. composer安装
```shell
composer require tm_xls/rocket-mq-client-thinkphp
```
### 配置
```php
return [
'host' => env('mq_host', 'http://192.168.21.170:8085'), //mq地址
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),//回调host
'callback' => '/rocketmq/producer/notify',//回调地址默认不做修改
//订阅节点
'subscribe' => [
]
];
```
### 生产者
见example "Producer"
### 消费者
消费者执行逻辑由开发者定义,但是仍遵循如下规则:
1. 首先在config/rocket_mq.php 文件subscribe节点配置订阅
```php
return [
'subscribe' => [
//节点配置 由mq的topic.tag为标识
'[topic].[tag]' => [
Test:class
]
]
]
```
2. `Test:class` 需要实现 `tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface` 接口
```php
namespace example;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
class Consumer implements ListenerInterface
{
private $message;
public function __construct(Message $message)
{
$this->message = $message;
}
public function handle()
{
//处理订阅逻辑
}
public function enable(): bool
{
// 这边判断是否执行
return true;
}
}
```

29
example/Consumer.php Normal file
View File

@ -0,0 +1,29 @@
<?php
namespace example;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
class Consumer implements ListenerInterface
{
private $message;
public function __construct(Message $message)
{
$this->message = $message;
}
public function handle()
{
//处理订阅逻辑
}
public function enable(): bool
{
// 这边判断是否执行
return true;
}
}

45
example/Producer.php Normal file
View File

@ -0,0 +1,45 @@
<?php
namespace example;
use GuzzleHttp\Exception\GuzzleException;
use tm\xls\rocketMq\thinkphp\Client;
use tm\xls\rocketMq\thinkphp\connection\Connection;
class Producer
{
/**
* 单个
*
* @return void
* @throws GuzzleException
*/
public function demo()
{
$data = [
"body" => [], //消息
"tag" => "string", //tag
"topic" => "string" // 主题
];
Client::producer(new Connection())->handle($data);
}
/**
* 批量
*
* @return void
* @throws GuzzleException
*/
public function demoBatch()
{
$data = [
[
"body" => [], //消息
"tag" => "string", //tag
"topic" => "string" // 主题
]
];
Client::producer(new Connection())->handleWithBatch($data);
}
}

View File

@ -1,9 +1,7 @@
<?php
return [
'host' => '',
'host' => env('mq_host', 'http://192.168.21.170:8085'),
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),
'callback' => '/rocketmq/producer/notify',
'subscribe' => [

View File

@ -12,13 +12,11 @@ class Config
public function __construct(array $configs = [])
{
foreach ($configs as $key => $val){
if(method_exists($key, $this)){
foreach ($configs as $key => $val) {
if (method_exists($key, $this)) {
call_user_func([$this, $key], $val);
}
}
}
/**
@ -26,35 +24,35 @@ class Config
*/
public function getHost()
{
return $this->host;
return $this->host ?? config('rocket_mq.host');
}
/**
* @param mixed $host
* @param string $host
*/
public function setHost($host): void
public function setHost(string $host): void
{
$this->host = $host;
}
/**
* @return mixed
* @return int
*/
public function getTimeOut()
{
return $this->timeOut;
return $this->timeOut ?? 60;
}
/**
* @param mixed $timeOut
* @param int $timeOut
*/
public function setTimeOut($timeOut): void
public function setTimeOut(int $timeOut): void
{
$this->timeOut = $timeOut;
}
public function getProducerUrl(): string
public function getBackUrl(): string
{
return $this->getHost().'notifyUsing';
return config('rocket_mq.callback_host').config('rocket_mq.callback');
}
}

View File

@ -48,6 +48,4 @@ class Connection
}
return $this->client;
}
}

View File

@ -3,7 +3,9 @@
namespace tm\xls\rocketMq\thinkphp\consumer;
use React\EventLoop\Loop;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
use tm\xls\rocketMq\thinkphp\exception\ConsumerException;
class Consumer
{
@ -29,11 +31,18 @@ class Consumer
$loop = Loop::get();
$key = $this->message->getTopic().$this->message->getTag();
$subscribes = $this->getSubscribe()[$key] ?? [];
foreach ($subscribes as $subscribe){
$loop->addTimer(0, function () use ($subscribe){
call_user_func([app()->make($subscribe, [
foreach ($subscribes as $subscribe) {
$loop->addTimer(0, function () use ($subscribe) {
$handler = app()->make($subscribe, [
$this->message
]), 'handle']);
]);
if ($handler instanceof ListenerInterface) {
if ($handler->enable()) {
$handler->handle();
}
} else {
throw new ConsumerException('Consumer needs to implement ListenerInterface');
}
});
}
$loop->run();
@ -44,7 +53,7 @@ class Consumer
*
* @return array
*/
private function getSubscribe():array
private function getSubscribe(): array
{
return config('rocket_mq.subscribe');
}

View File

@ -6,5 +6,5 @@ interface ListenerInterface
{
public function handle();
public function enable();
public function enable():bool;
}

View File

@ -0,0 +1,10 @@
<?php
namespace tm\xls\rocketMq\thinkphp\exception;
use Exception;
class ConsumerException extends Exception
{
}

View File

@ -19,18 +19,24 @@ class Producer
$this->connection = $connection;
}
/**
* 批量
*
* @param array $data
* @param string|null $delay 延时 1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h
*/
public function handleWithBatch(array $data)
public function handleWithBatch(array $data, ?string $delay = null)
{
$requests = function ($data) {
$requests = function ($data) use ($delay) {
$uri = $this->connection->config->getHost();
foreach ($data as $item) {
yield new Request('POST', $uri, [], $item);
if ($delay) {
$uri = $uri.'/rocketmq/producer/sendDelay';
$data['delayTimes'] = $delay;
} else {
$uri = $uri.'/rocketmq/producer/send';
}
yield new Request('POST', $uri, [], $this->buildMessage($item));
}
};
//连接池
@ -43,7 +49,6 @@ class Producer
//todo 失败
},
]);
$promise = $pool->promise();
$promise->wait();
}
@ -53,8 +58,23 @@ class Producer
*
* @throws GuzzleException
*/
public function handle(array $data): ResponseInterface
public function handle(array $data, ?string $delay = null): ResponseInterface
{
return $this->connection->getClient()->request('post', ['json' => $data]);
if ($delay) {
$uri = '/rocketmq/producer/sendDelay';
$data['delayTimes'] = $delay;
} else {
$uri = '/rocketmq/producer/send';
}
return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]);
}
private function buildMessage(array $data): array
{
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
unset($data['body']);
return $data;
}
}

View File

@ -6,7 +6,7 @@ use function React\Async\coroutine;
$path = config('rocket_mq.callback', '/rocketmq/producer/notify');
//注册回调路由
Route::post($path, function () {
Route::any($path.'$', function () {
coroutine(function () {
app()->make(Consumer::class, [
request()->param()