php
This commit is contained in:
parent
cae5186e52
commit
662632992c
38
src/ConfigProvider.php
Normal file
38
src/ConfigProvider.php
Normal file
|
@ -0,0 +1,38 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:29
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb;
|
||||
|
||||
use Hyperf\Mongodb\MongoDb;
|
||||
|
||||
class ConfigProvider
|
||||
{
|
||||
public function __invoke(): array
|
||||
{
|
||||
return [
|
||||
'dependencies' => [
|
||||
MongoDb::class => MongoDb::class,
|
||||
],
|
||||
'commands' => [
|
||||
],
|
||||
'scan' => [
|
||||
'paths' => [
|
||||
__DIR__,
|
||||
],
|
||||
],
|
||||
'publish' => [
|
||||
[
|
||||
'id' => 'config',
|
||||
'description' => 'The config of mongodb client.',
|
||||
'source' => __DIR__ . '/../publish/mongodb.php',
|
||||
'destination' => BASE_PATH . '/config/autoload/mongodb.php',
|
||||
],
|
||||
],
|
||||
];
|
||||
}
|
||||
}
|
21
src/Exception/MongoDBException.php
Normal file
21
src/Exception/MongoDBException.php
Normal file
|
@ -0,0 +1,21 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:25
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb\Exception;
|
||||
|
||||
class MongoDBException extends \Exception
|
||||
{
|
||||
/**
|
||||
* @param string $msg
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public static function managerError(string $msg)
|
||||
{
|
||||
throw new self($msg);
|
||||
}
|
||||
}
|
301
src/MongoDb.php
Normal file
301
src/MongoDb.php
Normal file
|
@ -0,0 +1,301 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:30
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb;
|
||||
|
||||
use Hyperf\Mongodb\Exception\MongoDBException;
|
||||
use Hyperf\Mongodb\Pool\PoolFactory;
|
||||
use Hyperf\Utils\Context;
|
||||
|
||||
/**
|
||||
* Class MongoDb
|
||||
* @package Hyperf\Mongodb
|
||||
*/
|
||||
class MongoDb
|
||||
{
|
||||
/**
|
||||
* @var PoolFactory
|
||||
*/
|
||||
protected $factory;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $poolName = 'default';
|
||||
|
||||
public function __construct(PoolFactory $factory)
|
||||
{
|
||||
$this->factory = $factory;
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回满足filer的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function fetchAll(string $namespace, array $filter = [], array $options = []): array
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->executeQueryAll($namespace, $filter, $options);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回满足filer的分页数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param int $limit
|
||||
* @param int $currentPage
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function fetchPagination(string $namespace, int $limit, int $currentPage, array $filter = [], array $options = []): array
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->execQueryPagination($namespace, $limit, $currentPage, $filter, $options);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量插入
|
||||
* @param $namespace
|
||||
* @param array $data
|
||||
* @return bool|string
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function insertAll($namespace, array $data)
|
||||
{
|
||||
if (count($data) == count($data, 1)) {
|
||||
throw new MongoDBException('data is can only be a two-dimensional array');
|
||||
}
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->insertAll($namespace, $data);
|
||||
} catch (MongoDBException $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据插入数据库
|
||||
*
|
||||
* @param $namespace
|
||||
* @param array $data
|
||||
* @return bool|mixed
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function insert($namespace, array $data = [])
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->insert($namespace, $data);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新数据满足$filter的行的信息成$newObject
|
||||
*
|
||||
* @param $namespace
|
||||
* @param array $filter
|
||||
* @param array $newObj
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function updateRow($namespace, array $filter = [], array $newObj = []): bool
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->updateRow($namespace, $filter, $newObj);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 只更新数据满足$filter的行的列信息中在$newObject中出现过的字段
|
||||
*
|
||||
* @param $namespace
|
||||
* @param array $filter
|
||||
* @param array $newObj
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function updateColumn($namespace, array $filter = [], array $newObj = []): bool
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->updateColumn($namespace, $filter, $newObj);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除满足条件的数据,默认只删除匹配条件的第一条记录,如果要删除多条$limit=true
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param bool $limit
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function delete(string $namespace, array $filter = [], bool $limit = false): bool
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->delete($namespace, $filter, $limit);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回collection中满足条件的数量
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function count(string $namespace, array $filter = [])
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->count($namespace, $filter);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 聚合查询
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
* @throws \MongoDB\Driver\Exception\Exception
|
||||
*/
|
||||
public function command(string $namespace, array $filter = [])
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->command($namespace, $filter);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private function getConnection()
|
||||
{
|
||||
$connection = null;
|
||||
$hasContextConnection = Context::has($this->getContextKey());
|
||||
if ($hasContextConnection) {
|
||||
$connection = Context::get($this->getContextKey());
|
||||
}
|
||||
if (!$connection instanceof MongoDbConnection) {
|
||||
$pool = $this->factory->getPool($this->poolName);
|
||||
$connection = $pool->get()->getConnection();
|
||||
}
|
||||
return $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* The key to identify the connection object in coroutine context.
|
||||
*/
|
||||
private function getContextKey(): string
|
||||
{
|
||||
return sprintf('mongodb.connection.%s', $this->poolName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回满足filer的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function findAll(string $namespace, array $filter = [], array $options = []): array
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->executeFindAll($namespace, $filter, $options);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 返回满足filer的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function findOne(string $namespace, array $filter = [], array $options = []): array
|
||||
{
|
||||
try {
|
||||
/**
|
||||
* @var $collection MongoDBConnection
|
||||
*/
|
||||
$collection = $this->getConnection();
|
||||
return $collection->executeFindOne($namespace, $filter, $options);
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
559
src/MongoDbConnection.php
Normal file
559
src/MongoDbConnection.php
Normal file
|
@ -0,0 +1,559 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:31
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb;
|
||||
|
||||
use Hyperf\Contract\ConnectionInterface;
|
||||
use Hyperf\Mongodb\Exception\MongoDBException;
|
||||
use Hyperf\Pool\Connection;
|
||||
use Hyperf\Pool\Exception\ConnectionException;
|
||||
use Hyperf\Pool\Pool;
|
||||
use MongoDB\BSON\ObjectId;
|
||||
use MongoDB\Driver\BulkWrite;
|
||||
use MongoDB\Driver\Command;
|
||||
use MongoDB\Driver\Exception\AuthenticationException;
|
||||
use MongoDB\Driver\Exception\Exception;
|
||||
use MongoDB\Driver\Exception\InvalidArgumentException;
|
||||
use MongoDB\Driver\Exception\RuntimeException;
|
||||
use MongoDB\Driver\Manager;
|
||||
use MongoDB\Driver\Query;
|
||||
use MongoDB\Driver\WriteConcern;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class MongoDbConnection extends Connection implements ConnectionInterface
|
||||
{
|
||||
/**
|
||||
* @var Manager
|
||||
*/
|
||||
protected $connection;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $config;
|
||||
|
||||
public function __construct(ContainerInterface $container, Pool $pool, array $config)
|
||||
{
|
||||
parent::__construct($container, $pool);
|
||||
$this->config = $config;
|
||||
$this->reconnect();
|
||||
}
|
||||
|
||||
public function getActiveConnection()
|
||||
{
|
||||
// TODO: Implement getActiveConnection() method.
|
||||
if ($this->check()) {
|
||||
return $this;
|
||||
}
|
||||
if (!$this->reconnect()) {
|
||||
throw new ConnectionException('Connection reconnect failed.');
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect the connection.
|
||||
*/
|
||||
public function reconnect(): bool
|
||||
{
|
||||
// TODO: Implement reconnect() method.
|
||||
try {
|
||||
/**
|
||||
* http://php.net/manual/zh/mongodb-driver-manager.construct.php
|
||||
*/
|
||||
|
||||
$username = $this->config['username'];
|
||||
$password = $this->config['password'];
|
||||
if (!empty($username) && !empty($password)) {
|
||||
$uri = sprintf(
|
||||
'mongodb://%s:%s@%s:%d/%s',
|
||||
$username,
|
||||
$password,
|
||||
$this->config['host'],
|
||||
$this->config['port'],
|
||||
$this->config['db']
|
||||
);
|
||||
} else {
|
||||
$uri = sprintf(
|
||||
'mongodb://%s:%d/%s',
|
||||
$this->config['host'],
|
||||
$this->config['port'],
|
||||
$this->config['db']
|
||||
);
|
||||
}
|
||||
$urlOptions = [];
|
||||
//数据集
|
||||
$replica = isset($this->config['replica']) ? $this->config['replica'] : null;
|
||||
if ($replica) {
|
||||
$urlOptions['replicaSet'] = $replica;
|
||||
}
|
||||
$this->connection = new Manager($uri, $urlOptions);
|
||||
} catch (InvalidArgumentException $e) {
|
||||
throw MongoDBException::managerError('mongodb 连接参数错误:' . $e->getMessage());
|
||||
} catch (RuntimeException $e) {
|
||||
throw MongoDBException::managerError('mongodb uri格式错误:' . $e->getMessage());
|
||||
}
|
||||
$this->lastUseTime = microtime(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the connection.
|
||||
*/
|
||||
public function close(): bool
|
||||
{
|
||||
// TODO: Implement close() method.
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 查询返回结果的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function executeQueryAll(string $namespace, array $filter = [], array $options = [])
|
||||
{
|
||||
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
|
||||
$filter['_id'] = new ObjectId($filter['_id']);
|
||||
}
|
||||
// 查询数据
|
||||
$result = [];
|
||||
try {
|
||||
$query = new Query($filter, $options);
|
||||
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
|
||||
|
||||
foreach ($cursor as $document) {
|
||||
$document = (array)$document;
|
||||
$document['_id'] = (string)$document['_id'];
|
||||
$result[] = $document;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回分页数据,默认每页10条
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param int $limit
|
||||
* @param int $currentPage
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function execQueryPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = [])
|
||||
{
|
||||
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
|
||||
$filter['_id'] = new ObjectId($filter['_id']);
|
||||
}
|
||||
// 查询数据
|
||||
$data = [];
|
||||
$result = [];
|
||||
|
||||
//每次最多返回10条记录
|
||||
if (!isset($options['limit']) || (int)$options['limit'] <= 0) {
|
||||
$options['limit'] = $limit;
|
||||
}
|
||||
|
||||
if (!isset($options['skip']) || (int)$options['skip'] <= 0) {
|
||||
$options['skip'] = $currentPage * $limit;
|
||||
}
|
||||
|
||||
try {
|
||||
$query = new Query($filter, $options);
|
||||
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
|
||||
|
||||
foreach ($cursor as $document) {
|
||||
$document = (array)$document;
|
||||
$document['_id'] = (string)$document['_id'];
|
||||
$data[] = $document;
|
||||
}
|
||||
|
||||
$result['totalCount'] = $this->count($namespace, $filter);
|
||||
$result['currentPage'] = $currentPage;
|
||||
$result['perPage'] = $limit;
|
||||
$result['list'] = $data;
|
||||
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据插入
|
||||
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
|
||||
* $data1 = ['title' => 'one'];
|
||||
* $data2 = ['_id' => 'custom ID', 'title' => 'two'];
|
||||
* $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'];
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $data
|
||||
* @return bool|string
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function insert(string $namespace, array $data = [])
|
||||
{
|
||||
try {
|
||||
$bulk = new BulkWrite();
|
||||
$insertId = (string)$bulk->insert($data);
|
||||
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
|
||||
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
|
||||
} catch (\Exception $e) {
|
||||
$insertId = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $insertId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量数据插入
|
||||
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
|
||||
* $data = [
|
||||
* ['title' => 'one'],
|
||||
* ['_id' => 'custom ID', 'title' => 'two'],
|
||||
* ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']
|
||||
* ];
|
||||
* @param string $namespace
|
||||
* @param array $data
|
||||
* @return bool|string
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function insertAll(string $namespace, array $data = [])
|
||||
{
|
||||
try {
|
||||
$bulk = new BulkWrite();
|
||||
foreach ($data as $items) {
|
||||
$insertId[] = (string)$bulk->insert($items);
|
||||
}
|
||||
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
|
||||
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
|
||||
} catch (\Exception $e) {
|
||||
$insertId = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $insertId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段
|
||||
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
|
||||
* $bulk->update(
|
||||
* ['x' => 2],
|
||||
* ['$set' => ['y' => 3]],
|
||||
* ['multi' => false, 'upsert' => false]
|
||||
* );
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $newObj
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function updateRow(string $namespace, array $filter = [], array $newObj = []): bool
|
||||
{
|
||||
try {
|
||||
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
|
||||
$filter['_id'] = new ObjectId($filter['_id']);
|
||||
}
|
||||
|
||||
$bulk = new BulkWrite;
|
||||
$bulk->update(
|
||||
$filter,
|
||||
['$set' => $newObj],
|
||||
['multi' => true, 'upsert' => false]
|
||||
);
|
||||
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
|
||||
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
|
||||
$modifiedCount = $result->getModifiedCount();
|
||||
$update = $modifiedCount == 0 ? false : true;
|
||||
} catch (\Exception $e) {
|
||||
$update = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $update;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据更新, 效果是满足filter的行数据更新成$newObj
|
||||
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
|
||||
* $bulk->update(
|
||||
* ['x' => 2],
|
||||
* [['y' => 3]],
|
||||
* ['multi' => false, 'upsert' => false]
|
||||
* );
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $newObj
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function updateColumn(string $namespace, array $filter = [], array $newObj = []): bool
|
||||
{
|
||||
try {
|
||||
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
|
||||
$filter['_id'] = new ObjectId($filter['_id']);
|
||||
}
|
||||
|
||||
$bulk = new BulkWrite;
|
||||
$bulk->update(
|
||||
$filter,
|
||||
['$set' => $newObj],
|
||||
['multi' => false, 'upsert' => false]
|
||||
);
|
||||
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
|
||||
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
|
||||
$modifiedCount = $result->getModifiedCount();
|
||||
$update = $modifiedCount == 1 ? true : false;
|
||||
} catch (\Exception $e) {
|
||||
$update = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->release();
|
||||
return $update;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param bool $limit
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function delete(string $namespace, array $filter = [], bool $limit = false): bool
|
||||
{
|
||||
try {
|
||||
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
|
||||
$filter['_id'] = new ObjectId($filter['_id']);
|
||||
}
|
||||
|
||||
$bulk = new BulkWrite;
|
||||
$bulk->delete($filter, ['limit' => $limit]);
|
||||
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
|
||||
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
|
||||
$delete = true;
|
||||
} catch (\Exception $e) {
|
||||
$delete = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $delete;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取collection 中满足条件的条数
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function count(string $namespace, array $filter = [])
|
||||
{
|
||||
try {
|
||||
$command = new Command([
|
||||
'count' => $namespace,
|
||||
'query' => $filter
|
||||
]);
|
||||
$cursor = $this->connection->executeCommand($this->config['db'], $command);
|
||||
$count = $cursor->toArray()[0]->n;
|
||||
return $count;
|
||||
} catch (\Exception $e) {
|
||||
$count = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
$count = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $count;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取collection 中满足条件的条数
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @return bool
|
||||
* @throws Exception
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function command(string $namespace, array $filter = [])
|
||||
{
|
||||
try {
|
||||
$command = new Command([
|
||||
'aggregate' => $namespace,
|
||||
'pipeline' => $filter,
|
||||
'cursor' => new \stdClass()
|
||||
]);
|
||||
$cursor = $this->connection->executeCommand($this->config['db'], $command);
|
||||
$count = $cursor->toArray()[0];
|
||||
} catch (\Exception $e) {
|
||||
$count = false;
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $count;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断当前的数据库连接是否已经超时
|
||||
*
|
||||
* @return bool
|
||||
* @throws \MongoDB\Driver\Exception\Exception
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function check(): bool
|
||||
{
|
||||
try {
|
||||
$command = new Command(['ping' => 1]);
|
||||
$this->connection->executeCommand($this->config['db'], $command);
|
||||
return true;
|
||||
} catch (\Throwable $e) {
|
||||
return $this->catchMongoException($e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \Throwable $e
|
||||
* @return bool
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
private function catchMongoException(\Throwable $e)
|
||||
{
|
||||
switch ($e) {
|
||||
case ($e instanceof InvalidArgumentException):
|
||||
{
|
||||
throw MongoDBException::managerError('mongo argument exception: ' . $e->getMessage());
|
||||
}
|
||||
case ($e instanceof AuthenticationException):
|
||||
{
|
||||
throw MongoDBException::managerError('mongo数据库连接授权失败:' . $e->getMessage());
|
||||
}
|
||||
case ($e instanceof ConnectionException):
|
||||
{
|
||||
/**
|
||||
* https://cloud.tencent.com/document/product/240/4980
|
||||
* 存在连接失败的,那么进行重连
|
||||
*/
|
||||
for ($counts = 1; $counts <= 5; $counts++) {
|
||||
try {
|
||||
$this->reconnect();
|
||||
} catch (\Exception $e) {
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
case ($e instanceof RuntimeException):
|
||||
{
|
||||
throw MongoDBException::managerError('mongo runtime exception: ' . $e->getMessage());
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw MongoDBException::managerError('mongo unexpected exception: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询返回结果的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function executeFindAll(string $namespace, array $filter = [], array $options = [])
|
||||
{
|
||||
// 查询数据
|
||||
$result = [];
|
||||
try {
|
||||
$query = new Query($filter, $options);
|
||||
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
|
||||
|
||||
foreach ($cursor as $document) {
|
||||
$document = (array)$document;
|
||||
$document['_id'] = (string)$document['_id'];
|
||||
$result[] = $document;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询返回结果的全部数据
|
||||
*
|
||||
* @param string $namespace
|
||||
* @param array $filter
|
||||
* @param array $options
|
||||
* @return array
|
||||
* @throws MongoDBException
|
||||
*/
|
||||
public function executeFindOne(string $namespace, array $filter = [], array $options = [])
|
||||
{
|
||||
// 查询数据
|
||||
$result = [];
|
||||
try {
|
||||
$query = new Query($filter, $options);
|
||||
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
|
||||
|
||||
foreach ($cursor as $document) {
|
||||
$document = (array)$document;
|
||||
$document['_id'] = (string)$document['_id'];
|
||||
$result = $document;
|
||||
break;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
|
||||
} finally {
|
||||
$this->pool->release($this);
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
}
|
57
src/Pool/MongoDBPool.php
Normal file
57
src/Pool/MongoDBPool.php
Normal file
|
@ -0,0 +1,57 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:26
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb\Pool;
|
||||
|
||||
use Hyperf\Contract\ConfigInterface;
|
||||
use Hyperf\Contract\ConnectionInterface;
|
||||
use Hyperf\Mongodb\MongoDbConnection;
|
||||
use Hyperf\Pool\Pool;
|
||||
use Hyperf\Utils\Arr;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class MongoDBPool extends Pool
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $name;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $config;
|
||||
|
||||
public function __construct(ContainerInterface $container, string $name)
|
||||
{
|
||||
$this->name = $name;
|
||||
$config = $container->get(ConfigInterface::class);
|
||||
$key = sprintf('mongodb.%s', $this->name);
|
||||
if (!$config->has($key)) {
|
||||
throw new \InvalidArgumentException(sprintf('config[%s] is not exist!', $key));
|
||||
}
|
||||
|
||||
$this->config = $config->get($key);
|
||||
$options = Arr::get($this->config, 'pool', []);
|
||||
|
||||
parent::__construct($container, $options);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getName(): string
|
||||
{
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
protected function createConnection(): ConnectionInterface
|
||||
{
|
||||
return new MongoDbConnection($this->container, $this, $this->config);
|
||||
}
|
||||
}
|
45
src/Pool/PoolFactory.php
Normal file
45
src/Pool/PoolFactory.php
Normal file
|
@ -0,0 +1,45 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: adamchen1208
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:28
|
||||
*/
|
||||
|
||||
namespace Hyperf\Mongodb\Pool;
|
||||
|
||||
use Hyperf\Di\Container;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Swoole\Coroutine\Channel;
|
||||
|
||||
class PoolFactory
|
||||
{
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $container;
|
||||
|
||||
/**
|
||||
* @var Channel[]
|
||||
*/
|
||||
protected $pools = [];
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
$this->container = $container;
|
||||
}
|
||||
|
||||
public function getPool(string $name): MongoDBPool
|
||||
{
|
||||
if (isset($this->pools[$name])) {
|
||||
return $this->pools[$name];
|
||||
}
|
||||
|
||||
if ($this->container instanceof Container) {
|
||||
$pool = $this->container->make(MongoDBPool::class, ['name' => $name]);
|
||||
} else {
|
||||
$pool = new MongoDBPool($this->container, $name);
|
||||
}
|
||||
return $this->pools[$name] = $pool;
|
||||
}
|
||||
}
|
28
src/Publish/mongodb.php
Normal file
28
src/Publish/mongodb.php
Normal file
|
@ -0,0 +1,28 @@
|
|||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: CY
|
||||
* Date: 2020/7/24
|
||||
* Time: 15:28
|
||||
*/
|
||||
|
||||
return [
|
||||
'default' => [
|
||||
'username' => env('MONGODB_USERNAME', ''),
|
||||
'password' => env('MONGODB_PASSWORD', ''),
|
||||
'host' => env('MONGODB_HOST', '127.0.0.1'),
|
||||
'port' => env('MONGODB_PORT', 27017),
|
||||
'db' => env('MONGODB_DB', 'test'),
|
||||
'authMechanism' => 'SCRAM-SHA-256',
|
||||
//设置复制集,没有不设置
|
||||
'replica' => 'rs0',
|
||||
'pool' => [
|
||||
'min_connections' => 1,
|
||||
'max_connections' => 100,
|
||||
'connect_timeout' => 10.0,
|
||||
'wait_timeout' => 3.0,
|
||||
'heartbeat' => -1,
|
||||
'max_idle_time' => (float)env('MONGODB_MAX_IDLE_TIME', 60),
|
||||
],
|
||||
],
|
||||
];
|
Loading…
Reference in New Issue
Block a user