修改版本

This commit is contained in:
zyimm 2022-08-11 18:50:19 +08:00
commit a439b3a222
38 changed files with 1550 additions and 0 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
/tests export-ignore
/.github export-ignore

25
.github/workflows/release.yml vendored Normal file
View File

@ -0,0 +1,25 @@
on:
push:
# Sequence of patterns matched against refs/tags
tags:
- 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10
name: Release
jobs:
release:
name: Release
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Create Release
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ github.ref }}
release_name: Release ${{ github.ref }}
draft: false
prerelease: false

66
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,66 @@
name: PHPUnit
on: [ push, pull_request ]
env:
SWOOLE_VERSION: 'v4.8.11'
SWOW_VERSION: 'develop'
jobs:
ci:
name: Test PHP ${{ matrix.php-version }} on ${{ matrix.engine }}
runs-on: "${{ matrix.os }}"
strategy:
matrix:
os: [ ubuntu-latest ]
php-version: [ '8.0', '8.1' ]
engine: [ 'swoole' ]
max-parallel: 5
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
tools: phpize
ini-values: opcache.enable_cli=0
coverage: none
- name: Setup Swoole
if: ${{ matrix.engine == 'swoole' }}
run: |
sudo apt-get update
sudo apt-get install libcurl4-openssl-dev
wget https://github.com/swoole/swoole-src/archive/${SWOOLE_VERSION}.tar.gz -O swoole.tar.gz
mkdir -p swoole
tar -xf swoole.tar.gz -C swoole --strip-components=1
rm swoole.tar.gz
cd swoole
phpize
./configure --enable-openssl --enable-http2 --enable-swoole-curl --enable-swoole-json
make -j$(nproc)
sudo make install
sudo sh -c "echo extension=swoole > /etc/php/${{ matrix.php-version }}/cli/conf.d/swoole.ini"
sudo sh -c "echo swoole.use_shortname='Off' >> /etc/php/${{ matrix.php-version }}/cli/conf.d/swoole.ini"
php --ri swoole
- name: Setup Swow
if: ${{ matrix.engine == 'swow' }}
run: |
wget https://github.com/swow/swow/archive/"${SWOW_VERSION}".tar.gz -O swow.tar.gz
mkdir -p swow
tar -xf swow.tar.gz -C swow --strip-components=1
rm swow.tar.gz
cd swow/ext || exit
phpize
./configure --enable-debug
make -j "$(nproc)"
sudo make install
sudo sh -c "echo extension=swow > /etc/php/${{ matrix.php-version }}/cli/conf.d/swow.ini"
php --ri swow
- name: Setup Packages
run: composer update -o --no-scripts
- name: Run Test Cases
run: |
composer analyse
composer test

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/vendor/
composer.lock
*.cache
*.log

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/mqtt-server-incubator.iml" filepath="$PROJECT_DIR$/.idea/mqtt-server-incubator.iml" />
</modules>
</component>
</project>

View File

