commit a439b3a2229abfe38a8b65bc91d0f2861548315c Author: zyimm Date: Thu Aug 11 18:50:19 2022 +0800 修改版本 diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..27b765f --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/tests export-ignore +/.github export-ignore diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..0f7d23f --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,25 @@ +on: + push: + # Sequence of patterns matched against refs/tags + tags: + - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10 + +name: Release + +jobs: + release: + name: Release + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: false + prerelease: false diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..6105290 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,66 @@ +name: PHPUnit + +on: [ push, pull_request ] + +env: + SWOOLE_VERSION: 'v4.8.11' + SWOW_VERSION: 'develop' + +jobs: + ci: + name: Test PHP ${{ matrix.php-version }} on ${{ matrix.engine }} + runs-on: "${{ matrix.os }}" + strategy: + matrix: + os: [ ubuntu-latest ] + php-version: [ '8.0', '8.1' ] + engine: [ 'swoole' ] + max-parallel: 5 + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-version }} + tools: phpize + ini-values: opcache.enable_cli=0 + coverage: none + - name: Setup Swoole + if: ${{ matrix.engine == 'swoole' }} + run: | + sudo apt-get update + sudo apt-get install libcurl4-openssl-dev + wget https://github.com/swoole/swoole-src/archive/${SWOOLE_VERSION}.tar.gz -O swoole.tar.gz + mkdir -p swoole + tar -xf swoole.tar.gz -C swoole --strip-components=1 + rm swoole.tar.gz + cd swoole + phpize + ./configure --enable-openssl --enable-http2 --enable-swoole-curl --enable-swoole-json + make -j$(nproc) + sudo make install + sudo sh -c "echo extension=swoole > /etc/php/${{ matrix.php-version }}/cli/conf.d/swoole.ini" + sudo sh -c "echo swoole.use_shortname='Off' >> /etc/php/${{ matrix.php-version }}/cli/conf.d/swoole.ini" + php --ri swoole + - name: Setup Swow + if: ${{ matrix.engine == 'swow' }} + run: | + wget https://github.com/swow/swow/archive/"${SWOW_VERSION}".tar.gz -O swow.tar.gz + mkdir -p swow + tar -xf swow.tar.gz -C swow --strip-components=1 + rm swow.tar.gz + cd swow/ext || exit + + phpize + ./configure --enable-debug + make -j "$(nproc)" + sudo make install + sudo sh -c "echo extension=swow > /etc/php/${{ matrix.php-version }}/cli/conf.d/swow.ini" + php --ri swow + - name: Setup Packages + run: composer update -o --no-scripts + - name: Run Test Cases + run: | + composer analyse + composer test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e11e88 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/vendor/ +composer.lock +*.cache +*.log \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..61c500c --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/mqtt-server-incubator.iml b/.idea/mqtt-server-incubator.iml new file mode 100644 index 0000000..24ca1a9 --- /dev/null +++ b/.idea/mqtt-server-incubator.iml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/php.xml b/.idea/php.xml new file mode 100644 index 0000000..9d94356 --- /dev/null +++ b/.idea/php.xml @@ -0,0 +1,98 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..9661ac7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.php-cs-fixer.php b/.php-cs-fixer.php new file mode 100644 index 0000000..b9a08df --- /dev/null +++ b/.php-cs-fixer.php @@ -0,0 +1,89 @@ +setRiskyAllowed(true) + ->setRules([ + '@PSR2' => true, + '@Symfony' => true, + '@DoctrineAnnotation' => true, + '@PhpCsFixer' => true, + 'header_comment' => [ + 'comment_type' => 'PHPDoc', + 'header' => $header, + 'separate' => 'none', + 'location' => 'after_declare_strict', + ], + 'array_syntax' => [ + 'syntax' => 'short' + ], + 'list_syntax' => [ + 'syntax' => 'short' + ], + 'concat_space' => [ + 'spacing' => 'one' + ], + 'blank_line_before_statement' => [ + 'statements' => [ + 'declare', + ], + ], + 'general_phpdoc_annotation_remove' => [ + 'annotations' => [ + 'author' + ], + ], + 'ordered_imports' => [ + 'imports_order' => [ + 'class', 'function', 'const', + ], + 'sort_algorithm' => 'alpha', + ], + 'single_line_comment_style' => [ + 'comment_types' => [ + ], + ], + 'yoda_style' => [ + 'always_move_variable' => false, + 'equal' => false, + 'identical' => false, + ], + 'phpdoc_align' => [ + 'align' => 'left', + ], + 'multiline_whitespace_before_semicolons' => [ + 'strategy' => 'no_multi_line', + ], + 'constant_case' => [ + 'case' => 'lower', + ], + 'class_attributes_separation' => true, + 'combine_consecutive_unsets' => true, + 'declare_strict_types' => true, + 'linebreak_after_opening_tag' => true, + 'lowercase_static_reference' => true, + 'no_useless_else' => true, + 'no_unused_imports' => true, + 'not_operator_with_successor_space' => true, + 'not_operator_with_space' => false, + 'ordered_class_elements' => true, + 'php_unit_strict' => false, + 'phpdoc_separation' => false, + 'single_quote' => true, + 'standardize_not_equals' => true, + 'multiline_comment_opening_closing' => true, + ]) + ->setFinder( + PhpCsFixer\Finder::create() + ->exclude('vendor') + ->in(__DIR__) + ) + ->setUsingCache(false); diff --git a/.phpstorm.meta.php b/.phpstorm.meta.php new file mode 100644 index 0000000..1014069 --- /dev/null +++ b/.phpstorm.meta.php @@ -0,0 +1,6 @@ + SWOOLE_BASE, + 'servers' => [ + [ + 'name' => 'mqtt', + 'type' => Server::SERVER_BASE, + 'host' => '0.0.0.0', + 'port' => 1883, + 'sock_type' => SWOOLE_SOCK_TCP, + 'callbacks' => [ + Event::ON_RECEIVE => [Zyimm\MqttServer\MQTTServer::class, 'onReceive'], + ], + ], + ], + 'settings' => [ + 'enable_coroutine' => true, + 'worker_num' => 4, + 'pid_file' => BASE_PATH . '/runtime/hyperf.pid', + 'open_tcp_nodelay' => true, + 'max_coroutine' => 100000, + 'open_http2_protocol' => true, + 'max_request' => 0, + 'socket_buffer_size' => 2 * 1024 * 1024, + 'package_max_length' => 2 * 1024 * 1024, + ], + 'callbacks' => [ + Event::ON_BEFORE_START => [Hyperf\Framework\Bootstrap\ServerStartCallback::class, 'beforeStart'], + Event::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'], + Event::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'], + Event::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'], + ], +]; + +``` + +启动服务,我们就可以简单的使用 MQTT 服务了。 + +## 自定义事件 + +组件增加了可以监听 MQTT 服务各个阶段的事件,比如我们写一个 `MQTTConnectHandler` 用来监听客户端连接。 + +> PUBLISH, SUBSCRIBE 和 UNSUBSCRIBE 三个事件,需要自行实现 + +```php +getBody()); + return $response; + } +} +``` + +重启服务,连接 MQTT 时,便可以得到以下输出。 + +``` +$ php bin/hyperf.php start +[INFO] TCP Server listening at 0.0.0.0:1883 +string(234) "{"type":1,"protocol_name":"MQTT","protocol_level":4,"clean_session":1,"will":{"qos":0,"retain":0,"topic":"simps-mqtt\/user001\/delete","message":"byebye"},"user_name":"","password":"","keep_alive":10,"client_id":"Simps_60e5aa0c4284f"}" +``` + +组件支持的事件列表如下: + +| 事件 | 备注 | +| :-------------: | :------------------: | +| MQTTConnect | 客户端连接时触发 | +| MQTTDisconnect | 客户端断开连接时触发 | +| MQTTPingReq | | +| MQTTPublish | 客户端发布消息时触发 | +| MQTTSubscribe | 客户端订阅时触发 | +| MQTTUnsubscribe | 客户端取消订阅时触发 | + +注解支持参数如下 + +| 参数 | 备注 | +| :------: | :--------------------------------------------: | +| server | 指定当前事件对应的服务名 | +| type | 事件类型 | +| priority | 事件优先级,越大越先执行,默认的事件优先级为 0 | + diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..ee5bb19 --- /dev/null +++ b/composer.json @@ -0,0 +1,55 @@ +{ + "name": "zyimm/mqtt-server-incubator", + "type": "library", + "license": "MIT", + "keywords": [ + "php", + "hyperf", + "mqtt" + ], + "description": "MQTT Server for Hyperf", + "autoload": { + "psr-4": { + "Zyimm\\MqttServer\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "ZyimmTest\\MqttServer\\": "tests" + } + }, + "require": { + "php": ">=8.0", + "hyperf/contract": "~2.2", + "hyperf/di": "~2.2", + "hyperf/http-server": "~2.2", + "hyperf/utils": "~2.2", + "simps/mqtt": "^1.4" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "^3.0", + "mockery/mockery": "^1.0", + "phpstan/phpstan": "^1.0", + "phpunit/phpunit": ">=7.0", + "swoole/ide-helper": "^4.8" + }, + "minimum-stability": "dev", + "prefer-stable": true, + "config": { + "optimize-autoloader": true, + "sort-packages": true + }, + "scripts": { + "test": "phpunit -c phpunit.xml --colors=always", + "analyse": "phpstan analyse --memory-limit 1024M -l 0 ./src", + "cs-fix": "php-cs-fixer fix $1" + }, + "extra": { + "branch-alias": { + "dev-main": "0.3-dev" + }, + "hyperf": { + "config": "Zyimm\\MqttServer\\ConfigProvider" + } + } +} diff --git a/phpunit.xml b/phpunit.xml new file mode 100644 index 0000000..d2c615a --- /dev/null +++ b/phpunit.xml @@ -0,0 +1,15 @@ + + + + ./tests/ + + \ No newline at end of file diff --git a/src/Annotation/MQTTConnect.php b/src/Annotation/MQTTConnect.php new file mode 100644 index 0000000..07c8893 --- /dev/null +++ b/src/Annotation/MQTTConnect.php @@ -0,0 +1,24 @@ + [ + ], + 'commands' => [ + ], + 'annotations' => [ + 'scan' => [ + 'paths' => [ + __DIR__, + ], + ], + ], + ]; + } +} diff --git a/src/CoreMiddleware.php b/src/CoreMiddleware.php new file mode 100644 index 0000000..e2a4538 --- /dev/null +++ b/src/CoreMiddleware.php @@ -0,0 +1,101 @@ +container = $container; + $this->serverName = $serverName; + } + + public function dispatch(ServerRequestInterface $request): ServerRequestInterface + { + return $request; + } + + public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface + { + $type = $request->getAttribute(Types::class); + $items = AnnotationCollector::getMethodsByAnnotation(MQTTEvent::class); + $response = Context::get(ResponseInterface::class); + + $queue = new SplPriorityQueue(); + foreach ($items as $item) { + $class = $item['class']; + $method = $item['method']; + /** @var MQTTEvent $annotation */ + $annotation = $item['annotation']; + if ($annotation->server === $this->serverName && $annotation->type === $type) { + $queue->insert([$class, $method], $annotation->priority); + } + } + + if ($defaultHandler = $this->getDefaultHandler($type)) { + $queue->insert([$defaultHandler, 'handle'], 0); + } + + foreach ($queue as [$class, $method]) { + if (! $this->container->has($class)) { + continue; + } + + $response = $this->container->get($class)->{$method}($request, $response); + if ($response instanceof Response && $response->getAttribute('stopped', false)) { + break; + } + } + + return $response; + } + + protected function getDefaultHandler(int $type): ?string + { + $handlers = [ + Types::CONNECT => MQTTConnectHandler::class, + Types::DISCONNECT => MQTTDisconnectHandler::class, + Types::PINGREQ => MQTTPingReqHandler::class, + // The handlers below are not necessary. + // Types::PUBLISH => MQTTPublishHandler::class, + // Types::SUBSCRIBE => MQTTSubscribeHandler::class, + // Types::UNSUBSCRIBE => MQTTUnsubscribeHandler::class, + ]; + + return $handlers[$type] ?? null; + } +} diff --git a/src/Exception/Handler/MqttExceptionHandler.php b/src/Exception/Handler/MqttExceptionHandler.php new file mode 100644 index 0000000..ef23c9d --- /dev/null +++ b/src/Exception/Handler/MqttExceptionHandler.php @@ -0,0 +1,56 @@ +logger = $logger; + $this->formatter = $formatter; + } + + public function handle(Throwable $throwable, ResponseInterface $response): ResponseInterface + { + $this->logger->warning($this->formatter->format($throwable)); + + $this->stopPropagation(); + + if ($response instanceof Response) { + $response = $response->withAttribute('closed', true); + } + + return $response; + } + + public function isValid(Throwable $throwable): bool + { + return true; + } +} diff --git a/src/Exception/InvalidProtocolException.php b/src/Exception/InvalidProtocolException.php new file mode 100644 index 0000000..dd5750c --- /dev/null +++ b/src/Exception/InvalidProtocolException.php @@ -0,0 +1,18 @@ +getParsedBody(); + if ($data['protocol_name'] != 'MQTT') { + return $response->withAttribute('closed', true); + } + + if (! $this->isRewritable($response)) { + return $response; + } + + return $response->withBody(new SwooleStream(V3::pack( + [ + 'type' => Types::CONNACK, + 'code' => 0, + 'session_present' => 0, + ] + ))); + } +} diff --git a/src/Handler/MQTTDisconnectHandler.php b/src/Handler/MQTTDisconnectHandler.php new file mode 100644 index 0000000..06f9e21 --- /dev/null +++ b/src/Handler/MQTTDisconnectHandler.php @@ -0,0 +1,23 @@ +withAttribute('closed', true); + } +} diff --git a/src/Handler/MQTTPingReqHandler.php b/src/Handler/MQTTPingReqHandler.php new file mode 100644 index 0000000..0c8c710 --- /dev/null +++ b/src/Handler/MQTTPingReqHandler.php @@ -0,0 +1,38 @@ +isRewritable($response)) { + return $response; + } + + return $response->withBody(new SwooleStream(V3::pack( + ['type' => Types::PINGRESP] + ))); + } +} diff --git a/src/Handler/MQTTPublishHandler.php b/src/Handler/MQTTPublishHandler.php new file mode 100644 index 0000000..8c86b91 --- /dev/null +++ b/src/Handler/MQTTPublishHandler.php @@ -0,0 +1,60 @@ +getAttribute('server'); + $fd = $request->getAttribute('fd'); + $data = $request->getParsedBody(); + foreach ($server->connections as $targetFd) { + if ($targetFd != $fd) { + $server->send( + $targetFd, + V3::pack( + [ + 'type' => $data['type'], + 'topic' => $data['topic'], + 'message' => $data['message'], + 'dup' => $data['dup'], + 'qos' => $data['qos'], + 'retain' => $data['retain'], + 'message_id' => $data['message_id'] ?? '', + ] + ) + ); + } + } + + if ($data['qos'] === 1) { + $response = $response->withBody(new SwooleStream(V3::pack( + [ + 'type' => Types::PUBACK, + 'message_id' => $data['message_id'] ?? '', + ] + ))); + } + + return $response; + } +} diff --git a/src/Handler/MQTTSubscribeHandler.php b/src/Handler/MQTTSubscribeHandler.php new file mode 100644 index 0000000..11b9329 --- /dev/null +++ b/src/Handler/MQTTSubscribeHandler.php @@ -0,0 +1,42 @@ +getParsedBody(); + $payload = []; + foreach ($data['topics'] as $k => $qos) { + if (is_numeric($qos) && $qos < 3) { + $payload[] = $qos; + } else { + $payload[] = 0x80; + } + } + + return $response->withBody(new SwooleStream(V3::pack( + [ + 'type' => Types::SUBACK, + 'message_id' => $data['message_id'] ?? '', + 'codes' => $payload, + ] + ))); + } +} diff --git a/src/Handler/MQTTUnsubscribeHandler.php b/src/Handler/MQTTUnsubscribeHandler.php new file mode 100644 index 0000000..de163c3 --- /dev/null +++ b/src/Handler/MQTTUnsubscribeHandler.php @@ -0,0 +1,37 @@ +getParsedBody(); + + return $response->withBody(new SwooleStream(V3::pack( + [ + 'type' => Types::UNSUBACK, + 'message_id' => $data['message_id'] ?? '', + ] + ))); + } +} diff --git a/src/Handler/ResponseRewritable.php b/src/Handler/ResponseRewritable.php new file mode 100644 index 0000000..3bbf66c --- /dev/null +++ b/src/Handler/ResponseRewritable.php @@ -0,0 +1,26 @@ +getBody()) === ''; + } +} diff --git a/src/MQTTServer.php b/src/MQTTServer.php new file mode 100644 index 0000000..c36b29f --- /dev/null +++ b/src/MQTTServer.php @@ -0,0 +1,187 @@ +container = $container; + $this->dispatcher = $dispatcher; + $this->exceptionHandlerDispatcher = $exceptionDispatcher; + $this->logger = $logger; + } + + public function initCoreMiddleware(string $serverName): void + { + $this->serverName = $serverName; + $this->coreMiddleware = $this->createCoreMiddleware(); + + $config = $this->container->get(ConfigInterface::class); + $this->middlewares = $config->get('middlewares.' . $serverName, []); + $this->exceptionHandlers = $config->get('exceptions.handler.' . $serverName, $this->getDefaultExceptionHandler()); + } + + public function onReceive($server, int $fd, int $reactorId, string $data): void + { + $request = $response = null; + try { + CoordinatorManager::until(Constants::WORKER_START)->yield(); + + // Initialize PSR-7 Request and Response objects. + Context::set(ResponseInterface::class, $this->buildResponse($fd, $server)); + Context::set(ServerRequestInterface::class, $request = $this->buildRequest($fd, $reactorId, $data)); + + $middlewares = $this->middlewares; + + $request = $this->coreMiddleware->dispatch($request); + + $response = $this->dispatcher->dispatch($request, $middlewares, $this->coreMiddleware); + } catch (Throwable $throwable) { + // Delegate the exception to exception handler. + $exceptionHandlerDispatcher = $this->container->get(ExceptionHandlerDispatcher::class); + $response = $exceptionHandlerDispatcher->dispatch($throwable, $this->exceptionHandlers); + } finally { + if ($response instanceof PsrResponse && $response->getAttribute('closed', false)) { + $this->close($server, $fd); + } + if ($response instanceof ResponseInterface) { + $this->send($server, $fd, $response); + } + } + } + + protected function getDefaultExceptionHandler(): array + { + return [ + MqttExceptionHandler::class, + ]; + } + + /** + * @param Connection|SwooleServer $server + */ + protected function send($server, int $fd, ResponseInterface $response): void + { + $body = (string) $response->getBody(); + if (empty($body)) { + return; + } + + if ($server instanceof SwooleServer) { + $server->send($fd, $body); + } elseif ($server instanceof Connection) { + $server->send($body); + } + } + + protected function close($server, int $fd): void + { + if ($server instanceof SwooleServer) { + $server->close($fd); + } elseif ($server instanceof Connection) { + $server->close(); + } + } + + protected function createCoreMiddleware(): CoreMiddlewareInterface + { + return new CoreMiddleware($this->container, $this->serverName); + } + + protected function buildRequest(int $fd, int $reactorId, string $data): ServerRequestInterface + { + $data = V3::unpack($data); + $uri = new Uri('http://0.0.0.0/'); + $request = new Request('POST', $uri, ['Content-Type' => 'application/json'], new SwooleStream(Json::encode($data))); + return $request->withAttribute(Types::class, $data['type'] ?? 0) + ->withAttribute('fd', $fd) + ->withAttribute('reactorId', $reactorId) + ->withParsedBody($data); + } + + protected function buildResponse(int $fd, $server): ResponseInterface + { + return (new PsrResponse())->withAttribute('fd', $fd)->withAttribute('server', $server); + } +} diff --git a/tests/Cases/AbstractTestCase.php b/tests/Cases/AbstractTestCase.php new file mode 100644 index 0000000..87d9abc --- /dev/null +++ b/tests/Cases/AbstractTestCase.php @@ -0,0 +1,21 @@ +assertTrue(true); + } +} diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..1166c10 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,12 @@ +