diff --git a/src/producer/Producer.php b/src/producer/Producer.php index 8365e0a..75dbc95 100644 --- a/src/producer/Producer.php +++ b/src/producer/Producer.php @@ -30,31 +30,49 @@ class Producer public function handleWithBatch(array $data, ?string $delay = null) { $requests = function ($data) use ($delay) { - $uri = $this->connection->config->getHost(); + $host = $this->connection->config->getHost(); foreach ($data as $item) { if ($delay) { - $uri = $uri.'/rocketmq/producer/sendDelay'; + $uri = $host.'/rocketmq/producer/sendDelay'; $data['delayTimes'] = $delay; } else { - $uri = $uri.'/rocketmq/producer/send'; + $uri = $host.'/rocketmq/producer/send'; } - yield new Request('POST', $uri, [], json_encode($this->buildMessage($item))); + yield new Request('POST', $uri, [ + 'Content-Type' => 'application/json; charset=utf-8' + ], json_encode($this->buildMessage($item))); } }; //连接池 $pool = new Pool($this->connection->getClient(), $requests($data), [ - 'concurrency' => 5, + 'concurrency' => 50, 'fulfilled' => function (Response $response, $index) { //todo 成功 + return [ + $index => $response->getBody()->getContents() + ]; }, 'rejected' => function (RequestException $reason, $index) { //todo 失败 + return [ + $index => $reason->getMessage() + ]; }, ]); $promise = $pool->promise(); $promise->wait(); } + private function buildMessage(array $data): array + { + $data['backUrl'] = $this->connection->config->getBackUrl(); + $data['messageBody'] = $data['body'] ?? ''; + $data['messageBody'] = json_encode($data['messageBody']); + $data['consumerType'] = static::TYPE; + unset($data['body']); + return $data; + } + /** * 单个 * @@ -70,14 +88,4 @@ class Producer } return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]); } - - private function buildMessage(array $data): array - { - $data['backUrl'] = $this->connection->config->getBackUrl(); - $data['messageBody'] = $data['body'] ?? ''; - $data['messageBody'] = json_encode($data['messageBody']); - $data['consumerType'] = static::TYPE; - unset($data['body']); - return $data; - } } \ No newline at end of file