@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/spec" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" packagePrefix="Zyimm\MqttServer\" />
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" packagePrefix="ZyimmTest\MqttServer\" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/event-dispatcher" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/event-dispatcher-contracts" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/console" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-intl-grapheme" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/finder" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/process" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-mbstring" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-php80" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/filesystem" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-php73" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-ctype" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/stopwatch" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/deprecation-contracts" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-php81" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/options-resolver" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/string" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/polyfill-intl-normalizer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/php-code-coverage" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/phpunit" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/php-timer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/php-text-template" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/php-invoker" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpunit/php-file-iterator" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpstan/phpstan" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpspec/prophecy" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phar-io/version" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phar-io/manifest" />
<excludeFolder url="file://$MODULE_DIR$/vendor/myclabs/deep-copy" />
<excludeFolder url="file://$MODULE_DIR$/vendor/mockery/mockery" />
<excludeFolder url="file://$MODULE_DIR$/vendor/laminas/laminas-mime" />
<excludeFolder url="file://$MODULE_DIR$/vendor/laminas/laminas-stdlib" />
<excludeFolder url="file://$MODULE_DIR$/vendor/vlucas/phpdotenv" />
<excludeFolder url="file://$MODULE_DIR$/vendor/swoole/ide-helper" />
<excludeFolder url="file://$MODULE_DIR$/vendor/php-di/phpdoc-reader" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/http-message" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/context" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/di" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/engine" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/exception-handler" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/dispatcher" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/utils" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/contract" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/server" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/macroable" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/event" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hyperf/http-server" />
<excludeFolder url="file://$MODULE_DIR$/vendor/simps/mqtt" />
<excludeFolder url="file://$MODULE_DIR$/vendor/nikic/fast-route" />
<excludeFolder url="file://$MODULE_DIR$/vendor/nikic/php-parser" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/event-dispatcher" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/log" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/http-message" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/http-server-handler" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/container" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/cache" />
<excludeFolder url="file://$MODULE_DIR$/vendor/psr/http-server-middleware" />
<excludeFolder url="file://$MODULE_DIR$/vendor/composer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/graham-campbell/result-type" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpdocumentor/reflection-common" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpdocumentor/type-resolver" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpdocumentor/reflection-docblock" />
<excludeFolder url="file://$MODULE_DIR$/vendor/php-cs-fixer/diff" />
<excludeFolder url="file://$MODULE_DIR$/vendor/friendsofphp/php-cs-fixer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/webmozart/assert" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/code-unit-reverse-lookup" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/comparator" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/lines-of-code" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/environment" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/object-enumerator" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/object-reflector" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/cli-parser" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/resource-operations" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/complexity" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/recursion-context" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/version" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/diff" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/type" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/code-unit" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/global-state" />
<excludeFolder url="file://$MODULE_DIR$/vendor/sebastian/exporter" />
<excludeFolder url="file://$MODULE_DIR$/vendor/phpoption/phpoption" />
<excludeFolder url="file://$MODULE_DIR$/vendor/hamcrest/hamcrest-php" />
<excludeFolder url="file://$MODULE_DIR$/vendor/doctrine/annotations" />
<excludeFolder url="file://$MODULE_DIR$/vendor/doctrine/lexer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/doctrine/inflector" />
<excludeFolder url="file://$MODULE_DIR$/vendor/doctrine/instantiator" />
<excludeFolder url="file://$MODULE_DIR$/vendor/theseer/tokenizer" />
<excludeFolder url="file://$MODULE_DIR$/vendor/symfony/service-contracts" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

