From 741766221998f143a6241bb6251b56639e678bba Mon Sep 17 00:00:00 2001 From: zyimm Date: Thu, 24 Aug 2023 15:13:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=89=B9=E9=87=8F=E9=BB=98?= =?UTF-8?q?=E8=AE=A4header=E5=A4=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/producer/Producer.php | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/producer/Producer.php b/src/producer/Producer.php index 8365e0a..75dbc95 100644 --- a/src/producer/Producer.php +++ b/src/producer/Producer.php @@ -30,31 +30,49 @@ class Producer public function handleWithBatch(array $data, ?string $delay = null) { $requests = function ($data) use ($delay) { - $uri = $this->connection->config->getHost(); + $host = $this->connection->config->getHost(); foreach ($data as $item) { if ($delay) { - $uri = $uri.'/rocketmq/producer/sendDelay'; + $uri = $host.'/rocketmq/producer/sendDelay'; $data['delayTimes'] = $delay; } else { - $uri = $uri.'/rocketmq/producer/send'; + $uri = $host.'/rocketmq/producer/send'; } - yield new Request('POST', $uri, [], json_encode($this->buildMessage($item))); + yield new Request('POST', $uri, [ + 'Content-Type' => 'application/json; charset=utf-8' + ], json_encode($this->buildMessage($item))); } }; //连接池 $pool = new Pool($this->connection->getClient(), $requests($data), [ - 'concurrency' => 5, + 'concurrency' => 50, 'fulfilled' => function (Response $response, $index) { //todo 成功 + return [ + $index => $response->getBody()->getContents() + ]; }, 'rejected' => function (RequestException $reason, $index) { //todo 失败 + return [ + $index => $reason->getMessage() + ]; }, ]); $promise = $pool->promise(); $promise->wait(); } + private function buildMessage(array $data): array + { + $data['backUrl'] = $this->connection->config->getBackUrl(); + $data['messageBody'] = $data['body'] ?? ''; + $data['messageBody'] = json_encode($data['messageBody']); + $data['consumerType'] = static::TYPE; + unset($data['body']); + return $data; + } + /** * 单个 * @@ -70,14 +88,4 @@ class Producer } return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]); } - - private function buildMessage(array $data): array - { - $data['backUrl'] = $this->connection->config->getBackUrl(); - $data['messageBody'] = $data['body'] ?? ''; - $data['messageBody'] = json_encode($data['messageBody']); - $data['consumerType'] = static::TYPE; - unset($data['body']); - return $data; - } } \ No newline at end of file