Compare commits
8 Commits
f662f64cf0
...
9306016fbe
Author | SHA1 | Date | |
---|---|---|---|
9306016fbe | |||
c3c6bfd3b7 | |||
8630b53f8d | |||
5519531c06 | |||
f915a23bbd | |||
6853fd6699 | |||
ac79df8bf5 | |||
f5fcf13b35 |
79
README.md
79
README.md
|
@ -1,15 +1,90 @@
|
|||
# rocket-mq-thinkphp 适配客户端
|
||||
|
||||
## 使用说明
|
||||
|
||||
本客户端对接公司mq,实现mq消息推送与订阅,目前支持一对一与一对多订阅!
|
||||
|
||||
### 安装
|
||||
|
||||
1. 添加镜像地址
|
||||
```shell
|
||||
{
|
||||
"repositories": [{
|
||||
"type": "composer",
|
||||
"url": "http://composer.saas.test.tianmagroup.com"
|
||||
}]
|
||||
}
|
||||
|
||||
```
|
||||
2. composer安装
|
||||
```shell
|
||||
composer require tm_xls/rocket-mq-client-thinkphp
|
||||
```
|
||||
|
||||
|
||||
### 配置
|
||||
```php
|
||||
|
||||
|
||||
return [
|
||||
'host' => env('mq_host', 'http://192.168.21.170:8085'), //mq地址
|
||||
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),//回调host
|
||||
'callback' => '/rocketmq/producer/notify',//回调地址默认不做修改
|
||||
//订阅节点
|
||||
'subscribe' => [
|
||||
|
||||
]
|
||||
];
|
||||
|
||||
```
|
||||
|
||||
### 生产者
|
||||
|
||||
见example "Producer"
|
||||
|
||||
### 消费者
|
||||
消费者执行逻辑由开发者定义,但是仍遵循如下规则:
|
||||
1. 首先在config/rocket_mq.php 文件subscribe节点配置订阅
|
||||
```php
|
||||
return [
|
||||
'subscribe' => [
|
||||
//节点配置 由mq的topic.tag为标识
|
||||
'[topic].[tag]' => [
|
||||
Test:class
|
||||
]
|
||||
]
|
||||
]
|
||||
|
||||
```
|
||||
2. `Test:class` 需要实现 `tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface` 接口
|
||||
|
||||
```php
|
||||
|
||||
|
||||
namespace example;
|
||||
|
||||
|
||||
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
|
||||
|
||||
class Consumer implements ListenerInterface
|
||||
{
|
||||
|
||||
private $message;
|
||||
|
||||
public function __construct(Message $message)
|
||||
{
|
||||
$this->message = $message;
|
||||
}
|
||||
|
||||
public function handle()
|
||||
{
|
||||
//处理订阅逻辑
|
||||
}
|
||||
|
||||
public function enable(): bool
|
||||
{
|
||||
// 这边判断是否执行
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
```
|
29
example/Consumer.php
Normal file
29
example/Consumer.php
Normal file
|
@ -0,0 +1,29 @@
|
|||
<?php
|
||||
|
||||
namespace example;
|
||||
|
||||
|
||||
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
|
||||
|
||||
class Consumer implements ListenerInterface
|
||||
{
|
||||
|
||||
private $message;
|
||||
|
||||
public function __construct(Message $message)
|
||||
{
|
||||
$this->message = $message;
|
||||
}
|
||||
|
||||
public function handle()
|
||||
{
|
||||
//处理订阅逻辑
|
||||
}
|
||||
|
||||
public function enable(): bool
|
||||
{
|
||||
// 这边判断是否执行
|
||||
return true;
|
||||
}
|
||||
}
|
45
example/Producer.php
Normal file
45
example/Producer.php
Normal file
|
@ -0,0 +1,45 @@
|
|||
<?php
|
||||
|
||||
namespace example;
|
||||
|
||||
use GuzzleHttp\Exception\GuzzleException;
|
||||
use tm\xls\rocketMq\thinkphp\Client;
|
||||
use tm\xls\rocketMq\thinkphp\connection\Connection;
|
||||
|
||||
class Producer
|
||||
{
|
||||
/**
|
||||
* 单个
|
||||
*
|
||||
* @return void
|
||||
* @throws GuzzleException
|
||||
*/
|
||||
public function demo()
|
||||
{
|
||||
$data = [
|
||||
|
||||
"body" => [], //消息
|
||||
"tag" => "string", //tag
|
||||
"topic" => "string" // 主题
|
||||
];
|
||||
Client::producer(new Connection())->handle($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量
|
||||
*
|
||||
* @return void
|
||||
* @throws GuzzleException
|
||||
*/
|
||||
public function demoBatch()
|
||||
{
|
||||
$data = [
|
||||
[
|
||||
"body" => [], //消息
|
||||
"tag" => "string", //tag
|
||||
"topic" => "string" // 主题
|
||||
]
|
||||
];
|
||||
Client::producer(new Connection())->handleWithBatch($data);
|
||||
}
|
||||
}
|
|
@ -1,9 +1,7 @@
|
|||
<?php
|
||||
|
||||
|
||||
|
||||
return [
|
||||
'host' => '',
|
||||
'host' => env('mq_host', 'http://192.168.21.170:8085'),
|
||||
'callback_host' => env('mq_callback_host', 'http://192.168.21.170:8085'),
|
||||
'callback' => '/rocketmq/producer/notify',
|
||||
'subscribe' => [
|
||||
|
||||
|
|
|
@ -12,13 +12,11 @@ class Config
|
|||
public function __construct(array $configs = [])
|
||||
{
|
||||
|
||||
foreach ($configs as $key => $val){
|
||||
if(method_exists($key, $this)){
|
||||
foreach ($configs as $key => $val) {
|
||||
if (method_exists($key, $this)) {
|
||||
call_user_func([$this, $key], $val);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -26,35 +24,35 @@ class Config
|
|||
*/
|
||||
public function getHost()
|
||||
{
|
||||
return $this->host;
|
||||
return $this->host ?? config('rocket_mq.host');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $host
|
||||
* @param string $host
|
||||
*/
|
||||
public function setHost($host): void
|
||||
public function setHost(string $host): void
|
||||
{
|
||||
$this->host = $host;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return mixed
|
||||
* @return int
|
||||
*/
|
||||
public function getTimeOut()
|
||||
{
|
||||
return $this->timeOut;
|
||||
return $this->timeOut ?? 60;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $timeOut
|
||||
* @param int $timeOut
|
||||
*/
|
||||
public function setTimeOut($timeOut): void
|
||||
public function setTimeOut(int $timeOut): void
|
||||
{
|
||||
$this->timeOut = $timeOut;
|
||||
}
|
||||
|
||||
public function getProducerUrl(): string
|
||||
public function getBackUrl(): string
|
||||
{
|
||||
return $this->getHost().'notifyUsing';
|
||||
return config('rocket_mq.callback_host').config('rocket_mq.callback');
|
||||
}
|
||||
}
|
|
@ -48,6 +48,4 @@ class Connection
|
|||
}
|
||||
return $this->client;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -3,7 +3,9 @@
|
|||
namespace tm\xls\rocketMq\thinkphp\consumer;
|
||||
|
||||
use React\EventLoop\Loop;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
|
||||
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
|
||||
use tm\xls\rocketMq\thinkphp\exception\ConsumerException;
|
||||
|
||||
class Consumer
|
||||
{
|
||||
|
@ -29,11 +31,18 @@ class Consumer
|
|||
$loop = Loop::get();
|
||||
$key = $this->message->getTopic().$this->message->getTag();
|
||||
$subscribes = $this->getSubscribe()[$key] ?? [];
|
||||
foreach ($subscribes as $subscribe){
|
||||
$loop->addTimer(0, function () use ($subscribe){
|
||||
call_user_func([app()->make($subscribe, [
|
||||
foreach ($subscribes as $subscribe) {
|
||||
$loop->addTimer(0, function () use ($subscribe) {
|
||||
$handler = app()->make($subscribe, [
|
||||
$this->message
|
||||
]), 'handle']);
|
||||
]);
|
||||
if ($handler instanceof ListenerInterface) {
|
||||
if ($handler->enable()) {
|
||||
$handler->handle();
|
||||
}
|
||||
} else {
|
||||
throw new ConsumerException('Consumer needs to implement ListenerInterface');
|
||||
}
|
||||
});
|
||||
}
|
||||
$loop->run();
|
||||
|
@ -44,7 +53,7 @@ class Consumer
|
|||
*
|
||||
* @return array
|
||||
*/
|
||||
private function getSubscribe():array
|
||||
private function getSubscribe(): array
|
||||
{
|
||||
return config('rocket_mq.subscribe');
|
||||
}
|
||||
|
|
|
@ -6,5 +6,5 @@ interface ListenerInterface
|
|||
{
|
||||
public function handle();
|
||||
|
||||
public function enable();
|
||||
public function enable():bool;
|
||||
}
|
10
src/exception/ConsumerException.php
Normal file
10
src/exception/ConsumerException.php
Normal file
|
@ -0,0 +1,10 @@
|
|||
<?php
|
||||
|
||||
namespace tm\xls\rocketMq\thinkphp\exception;
|
||||
|
||||
use Exception;
|
||||
|
||||
class ConsumerException extends Exception
|
||||
{
|
||||
|
||||
}
|
|
@ -19,18 +19,24 @@ class Producer
|
|||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 批量
|
||||
*
|
||||
* @param array $data
|
||||
* @param string|null $delay 延时 1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
|
||||
*/
|
||||
public function handleWithBatch(array $data)
|
||||
public function handleWithBatch(array $data, ?string $delay = null)
|
||||
{
|
||||
$requests = function ($data) {
|
||||
$requests = function ($data) use ($delay) {
|
||||
$uri = $this->connection->config->getHost();
|
||||
foreach ($data as $item) {
|
||||
yield new Request('POST', $uri, [], $item);
|
||||
if ($delay) {
|
||||
$uri = $uri.'/rocketmq/producer/sendDelay';
|
||||
$data['delayTimes'] = $delay;
|
||||
} else {
|
||||
$uri = $uri.'/rocketmq/producer/send';
|
||||
}
|
||||
yield new Request('POST', $uri, [], $this->buildMessage($item));
|
||||
}
|
||||
};
|
||||
//连接池
|
||||
|
@ -43,7 +49,6 @@ class Producer
|
|||
//todo 失败
|
||||
},
|
||||
]);
|
||||
|
||||
$promise = $pool->promise();
|
||||
$promise->wait();
|
||||
}
|
||||
|
@ -53,8 +58,23 @@ class Producer
|
|||
*
|
||||
* @throws GuzzleException
|
||||
*/
|
||||
public function handle(array $data): ResponseInterface
|
||||
public function handle(array $data, ?string $delay = null): ResponseInterface
|
||||
{
|
||||
return $this->connection->getClient()->request('post', ['json' => $data]);
|
||||
if ($delay) {
|
||||
$uri = '/rocketmq/producer/sendDelay';
|
||||
$data['delayTimes'] = $delay;
|
||||
} else {
|
||||
$uri = '/rocketmq/producer/send';
|
||||
}
|
||||
return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]);
|
||||
}
|
||||
|
||||
private function buildMessage(array $data): array
|
||||
{
|
||||
$data['backUrl'] = $this->connection->config->getBackUrl();
|
||||
$data['messageBody'] = $data['body'] ?? '';
|
||||
$data['messageBody'] = json_encode($data['messageBody']);
|
||||
unset($data['body']);
|
||||
return $data;
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@ use function React\Async\coroutine;
|
|||
|
||||
$path = config('rocket_mq.callback', '/rocketmq/producer/notify');
|
||||
//注册回调路由
|
||||
Route::post($path, function () {
|
||||
Route::any($path.'$', function () {
|
||||
coroutine(function () {
|
||||
app()->make(Consumer::class, [
|
||||
request()->param()
|
||||
|
|
Loading…
Reference in New Issue
Block a user