98
.idea/php.xml Normal file
View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PhpIncludePathManager">
<include_path>
<path value="$PROJECT_DIR$/vendor/symfony/event-dispatcher" />
<path value="$PROJECT_DIR$/vendor/symfony/event-dispatcher-contracts" />
<path value="$PROJECT_DIR$/vendor/symfony/console" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-intl-grapheme" />
<path value="$PROJECT_DIR$/vendor/symfony/finder" />
<path value="$PROJECT_DIR$/vendor/symfony/process" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-mbstring" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-php80" />
<path value="$PROJECT_DIR$/vendor/symfony/filesystem" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-php73" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-ctype" />
<path value="$PROJECT_DIR$/vendor/symfony/stopwatch" />
<path value="$PROJECT_DIR$/vendor/symfony/deprecation-contracts" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-php81" />
<path value="$PROJECT_DIR$/vendor/symfony/options-resolver" />
<path value="$PROJECT_DIR$/vendor/symfony/string" />
<path value="$PROJECT_DIR$/vendor/symfony/polyfill-intl-normalizer" />
<path value="$PROJECT_DIR$/vendor/phpunit/php-code-coverage" />
<path value="$PROJECT_DIR$/vendor/phpunit/phpunit" />
<path value="$PROJECT_DIR$/vendor/phpunit/php-timer" />
<path value="$PROJECT_DIR$/vendor/phpunit/php-text-template" />
<path value="$PROJECT_DIR$/vendor/phpunit/php-invoker" />
<path value="$PROJECT_DIR$/vendor/phpunit/php-file-iterator" />
<path value="$PROJECT_DIR$/vendor/phpstan/phpstan" />
<path value="$PROJECT_DIR$/vendor/phpspec/prophecy" />
<path value="$PROJECT_DIR$/vendor/phar-io/version" />
<path value="$PROJECT_DIR$/vendor/phar-io/manifest" />
<path value="$PROJECT_DIR$/vendor/myclabs/deep-copy" />
<path value="$PROJECT_DIR$/vendor/mockery/mockery" />
<path value="$PROJECT_DIR$/vendor/laminas/laminas-mime" />
<path value="$PROJECT_DIR$/vendor/laminas/laminas-stdlib" />
<path value="$PROJECT_DIR$/vendor/vlucas/phpdotenv" />
<path value="$PROJECT_DIR$/vendor/swoole/ide-helper" />
<path value="$PROJECT_DIR$/vendor/php-di/phpdoc-reader" />
<path value="$PROJECT_DIR$/vendor/hyperf/http-message" />
<path value="$PROJECT_DIR$/vendor/hyperf/context" />
<path value="$PROJECT_DIR$/vendor/hyperf/di" />
<path value="$PROJECT_DIR$/vendor/hyperf/engine" />
<path value="$PROJECT_DIR$/vendor/hyperf/exception-handler" />
<path value="$PROJECT_DIR$/vendor/hyperf/dispatcher" />
<path value="$PROJECT_DIR$/vendor/hyperf/utils" />
<path value="$PROJECT_DIR$/vendor/hyperf/contract" />
<path value="$PROJECT_DIR$/vendor/hyperf/server" />
<path value="$PROJECT_DIR$/vendor/hyperf/macroable" />
<path value="$PROJECT_DIR$/vendor/hyperf/event" />
<path value="$PROJECT_DIR$/vendor/hyperf/http-server" />
<path value="$PROJECT_DIR$/vendor/simps/mqtt" />
<path value="$PROJECT_DIR$/vendor/nikic/fast-route" />
<path value="$PROJECT_DIR$/vendor/nikic/php-parser" />
<path value="$PROJECT_DIR$/vendor/psr/event-dispatcher" />
<path value="$PROJECT_DIR$/vendor/psr/log" />
<path value="$PROJECT_DIR$/vendor/psr/http-message" />
<path value="$PROJECT_DIR$/vendor/psr/http-server-handler" />
<path value="$PROJECT_DIR$/vendor/psr/container" />
<path value="$PROJECT_DIR$/vendor/psr/cache" />
<path value="$PROJECT_DIR$/vendor/psr/http-server-middleware" />
<path value="$PROJECT_DIR$/vendor/composer" />
<path value="$PROJECT_DIR$/vendor/graham-campbell/result-type" />
<path value="$PROJECT_DIR$/vendor/phpdocumentor/reflection-common" />
<path value="$PROJECT_DIR$/vendor/phpdocumentor/type-resolver" />
<path value="$PROJECT_DIR$/vendor/phpdocumentor/reflection-docblock" />
<path value="$PROJECT_DIR$/vendor/php-cs-fixer/diff" />
<path value="$PROJECT_DIR$/vendor/friendsofphp/php-cs-fixer" />
<path value="$PROJECT_DIR$/vendor/webmozart/assert" />
<path value="$PROJECT_DIR$/vendor/sebastian/code-unit-reverse-lookup" />
<path value="$PROJECT_DIR$/vendor/sebastian/comparator" />
<path value="$PROJECT_DIR$/vendor/sebastian/lines-of-code" />
<path value="$PROJECT_DIR$/vendor/sebastian/environment" />
<path value="$PROJECT_DIR$/vendor/sebastian/object-enumerator" />
<path value="$PROJECT_DIR$/vendor/sebastian/object-reflector" />
<path value="$PROJECT_DIR$/vendor/sebastian/cli-parser" />
<path value="$PROJECT_DIR$/vendor/sebastian/resource-operations" />
<path value="$PROJECT_DIR$/vendor/sebastian/complexity" />
<path value="$PROJECT_DIR$/vendor/sebastian/recursion-context" />
<path value="$PROJECT_DIR$/vendor/sebastian/version" />
<path value="$PROJECT_DIR$/vendor/sebastian/diff" />
<path value="$PROJECT_DIR$/vendor/sebastian/type" />
<path value="$PROJECT_DIR$/vendor/sebastian/code-unit" />
<path value="$PROJECT_DIR$/vendor/sebastian/global-state" />
<path value="$PROJECT_DIR$/vendor/sebastian/exporter" />
<path value="$PROJECT_DIR$/vendor/phpoption/phpoption" />
<path value="$PROJECT_DIR$/vendor/hamcrest/hamcrest-php" />
<path value="$PROJECT_DIR$/vendor/doctrine/annotations" />
<path value="$PROJECT_DIR$/vendor/doctrine/lexer" />
<path value="$PROJECT_DIR$/vendor/doctrine/inflector" />
<path value="$PROJECT_DIR$/vendor/doctrine/instantiator" />
<path value="$PROJECT_DIR$/vendor/theseer/tokenizer" />
<path value="$PROJECT_DIR$/vendor/symfony/service-contracts" />
</include_path>
</component>
<component name="PhpProjectSharedConfiguration" php_language_level="8.0">
<option name="suggestChangeDefaultLanguageLevel" value="false" />
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

