初始化

This commit is contained in:
zyimm 2023-07-07 16:20:30 +08:00
commit 9b75fa7912
13 changed files with 422 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/vendor/
/.idea/
/composer.lock

15
README.md Normal file
View File

@ -0,0 +1,15 @@
# rocket-mq-thinkphp 适配客户端
## 使用说明
### 安装
### 配置
### 生产者
### 消费者

42
composer.json Normal file
View File

@ -0,0 +1,42 @@
{
"name": "tm_xls/rocket-mq-client-thinkphp",
"description": "rocket-mq-thinkphp 适配客户端",
"minimum-stability": "stable",
"license": "proprietary",
"authors": [
{
"name": "zyimm",
"email": "email@example.com"
}
],
"autoload": {
"psr-4": {
"tm\\xls\\rocketMq\\thinkphp\\": "src"
}
},
"extra": {
"think": {
"config": {
"rocket_mq": "src/config.php"
},
"route": {
"rocket_route": "src/route.php"
}
}
},
"require": {
"php": ">=7.2",
"topthink/framework": "5.1.*",
"guzzlehttp/guzzle": "6.*|7.*",
"react/http": "1.*",
"react/event-loop": "1.*",
"ext-json": "*",
"react/async": "^3.1",
"tm_xls/composer-thinkphp-installer": "dev-master"
},
"config": {
"allow-plugins": {
"tm_xls/composer-thinkphp-installer": true
}
}
}

15
src/Client.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace tm\xls\rocketMq\thinkphp;
use tm\xls\rocketMq\thinkphp\connection\Connection;
use ttm\xls\rocketMq\thinkphp\producer\Producer;
class Client
{
public static function producer(Connection $connection): Producer
{
return new Producer($connection);
}
}

11
src/config.php Normal file
View File

@ -0,0 +1,11 @@
<?php
return [
'host' => '',
'callback' => '/rocketmq/producer/notify',
'subscribe' => [
]
];

60
src/connection/Config.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace tm\xls\rocketMq\thinkphp\connection;
class Config
{
private $host;
private $timeOut;
public function __construct(array $configs = [])
{
foreach ($configs as $key => $val){
if(method_exists($key, $this)){
call_user_func([$this, $key], $val);
}
}
}
/**
* @return mixed
*/
public function getHost()
{
return $this->host;
}
/**
* @param mixed $host
*/
public function setHost($host): void
{
$this->host = $host;
}
/**
* @return mixed
*/
public function getTimeOut()
{
return $this->timeOut;
}
/**
* @param mixed $timeOut
*/
public function setTimeOut($timeOut): void
{
$this->timeOut = $timeOut;
}
public function getProducerUrl(): string
{
return $this->getHost().'notifyUsing';
}
}

View File

@ -0,0 +1,53 @@
<?php
namespace tm\xls\rocketMq\thinkphp\connection;
use GuzzleHttp\Client;
class Connection
{
/**
* @var Client
*/
private $client;
/**
* @var Config
*/
public $config;
/**
* 初始化连接
*
* @param array $config
*
* @return void
*/
public function init(array $config = [])
{
$this->config = new Config($config);
$this->client = new Client([
'base_uri' => $this->config->getHost(),
'timeout' => $this->config->getTimeOut(),
]);
}
/**
* 获取连接客户端
*
* @param bool $state
*
* @return Client
*/
public function getClient(bool $state = false): Client
{
if (is_null($this->client) || $state === true) {
$this->init();
}
return $this->client;
}
}

51
src/consumer/Consumer.php Normal file
View File

@ -0,0 +1,51 @@
<?php
namespace tm\xls\rocketMq\thinkphp\consumer;
use React\EventLoop\Loop;
use tm\xls\rocketMq\thinkphp\consumer\message\Message;
class Consumer
{
private $message;
public function __construct(array $data)
{
$this->message = new Message();
$this->message->setMessageBody($data['messageBody'] ?? '');
$this->message->setTopic($data['topic'] ?? '');
$this->message->setTag($data['tag'] ?? '');
}
/**
* 分发订阅
*
* @return void
*/
public function distribute()
{
//事件循环
$loop = Loop::get();
$key = $this->message->getTopic().$this->message->getTag();
$subscribes = $this->getSubscribe()[$key] ?? [];
foreach ($subscribes as $subscribe){
$loop->addTimer(0, function () use ($subscribe){
call_user_func([app()->make($subscribe, [
$this->message
]), 'handle']);
});
}
$loop->run();
}
/**
* getSubscribe
*
* @return array
*/
private function getSubscribe():array
{
return config('rocket_mq.subscribe');
}
}

View File

@ -0,0 +1,24 @@
<?php
namespace tm\xls\rocketMq\thinkphp\consumer\command;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\HttpServer;
use React\Http\Message\Response;
use React\Socket\SocketServer;
class Listen
{
public function handle()
{
$http = new HttpServer(function (ServerRequestInterface $request) {
return Response::plaintext(
"Hello World!\n"
);
});
$socket = new SocketServer('127.0.0.1:8080');
$http->listen($socket);
}
}

View File

@ -0,0 +1,8 @@
<?php
namespace tm\xls\rocketMq\thinkphp\consumer\contract;
interface ListenerInterface
{
public function handle();
}

View File

@ -0,0 +1,60 @@
<?php
namespace tm\xls\rocketMq\thinkphp\consumer\message;
class Message
{
private $messageBody;
private $tag;
private $topic;
/**
* @return mixed
*/
public function getMessageBody()
{
return $this->messageBody;
}
/**
* @param string $message_body
*/
public function setMessageBody(string $message_body): void
{
$this->messageBody = json_decode($message_body, true);
}
/**
* @return mixed
*/
public function getTag()
{
return $this->tag;
}
/**
* @param mixed $tag
*/
public function setTag($tag): void
{
$this->tag = $tag;
}
/**
* @return mixed
*/
public function getTopic()
{
return $this->topic;
}
/**
* @param mixed $topic
*/
public function setTopic($topic): void
{
$this->topic = $topic;
}
}

60
src/producer/Producer.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace ttm\xls\rocketMq\thinkphp\producer;
use GuzzleHttp\Exception\GuzzleException;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Pool;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\ResponseInterface;
use tm\xls\rocketMq\thinkphp\connection\Connection;
class Producer
{
private $connection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
/**
* 批量
*
* @param array $data
*/
public function handleWithBatch(array $data)
{
$requests = function ($data) {
$uri = $this->connection->config->getHost();
foreach ($data as $item) {
yield new Request('POST', $uri, [], $item);
}
};
//连接池
$pool = new Pool($this->connection->getClient(), $requests($data), [
'concurrency' => 5,
'fulfilled' => function (Response $response, $index) {
//todo 成功
},
'rejected' => function (RequestException $reason, $index) {
//todo 失败
},
]);
$promise = $pool->promise();
$promise->wait();
}
/**
* 单个
*
* @throws GuzzleException
*/
public function handle(array $data): ResponseInterface
{
return $this->connection->getClient()->request('post', ['json' => $data]);
}
}

20
src/route.php Normal file
View File

@ -0,0 +1,20 @@
<?php
use think\facade\Route;
use tm\xls\rocketMq\thinkphp\consumer\Consumer;
use function React\Async\coroutine;
$path = config('rocket_mq.callback', '/rocketmq/producer/notify');
//注册回调路由
Route::post($path, function () {
$promise = coroutine(function () {
app()->make(Consumer::class, [
request()->param()
])->distribute();
});
$promise->then(function () {
return 'success';
},function (){
return 'error';
});
});