composer update

This commit is contained in:
zyimm 2023-07-09 18:12:14 +08:00
parent f662f64cf0
commit f5fcf13b35
8 changed files with 162 additions and 16 deletions

View File

@ -12,4 +12,51 @@
### 生产者 ### 生产者
### 消费者 ### 消费者
消费者执行逻辑由开发者定义,但是仍遵循如下规则:
1. 首先在config/rocket_mq.php 文件subscribe节点配置订阅
```php
return [
'subscribe' => [
//节点配置 由mq的topic.tag为标识
'[topic].[tag]' => [
Test:class
]
]
]
```
2. `Test:class` 需要实现 `tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface` 接口
```php
namespace example;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
class Consumer implements ListenerInterface
{
private $message;
public function __construct(Message $message)
{
$this->message = $message;
}
public function handle()
{
//处理订阅逻辑
}
public function enable(): bool
{
// 这边判断是否执行
return true;
}
}
```

29
example/Consumer.php Normal file
View File

@ -0,0 +1,29 @@
<?php
namespace example;
use tm\xls\rocketMq\thinkphp\consumer\contract\ListenerInterface;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
class Consumer implements ListenerInterface
{
private $message;
public function __construct(Message $message)
{
$this->message = $message;
}
public function handle()
{
//处理订阅逻辑
}
public function enable(): bool
{
// 这边判断是否执行
return true;
}
}

45
example/Producer.php Normal file
View File

@ -0,0 +1,45 @@
<?php
namespace example;
use GuzzleHttp\Exception\GuzzleException;
use tm\xls\rocketMq\thinkphp\Client;
use tm\xls\rocketMq\thinkphp\connection\Connection;
class Producer
{
/**
* 单个
*
* @return void
* @throws GuzzleException
*/
public function demo()
{
$data = [
"body" => [], //消息
"tag" => "string", //tag
"topic" => "string" // 主题
];
Client::producer(new Connection())->handle($data);
}
/**
* 批量
*
* @return void
* @throws GuzzleException
*/
public function demoBatch()
{
$data = [
[
"body" => [], //消息
"tag" => "string", //tag
"topic" => "string" // 主题
]
];
Client::producer(new Connection())->handleWithBatch($data);
}
}

View File

@ -16,9 +16,7 @@ class Config
if(method_exists($key, $this)){ if(method_exists($key, $this)){
call_user_func([$this, $key], $val); call_user_func([$this, $key], $val);
} }
} }
} }
/** /**
@ -26,7 +24,7 @@ class Config
*/ */
public function getHost() public function getHost()
{ {
return $this->host; return $this->host ?? config('rocket_mq.host');
} }
/** /**
@ -57,4 +55,9 @@ class Config
{ {
return $this->getHost().'notifyUsing'; return $this->getHost().'notifyUsing';
} }
public function getBackUrl(): string
{
return $this->getHost().config('rocket_mq.callback');
}
} }

View File

@ -48,6 +48,4 @@ class Connection
} }
return $this->client; return $this->client;
} }
} }

View File

@ -31,9 +31,12 @@ class Consumer
$subscribes = $this->getSubscribe()[$key] ?? []; $subscribes = $this->getSubscribe()[$key] ?? [];
foreach ($subscribes as $subscribe){ foreach ($subscribes as $subscribe){
$loop->addTimer(0, function () use ($subscribe){ $loop->addTimer(0, function () use ($subscribe){
call_user_func([app()->make($subscribe, [ $handler = app()->make($subscribe, [
$this->message $this->message
]), 'handle']); ]);
if($handler->enable()){
$handler->handle();
}
}); });
} }
$loop->run(); $loop->run();

View File

@ -6,5 +6,5 @@ interface ListenerInterface
{ {
public function handle(); public function handle();
public function enable(); public function enable():bool;
} }

View File

@ -19,18 +19,24 @@ class Producer
$this->connection = $connection; $this->connection = $connection;
} }
/** /**
* 批量 * 批量
* *
* @param array $data * @param array $data
* @param string|null $delay 延时 1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h
*/ */
public function handleWithBatch(array $data) public function handleWithBatch(array $data, ?string $delay = null)
{ {
$requests = function ($data) { $requests = function ($data) use ($delay) {
$uri = $this->connection->config->getHost(); $uri = $this->connection->config->getHost();
foreach ($data as $item) { foreach ($data as $item) {
yield new Request('POST', $uri, [], $item); if ($delay) {
$uri = $uri.'/rocketmq/producer/sendDelay';
$data['delayTimes'] = $delay;
} else {
$uri = $uri.'/rocketmq/producer/send';
}
yield new Request('POST', $uri, [], $this->buildMessage($item));
} }
}; };
//连接池 //连接池
@ -53,8 +59,23 @@ class Producer
* *
* @throws GuzzleException * @throws GuzzleException
*/ */
public function handle(array $data): ResponseInterface public function handle(array $data, ?string $delay = null): ResponseInterface
{ {
return $this->connection->getClient()->request('post', ['json' => $data]); if ($delay) {
$uri = '/rocketmq/producer/sendDelay';
$data['delayTimes'] = $delay;
} else {
$uri = '/rocketmq/producer/send';
}
return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]);
}
private function buildMessage(array $data): array
{
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
unset($data['body']);
return $data;
} }
} }