89
.php-cs-fixer.php Normal file
View File

@ -0,0 +1,89 @@
<?php
$header = <<<'EOF'
This file is part of Hyperf.
@link https://www.hyperf.io
@document https://hyperf.wiki
@contact group@hyperf.io
@license https://github.com/hyperf/hyperf/blob/master/LICENSE
EOF;
return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
->setRules([
'@PSR2' => true,
'@Symfony' => true,
'@DoctrineAnnotation' => true,
'@PhpCsFixer' => true,
'header_comment' => [
'comment_type' => 'PHPDoc',
'header' => $header,
'separate' => 'none',
'location' => 'after_declare_strict',
],
'array_syntax' => [
'syntax' => 'short'
],
'list_syntax' => [
'syntax' => 'short'
],
'concat_space' => [
'spacing' => 'one'
],
'blank_line_before_statement' => [
'statements' => [
'declare',
],
],
'general_phpdoc_annotation_remove' => [
'annotations' => [
'author'
],
],
'ordered_imports' => [
'imports_order' => [
'class', 'function', 'const',
],
'sort_algorithm' => 'alpha',
],
'single_line_comment_style' => [
'comment_types' => [
],
],
'yoda_style' => [
'always_move_variable' => false,
'equal' => false,
'identical' => false,
],
'phpdoc_align' => [
'align' => 'left',
],
'multiline_whitespace_before_semicolons' => [
'strategy' => 'no_multi_line',
],
'constant_case' => [
'case' => 'lower',
],
'class_attributes_separation' => true,
'combine_consecutive_unsets' => true,
'declare_strict_types' => true,
'linebreak_after_opening_tag' => true,
'lowercase_static_reference' => true,
'no_useless_else' => true,
'no_unused_imports' => true,
'not_operator_with_successor_space' => true,
'not_operator_with_space' => false,
'ordered_class_elements' => true,
'php_unit_strict' => false,
'phpdoc_separation' => false,
'single_quote' => true,
'standardize_not_equals' => true,
'multiline_comment_opening_closing' => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->exclude('vendor')
->in(__DIR__)
)
->setUsingCache(false);

6
.phpstorm.meta.php Normal file
View File

@ -0,0 +1,6 @@
<?php
namespace PHPSTORM_META {
// Reflect
override(\Psr\Container\ContainerInterface::get(0), map('@'));
}

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Hyperf
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

114
README.md Normal file
View File

@ -0,0 +1,114 @@
# MQTT Server
## 安装
```bash
composer require hyperf/mqtt-server-incubator
```
## 配置服务
```php
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
use Hyperf\Server\Event;use Hyperf\Server\Server;
return [
'mode' => SWOOLE_BASE,
'servers' => [
[
'name' => 'mqtt',
'type' => Server::SERVER_BASE,
'host' => '0.0.0.0',
'port' => 1883,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
Event::ON_RECEIVE => [Zyimm\MqttServer\MQTTServer::class, 'onReceive'],
],
],
],
'settings' => [
'enable_coroutine' => true,
'worker_num' => 4,
'pid_file' => BASE_PATH . '/runtime/hyperf.pid',
'open_tcp_nodelay' => true,
'max_coroutine' => 100000,
'open_http2_protocol' => true,
'max_request' => 0,
'socket_buffer_size' => 2 * 1024 * 1024,
'package_max_length' => 2 * 1024 * 1024,
],
'callbacks' => [
Event::ON_BEFORE_START => [Hyperf\Framework\Bootstrap\ServerStartCallback::class, 'beforeStart'],
Event::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
Event::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
Event::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],
],
];
```
启动服务,我们就可以简单的使用 MQTT 服务了。
## 自定义事件
组件增加了可以监听 MQTT 服务各个阶段的事件,比如我们写一个 `MQTTConnectHandler` 用来监听客户端连接。
> PUBLISH, SUBSCRIBE 和 UNSUBSCRIBE 三个事件,需要自行实现
```php
<?php
declare(strict_types=1);
namespace App\MQTT\Event;
use Hyperf\HttpMessage\Server\Response;use Psr\Http\Message\ServerRequestInterface;use Zyimm\MqttServer\Annotation\MQTTConnect;use Zyimm\MqttServer\Handler\HandlerInterface;
#[MQTTConnect(priority: 1)]
class MQTTConnectHandler implements HandlerInterface
{
public function handle(ServerRequestInterface $request, Response $response): Response
{
var_dump((string) $request->getBody());
return $response;
}
}
```
重启服务,连接 MQTT 时,便可以得到以下输出。
```
$ php bin/hyperf.php start
[INFO] TCP Server listening at 0.0.0.0:1883
string(234) "{"type":1,"protocol_name":"MQTT","protocol_level":4,"clean_session":1,"will":{"qos":0,"retain":0,"topic":"simps-mqtt\/user001\/delete","message":"byebye"},"user_name":"","password":"","keep_alive":10,"client_id":"Simps_60e5aa0c4284f"}"
```
组件支持的事件列表如下:
| 事件 | 备注 |
| :-------------: | :------------------: |
| MQTTConnect | 客户端连接时触发 |
| MQTTDisconnect | 客户端断开连接时触发 |
| MQTTPingReq | |
| MQTTPublish | 客户端发布消息时触发 |
| MQTTSubscribe | 客户端订阅时触发 |
| MQTTUnsubscribe | 客户端取消订阅时触发 |
注解支持参数如下
| 参数 | 备注 |
| :------: | :--------------------------------------------: |
| server | 指定当前事件对应的服务名 |
| type | 事件类型 |
| priority | 事件优先级,越大越先执行,默认的事件优先级为 0 |

55
composer.json Normal file
View File

@ -0,0 +1,55 @@
{
"name": "zyimm/mqtt-server-incubator",
"type": "library",
"license": "MIT",
"keywords": [
"php",
"hyperf",
"mqtt"
],
"description": "MQTT Server for Hyperf",
"autoload": {
"psr-4": {
"Zyimm\\MqttServer\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"ZyimmTest\\MqttServer\\": "tests"
}
},
"require": {
"php": ">=8.0",
"hyperf/contract": "~2.2",
"hyperf/di": "~2.2",
"hyperf/http-server": "~2.2",
"hyperf/utils": "~2.2",
"simps/mqtt": "^1.4"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.0",
"mockery/mockery": "^1.0",
"phpstan/phpstan": "^1.0",
"phpunit/phpunit": ">=7.0",
"swoole/ide-helper": "^4.8"
},
"minimum-stability": "dev",
"prefer-stable": true,
"config": {
"optimize-autoloader": true,
"sort-packages": true
},
"scripts": {
"test": "phpunit -c phpunit.xml --colors=always",
"analyse": "phpstan analyse --memory-limit 1024M -l 0 ./src",
"cs-fix": "php-cs-fixer fix $1"
},
"extra": {
"branch-alias": {
"dev-main": "0.3-dev"
},
"hyperf": {
"config": "Zyimm\\MqttServer\\ConfigProvider"
}
}
}

15
phpunit.xml Normal file
View File

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit bootstrap="tests/bootstrap.php"
backupGlobals="false"
backupStaticAttributes="false"
verbose="true"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false">
<testsuite name="Testsuite">
<directory>./tests/</directory>
</testsuite>
</phpunit>

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTConnect extends MQTTEvent
{
/**
* @var int
*/
public int $type = Types::CONNECT;
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTDisconnect extends MQTTEvent
{
/**
* @var int
*/
public int $type = Types::DISCONNECT;
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Hyperf\Di\Annotation\AbstractAnnotation;
use Hyperf\Di\Annotation\AnnotationCollector;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTEvent extends AbstractAnnotation
{
/**
* @var string
*/
public string $server = 'mqtt';
/**
* @var int
*/
public int $type = 0;
/**
* @var int
*/
public int $priority = 0;
public function collectClass(string $className): void
{
AnnotationCollector::collectMethod($className, 'handle', MQTTEvent::class, $this);
}
public function collectMethod(string $className, ?string $target): void
{
AnnotationCollector::collectMethod($className, $target, MQTTEvent::class, $this);
}
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTPingReq extends MQTTEvent
{
/**
* @var int
*/
public int $type = Types::PINGREQ;
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTPublish extends MQTTEvent
{
/**
* @var int
*/
public $type = Types::PUBLISH;
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTSubscribe extends MQTTEvent
{
/**
* @var int
*/
public $type = Types::SUBSCRIBE;
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Annotation;
use Attribute;
use Simps\MQTT\Protocol\Types;
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class MQTTUnsubscribe extends MQTTEvent
{
/**
* @var int
*/
public int $type = Types::UNSUBSCRIBE;
}

32
src/ConfigProvider.php Normal file
View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer;
class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
],
'commands' => [
],
'annotations' => [
'scan' => [
'paths' => [
__DIR__,
],
],
],
];
}
}

101
src/CoreMiddleware.php Normal file
View File

@ -0,0 +1,101 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\MqttServer;
use Hyperf\Context\Context;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\HttpMessage\Base\Response;
use Hyperf\HttpServer\Contract\CoreMiddlewareInterface;
use Laminas\Stdlib\SplPriorityQueue;
use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Simps\MQTT\Protocol\Types;
use Zyimm\MqttServer\Annotation\MQTTEvent;
use Zyimm\MqttServer\Handler\MQTTConnectHandler;
use Zyimm\MqttServer\Handler\MQTTDisconnectHandler;
use Zyimm\MqttServer\Handler\MQTTPingReqHandler;
class CoreMiddleware implements CoreMiddlewareInterface
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var string
*/
protected $serverName;
public function __construct(ContainerInterface $container, string $serverName)
{
$this->container = $container;
$this->serverName = $serverName;
}
public function dispatch(ServerRequestInterface $request): ServerRequestInterface
{
return $request;
}
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
$type = $request->getAttribute(Types::class);
$items = AnnotationCollector::getMethodsByAnnotation(MQTTEvent::class);
$response = Context::get(ResponseInterface::class);
$queue = new SplPriorityQueue();
foreach ($items as $item) {
$class = $item['class'];
$method = $item['method'];
/** @var MQTTEvent $annotation */
$annotation = $item['annotation'];
if ($annotation->server === $this->serverName && $annotation->type === $type) {
$queue->insert([$class, $method], $annotation->priority);
}
}
if ($defaultHandler = $this->getDefaultHandler($type)) {
$queue->insert([$defaultHandler, 'handle'], 0);
}
foreach ($queue as [$class, $method]) {
if (! $this->container->has($class)) {
continue;
}
$response = $this->container->get($class)->{$method}($request, $response);
if ($response instanceof Response && $response->getAttribute('stopped', false)) {
break;
}
}
return $response;
}
protected function getDefaultHandler(int $type): ?string
{
$handlers = [
Types::CONNECT => MQTTConnectHandler::class,
Types::DISCONNECT => MQTTDisconnectHandler::class,
Types::PINGREQ => MQTTPingReqHandler::class,
// The handlers below are not necessary.
// Types::PUBLISH => MQTTPublishHandler::class,
// Types::SUBSCRIBE => MQTTSubscribeHandler::class,
// Types::UNSUBSCRIBE => MQTTUnsubscribeHandler::class,
];
return $handlers[$type] ?? null;
}
}

View File

@ -0,0 +1,56 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Exception\Handler;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\HttpMessage\Base\Response;
use Psr\Http\Message\ResponseInterface;
use Throwable;
class MqttExceptionHandler extends ExceptionHandler
{
/**
* @var StdoutLoggerInterface
*/
protected StdoutLoggerInterface $logger;
/**
* @var FormatterInterface
*/
protected FormatterInterface $formatter;
public function __construct(StdoutLoggerInterface $logger, FormatterInterface $formatter)
{
$this->logger = $logger;
$this->formatter = $formatter;
}
public function handle(Throwable $throwable, ResponseInterface $response): ResponseInterface
{
$this->logger->warning($this->formatter->format($throwable));
$this->stopPropagation();
if ($response instanceof Response) {
$response = $response->withAttribute('closed', true);
}
return $response;
}
public function isValid(Throwable $throwable): bool
{
return true;
}
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Exception;
use RuntimeException;
class InvalidProtocolException extends RuntimeException
{
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Psr\Http\Message\ServerRequestInterface;
interface HandlerInterface
{
public function handle(ServerRequestInterface $request, Response $response): Response;
}

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Throwable;
class MQTTConnectHandler implements HandlerInterface
{
use ResponseRewritable;
/**
* @throws Throwable
*/
public function handle(ServerRequestInterface $request, Response $response): Response
{
$data = $request->getParsedBody();
if ($data['protocol_name'] != 'MQTT') {
return $response->withAttribute('closed', true);
}
if (! $this->isRewritable($response)) {
return $response;
}
return $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::CONNACK,
'code' => 0,
'session_present' => 0,
]
)));
}
}

