From 9b75fa7912b92d5cfaf34d271fe0a644cca50cc5 Mon Sep 17 00:00:00 2001 From: zyimm Date: Fri, 7 Jul 2023 16:20:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 ++ README.md | 15 ++++++ composer.json | 42 +++++++++++++++ src/Client.php | 15 ++++++ src/config.php | 11 ++++ src/connection/Config.php | 60 +++++++++++++++++++++ src/connection/Connection.php | 53 ++++++++++++++++++ src/consumer/Consumer.php | 51 ++++++++++++++++++ src/consumer/command/Listen.php | 24 +++++++++ src/consumer/contract/ListenerInterface.php | 8 +++ src/consumer/message/Message.php | 60 +++++++++++++++++++++ src/producer/Producer.php | 60 +++++++++++++++++++++ src/route.php | 20 +++++++ 13 files changed, 422 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 composer.json create mode 100644 src/Client.php create mode 100644 src/config.php create mode 100644 src/connection/Config.php create mode 100644 src/connection/Connection.php create mode 100644 src/consumer/Consumer.php create mode 100644 src/consumer/command/Listen.php create mode 100644 src/consumer/contract/ListenerInterface.php create mode 100644 src/consumer/message/Message.php create mode 100644 src/producer/Producer.php create mode 100644 src/route.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aea8263 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/vendor/ +/.idea/ +/composer.lock \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..39b40f0 --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +# rocket-mq-thinkphp 适配客户端 + +## 使用说明 + + +### 安装 + + +### 配置 + + +### 生产者 + + +### 消费者 \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..a8a350f --- /dev/null +++ b/composer.json @@ -0,0 +1,42 @@ +{ + "name": "tm_xls/rocket-mq-client-thinkphp", + "description": "rocket-mq-thinkphp 适配客户端", + "minimum-stability": "stable", + "license": "proprietary", + "authors": [ + { + "name": "zyimm", + "email": "email@example.com" + } + ], + "autoload": { + "psr-4": { + "tm\\xls\\rocketMq\\thinkphp\\": "src" + } + }, + "extra": { + "think": { + "config": { + "rocket_mq": "src/config.php" + }, + "route": { + "rocket_route": "src/route.php" + } + } + }, + "require": { + "php": ">=7.2", + "topthink/framework": "5.1.*", + "guzzlehttp/guzzle": "6.*|7.*", + "react/http": "1.*", + "react/event-loop": "1.*", + "ext-json": "*", + "react/async": "^3.1", + "tm_xls/composer-thinkphp-installer": "dev-master" + }, + "config": { + "allow-plugins": { + "tm_xls/composer-thinkphp-installer": true + } + } +} diff --git a/src/Client.php b/src/Client.php new file mode 100644 index 0000000..04e631d --- /dev/null +++ b/src/Client.php @@ -0,0 +1,15 @@ + '', + 'callback' => '/rocketmq/producer/notify', + 'subscribe' => [ + + ] +]; diff --git a/src/connection/Config.php b/src/connection/Config.php new file mode 100644 index 0000000..0eae26a --- /dev/null +++ b/src/connection/Config.php @@ -0,0 +1,60 @@ + $val){ + if(method_exists($key, $this)){ + call_user_func([$this, $key], $val); + } + + } + + } + + /** + * @return mixed + */ + public function getHost() + { + return $this->host; + } + + /** + * @param mixed $host + */ + public function setHost($host): void + { + $this->host = $host; + } + + /** + * @return mixed + */ + public function getTimeOut() + { + return $this->timeOut; + } + + /** + * @param mixed $timeOut + */ + public function setTimeOut($timeOut): void + { + $this->timeOut = $timeOut; + } + + public function getProducerUrl(): string + { + return $this->getHost().'notifyUsing'; + } +} \ No newline at end of file diff --git a/src/connection/Connection.php b/src/connection/Connection.php new file mode 100644 index 0000000..61cb07f --- /dev/null +++ b/src/connection/Connection.php @@ -0,0 +1,53 @@ +config = new Config($config); + $this->client = new Client([ + 'base_uri' => $this->config->getHost(), + 'timeout' => $this->config->getTimeOut(), + ]); + } + + /** + * 获取连接客户端 + * + * @param bool $state + * + * @return Client + */ + public function getClient(bool $state = false): Client + { + if (is_null($this->client) || $state === true) { + $this->init(); + } + return $this->client; + } + + +} \ No newline at end of file diff --git a/src/consumer/Consumer.php b/src/consumer/Consumer.php new file mode 100644 index 0000000..1dddfb0 --- /dev/null +++ b/src/consumer/Consumer.php @@ -0,0 +1,51 @@ +message = new Message(); + $this->message->setMessageBody($data['messageBody'] ?? ''); + $this->message->setTopic($data['topic'] ?? ''); + $this->message->setTag($data['tag'] ?? ''); + } + + + /** + * 分发订阅 + * + * @return void + */ + public function distribute() + { + //事件循环 + $loop = Loop::get(); + $key = $this->message->getTopic().$this->message->getTag(); + $subscribes = $this->getSubscribe()[$key] ?? []; + foreach ($subscribes as $subscribe){ + $loop->addTimer(0, function () use ($subscribe){ + call_user_func([app()->make($subscribe, [ + $this->message + ]), 'handle']); + }); + } + $loop->run(); + } + + /** + * getSubscribe + * + * @return array + */ + private function getSubscribe():array + { + return config('rocket_mq.subscribe'); + } +} \ No newline at end of file diff --git a/src/consumer/command/Listen.php b/src/consumer/command/Listen.php new file mode 100644 index 0000000..95e7abd --- /dev/null +++ b/src/consumer/command/Listen.php @@ -0,0 +1,24 @@ +listen($socket); + } +} \ No newline at end of file diff --git a/src/consumer/contract/ListenerInterface.php b/src/consumer/contract/ListenerInterface.php new file mode 100644 index 0000000..581fe5c --- /dev/null +++ b/src/consumer/contract/ListenerInterface.php @@ -0,0 +1,8 @@ +messageBody; + } + + /** + * @param string $message_body + */ + public function setMessageBody(string $message_body): void + { + $this->messageBody = json_decode($message_body, true); + } + + /** + * @return mixed + */ + public function getTag() + { + return $this->tag; + } + + /** + * @param mixed $tag + */ + public function setTag($tag): void + { + $this->tag = $tag; + } + + /** + * @return mixed + */ + public function getTopic() + { + return $this->topic; + } + + /** + * @param mixed $topic + */ + public function setTopic($topic): void + { + $this->topic = $topic; + } +} \ No newline at end of file diff --git a/src/producer/Producer.php b/src/producer/Producer.php new file mode 100644 index 0000000..79a0e35 --- /dev/null +++ b/src/producer/Producer.php @@ -0,0 +1,60 @@ +connection = $connection; + } + + + /** + * 批量 + * + * @param array $data + */ + public function handleWithBatch(array $data) + { + $requests = function ($data) { + $uri = $this->connection->config->getHost(); + foreach ($data as $item) { + yield new Request('POST', $uri, [], $item); + } + }; + //连接池 + $pool = new Pool($this->connection->getClient(), $requests($data), [ + 'concurrency' => 5, + 'fulfilled' => function (Response $response, $index) { + //todo 成功 + }, + 'rejected' => function (RequestException $reason, $index) { + //todo 失败 + }, + ]); + + $promise = $pool->promise(); + $promise->wait(); + } + + /** + * 单个 + * + * @throws GuzzleException + */ + public function handle(array $data): ResponseInterface + { + return $this->connection->getClient()->request('post', ['json' => $data]); + } +} \ No newline at end of file diff --git a/src/route.php b/src/route.php new file mode 100644 index 0000000..90fc111 --- /dev/null +++ b/src/route.php @@ -0,0 +1,20 @@ +make(Consumer::class, [ + request()->param() + ])->distribute(); + }); + $promise->then(function () { + return 'success'; + },function (){ + return 'error'; + }); +}); \ No newline at end of file