feat:新增消费端类型
This commit is contained in:
parent
bb97aa14fb
commit
a7da13c585
|
@ -12,6 +12,8 @@ use tm\xls\rocketMq\thinkphp\connection\Connection;
|
||||||
|
|
||||||
class Producer
|
class Producer
|
||||||
{
|
{
|
||||||
|
const TYPE = 1;
|
||||||
|
|
||||||
private $connection;
|
private $connection;
|
||||||
|
|
||||||
public function __construct(Connection $connection)
|
public function __construct(Connection $connection)
|
||||||
|
@ -40,7 +42,7 @@ class Producer
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
//连接池
|
//连接池
|
||||||
$pool = new Pool($this->connection->getClient(), $requests($data), [
|
$pool = new Pool($this->connection->getClient(), $requests($data), [
|
||||||
'concurrency' => 5,
|
'concurrency' => 5,
|
||||||
'fulfilled' => function (Response $response, $index) {
|
'fulfilled' => function (Response $response, $index) {
|
||||||
//todo 成功
|
//todo 成功
|
||||||
|
@ -71,9 +73,10 @@ class Producer
|
||||||
|
|
||||||
private function buildMessage(array $data): array
|
private function buildMessage(array $data): array
|
||||||
{
|
{
|
||||||
$data['backUrl'] = $this->connection->config->getBackUrl();
|
$data['backUrl'] = $this->connection->config->getBackUrl();
|
||||||
$data['messageBody'] = $data['body'] ?? '';
|
$data['messageBody'] = $data['body'] ?? '';
|
||||||
$data['messageBody'] = json_encode($data['messageBody']);
|
$data['messageBody'] = json_encode($data['messageBody']);
|
||||||
|
$data['consumerType'] = static::TYPE;
|
||||||
unset($data['body']);
|
unset($data['body']);
|
||||||
return $data;
|
return $data;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user