View File

@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Psr\Http\Message\ServerRequestInterface;
class MQTTDisconnectHandler implements HandlerInterface
{
public function handle(ServerRequestInterface $request, Response $response): Response
{
return $response->withAttribute('closed', true);
}
}

View File

@ -0,0 +1,38 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Throwable;
class MQTTPingReqHandler implements HandlerInterface
{
use ResponseRewritable;
/**
* @throws Throwable
*/
public function handle(ServerRequestInterface $request, Response $response): Response
{
if (! $this->isRewritable($response)) {
return $response;
}
return $response->withBody(new SwooleStream(V3::pack(
['type' => Types::PINGRESP]
)));
}
}

View File

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Swoole\Coroutine\Server\Connection;
use Swoole\Server;
class MQTTPublishHandler implements HandlerInterface
{
public function handle(ServerRequestInterface $request, Response $response): Response
{
/** @var Server|Connection $server */
$server = $response->getAttribute('server');
$fd = $request->getAttribute('fd');
$data = $request->getParsedBody();
foreach ($server->connections as $targetFd) {
if ($targetFd != $fd) {
$server->send(
$targetFd,
V3::pack(
[
'type' => $data['type'],
'topic' => $data['topic'],
'message' => $data['message'],
'dup' => $data['dup'],
'qos' => $data['qos'],
'retain' => $data['retain'],
'message_id' => $data['message_id'] ?? '',
]
)
);
}
}
if ($data['qos'] === 1) {
$response = $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::PUBACK,
'message_id' => $data['message_id'] ?? '',
]
)));
}
return $response;
}
}

