From a7da13c58511d2e20dfbe84679cdc978be653a70 Mon Sep 17 00:00:00 2001 From: zyimm Date: Thu, 3 Aug 2023 09:52:25 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=96=B0=E5=A2=9E=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=AB=AF=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/producer/Producer.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/producer/Producer.php b/src/producer/Producer.php index eff313d..f617617 100644 --- a/src/producer/Producer.php +++ b/src/producer/Producer.php @@ -12,6 +12,8 @@ use tm\xls\rocketMq\thinkphp\connection\Connection; class Producer { + const TYPE = 1; + private $connection; public function __construct(Connection $connection) @@ -40,7 +42,7 @@ class Producer } }; //连接池 - $pool = new Pool($this->connection->getClient(), $requests($data), [ + $pool = new Pool($this->connection->getClient(), $requests($data), [ 'concurrency' => 5, 'fulfilled' => function (Response $response, $index) { //todo 成功 @@ -71,9 +73,10 @@ class Producer private function buildMessage(array $data): array { - $data['backUrl'] = $this->connection->config->getBackUrl(); - $data['messageBody'] = $data['body'] ?? ''; - $data['messageBody'] = json_encode($data['messageBody']); + $data['backUrl'] = $this->connection->config->getBackUrl(); + $data['messageBody'] = $data['body'] ?? ''; + $data['messageBody'] = json_encode($data['messageBody']); + $data['consumerType'] = static::TYPE; unset($data['body']); return $data; }