支持批量默认header头

This commit is contained in:
zyimm 2023-08-24 15:13:09 +08:00
parent 7e67952874
commit 7417662219

View File

@ -30,31 +30,49 @@ class Producer
public function handleWithBatch(array $data, ?string $delay = null)
{
$requests = function ($data) use ($delay) {
$uri = $this->connection->config->getHost();
$host = $this->connection->config->getHost();
foreach ($data as $item) {
if ($delay) {
$uri = $uri.'/rocketmq/producer/sendDelay';
$uri = $host.'/rocketmq/producer/sendDelay';
$data['delayTimes'] = $delay;
} else {
$uri = $uri.'/rocketmq/producer/send';
$uri = $host.'/rocketmq/producer/send';
}
yield new Request('POST', $uri, [], json_encode($this->buildMessage($item)));
yield new Request('POST', $uri, [
'Content-Type' => 'application/json; charset=utf-8'
], json_encode($this->buildMessage($item)));
}
};
//连接池
$pool = new Pool($this->connection->getClient(), $requests($data), [
'concurrency' => 5,
'concurrency' => 50,
'fulfilled' => function (Response $response, $index) {
//todo 成功
return [
$index => $response->getBody()->getContents()
];
},
'rejected' => function (RequestException $reason, $index) {
//todo 失败
return [
$index => $reason->getMessage()
];
},
]);
$promise = $pool->promise();
$promise->wait();
}
private function buildMessage(array $data): array
{
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
$data['consumerType'] = static::TYPE;
unset($data['body']);
return $data;
}
/**
* 单个
*
@ -70,14 +88,4 @@ class Producer
}
return $this->connection->getClient()->request('post', $uri, ['json' => $this->buildMessage($data)]);
}
private function buildMessage(array $data): array
{
$data['backUrl'] = $this->connection->config->getBackUrl();
$data['messageBody'] = $data['body'] ?? '';
$data['messageBody'] = json_encode($data['messageBody']);
$data['consumerType'] = static::TYPE;
unset($data['body']);
return $data;
}
}