View File

@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
class MQTTSubscribeHandler implements HandlerInterface
{
public function handle(ServerRequestInterface $request, Response $response): Response
{
$data = $request->getParsedBody();
$payload = [];
foreach ($data['topics'] as $k => $qos) {
if (is_numeric($qos) && $qos < 3) {
$payload[] = $qos;
} else {
$payload[] = 0x80;
}
}
return $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::SUBACK,
'message_id' => $data['message_id'] ?? '',
'codes' => $payload,
]
)));
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Hyperf\HttpMessage\Server\Response;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\ServerRequestInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Throwable;
class MQTTUnsubscribeHandler implements HandlerInterface
{
/**
* @throws Throwable
*/
public function handle(ServerRequestInterface $request, Response $response): Response
{
$data = $request->getParsedBody();
return $response->withBody(new SwooleStream(V3::pack(
[
'type' => Types::UNSUBACK,
'message_id' => $data['message_id'] ?? '',
]
)));
}
}

View File

@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Zyimm\MqttServer\Handler;
use Psr\Http\Message\ResponseInterface;
trait ResponseRewritable
{
/**
* When the body of response is written by custom connect handler.
* It is no need to rewrite again.
*/
public function isRewritable(ResponseInterface $response): bool
{
return ((string) $response->getBody()) === '';
}
}

187
src/MQTTServer.php Normal file
View File

@ -0,0 +1,187 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\MqttServer;
use Hyperf\Context\Context;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\DispatcherInterface;
use Hyperf\Contract\MiddlewareInitializerInterface;
use Hyperf\Contract\OnReceiveInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Dispatcher\HttpDispatcher;
use Hyperf\ExceptionHandler\ExceptionHandlerDispatcher;
use Hyperf\HttpMessage\Server\Request;
use Hyperf\HttpMessage\Server\Response as PsrResponse;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Hyperf\HttpMessage\Uri\Uri;
use Hyperf\HttpServer\Contract\CoreMiddlewareInterface;
use Hyperf\Utils\Codec\Json;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Log\LoggerInterface;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V3;
use Swoole\Coroutine\Server\Connection;
use Swoole\Server as SwooleServer;
use Throwable;
use Zyimm\MqttServer\Exception\Handler\MqttExceptionHandler;
class MQTTServer implements OnReceiveInterface, MiddlewareInitializerInterface
{
/**
* @var ContainerInterface
*/
protected ContainerInterface $container;
/**
* @var DispatcherInterface
*/
protected DispatcherInterface $dispatcher;
/**
* @var ExceptionHandlerDispatcher
*/
protected ExceptionHandlerDispatcher $exceptionHandlerDispatcher;
/**
* @var array
*/
protected array $middlewares;
/**
* @var CoreMiddlewareInterface
*/
protected CoreMiddlewareInterface $coreMiddleware;
/**
* @var array
*/
protected array $exceptionHandlers;
/**
* @var string
*/
protected string $serverName;
/**
* @var LoggerInterface
*/
protected LoggerInterface|StdoutLoggerInterface $logger;
public function __construct(
ContainerInterface $container,
HttpDispatcher $dispatcher,
ExceptionHandlerDispatcher $exceptionDispatcher,
StdoutLoggerInterface $logger
) {
$this->container = $container;
$this->dispatcher = $dispatcher;
$this->exceptionHandlerDispatcher = $exceptionDispatcher;
$this->logger = $logger;
}
public function initCoreMiddleware(string $serverName): void
{
$this->serverName = $serverName;
$this->coreMiddleware = $this->createCoreMiddleware();
$config = $this->container->get(ConfigInterface::class);
$this->middlewares = $config->get('middlewares.' . $serverName, []);
$this->exceptionHandlers = $config->get('exceptions.handler.' . $serverName, $this->getDefaultExceptionHandler());
}
public function onReceive($server, int $fd, int $reactorId, string $data): void
{
$request = $response = null;
try {
CoordinatorManager::until(Constants::WORKER_START)->yield();
// Initialize PSR-7 Request and Response objects.
Context::set(ResponseInterface::class, $this->buildResponse($fd, $server));
Context::set(ServerRequestInterface::class, $request = $this->buildRequest($fd, $reactorId, $data));
$middlewares = $this->middlewares;
$request = $this->coreMiddleware->dispatch($request);
$response = $this->dispatcher->dispatch($request, $middlewares, $this->coreMiddleware);
} catch (Throwable $throwable) {
// Delegate the exception to exception handler.
$exceptionHandlerDispatcher = $this->container->get(ExceptionHandlerDispatcher::class);
$response = $exceptionHandlerDispatcher->dispatch($throwable, $this->exceptionHandlers);
} finally {
if ($response instanceof PsrResponse && $response->getAttribute('closed', false)) {
$this->close($server, $fd);
}
if ($response instanceof ResponseInterface) {
$this->send($server, $fd, $response);
}
}
}
protected function getDefaultExceptionHandler(): array
{
return [
MqttExceptionHandler::class,
];
}
/**
* @param Connection|SwooleServer $server
*/
protected function send($server, int $fd, ResponseInterface $response): void
{
$body = (string) $response->getBody();
if (empty($body)) {
return;
}
if ($server instanceof SwooleServer) {
$server->send($fd, $body);
} elseif ($server instanceof Connection) {
$server->send($body);
}
}
protected function close($server, int $fd): void
{
if ($server instanceof SwooleServer) {
$server->close($fd);
} elseif ($server instanceof Connection) {
$server->close();
}
}
protected function createCoreMiddleware(): CoreMiddlewareInterface
{
return new CoreMiddleware($this->container, $this->serverName);
}
protected function buildRequest(int $fd, int $reactorId, string $data): ServerRequestInterface
{
$data = V3::unpack($data);
$uri = new Uri('http://0.0.0.0/');
$request = new Request('POST', $uri, ['Content-Type' => 'application/json'], new SwooleStream(Json::encode($data)));
return $request->withAttribute(Types::class, $data['type'] ?? 0)
->withAttribute('fd', $fd)
->withAttribute('reactorId', $reactorId)
->withParsedBody($data);
}
protected function buildResponse(int $fd, $server): ResponseInterface
{
return (new PsrResponse())->withAttribute('fd', $fd)->withAttribute('server', $server);
}
}

View File

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace ZyimmTest\MqttServer\Cases;
use PHPUnit\Framework\TestCase;
/**
* Class AbstractTestCase.
*/
abstract class AbstractTestCase extends TestCase
{
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace ZyimmTest\MqttServer\Cases;
/**
* @internal
* @coversNothing
*/
class ExampleTest extends AbstractTestCase
{
public function testExample()
{
$this->assertTrue(true);
}
}

12
tests/bootstrap.php Normal file
View File

@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
require_once dirname(__FILE__, 2). '/vendor/autoload.php';