diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php new file mode 100644 index 0000000..1969be6 --- /dev/null +++ b/src/ConfigProvider.php @@ -0,0 +1,38 @@ + [ + MongoDb::class => MongoDb::class, + ], + 'commands' => [ + ], + 'scan' => [ + 'paths' => [ + __DIR__, + ], + ], + 'publish' => [ + [ + 'id' => 'config', + 'description' => 'The config of mongodb client.', + 'source' => __DIR__ . '/../publish/mongodb.php', + 'destination' => BASE_PATH . '/config/autoload/mongodb.php', + ], + ], + ]; + } +} diff --git a/src/Exception/MongoDBException.php b/src/Exception/MongoDBException.php new file mode 100644 index 0000000..3cadadf --- /dev/null +++ b/src/Exception/MongoDBException.php @@ -0,0 +1,21 @@ +factory = $factory; + } + + /** + * 返回满足filer的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function fetchAll(string $namespace, array $filter = [], array $options = []): array + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->executeQueryAll($namespace, $filter, $options); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 返回满足filer的分页数据 + * + * @param string $namespace + * @param int $limit + * @param int $currentPage + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function fetchPagination(string $namespace, int $limit, int $currentPage, array $filter = [], array $options = []): array + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->execQueryPagination($namespace, $limit, $currentPage, $filter, $options); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 批量插入 + * @param $namespace + * @param array $data + * @return bool|string + * @throws MongoDBException + */ + public function insertAll($namespace, array $data) + { + if (count($data) == count($data, 1)) { + throw new MongoDBException('data is can only be a two-dimensional array'); + } + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->insertAll($namespace, $data); + } catch (MongoDBException $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 数据插入数据库 + * + * @param $namespace + * @param array $data + * @return bool|mixed + * @throws MongoDBException + */ + public function insert($namespace, array $data = []) + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->insert($namespace, $data); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 更新数据满足$filter的行的信息成$newObject + * + * @param $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function updateRow($namespace, array $filter = [], array $newObj = []): bool + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->updateRow($namespace, $filter, $newObj); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 只更新数据满足$filter的行的列信息中在$newObject中出现过的字段 + * + * @param $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function updateColumn($namespace, array $filter = [], array $newObj = []): bool + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->updateColumn($namespace, $filter, $newObj); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 删除满足条件的数据,默认只删除匹配条件的第一条记录,如果要删除多条$limit=true + * + * @param string $namespace + * @param array $filter + * @param bool $limit + * @return bool + * @throws MongoDBException + */ + public function delete(string $namespace, array $filter = [], bool $limit = false): bool + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->delete($namespace, $filter, $limit); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 返回collection中满足条件的数量 + * + * @param string $namespace + * @param array $filter + * @return bool + * @throws MongoDBException + */ + public function count(string $namespace, array $filter = []) + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->count($namespace, $filter); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + + /** + * 聚合查询 + * @param string $namespace + * @param array $filter + * @return bool + * @throws MongoDBException + * @throws \MongoDB\Driver\Exception\Exception + */ + public function command(string $namespace, array $filter = []) + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->command($namespace, $filter); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + private function getConnection() + { + $connection = null; + $hasContextConnection = Context::has($this->getContextKey()); + if ($hasContextConnection) { + $connection = Context::get($this->getContextKey()); + } + if (!$connection instanceof MongoDbConnection) { + $pool = $this->factory->getPool($this->poolName); + $connection = $pool->get()->getConnection(); + } + return $connection; + } + + /** + * The key to identify the connection object in coroutine context. + */ + private function getContextKey(): string + { + return sprintf('mongodb.connection.%s', $this->poolName); + } + + /** + * 返回满足filer的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function findAll(string $namespace, array $filter = [], array $options = []): array + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->executeFindAll($namespace, $filter, $options); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + + /** + * 返回满足filer的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function findOne(string $namespace, array $filter = [], array $options = []): array + { + try { + /** + * @var $collection MongoDBConnection + */ + $collection = $this->getConnection(); + return $collection->executeFindOne($namespace, $filter, $options); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } +} diff --git a/src/MongoDbConnection.php b/src/MongoDbConnection.php new file mode 100644 index 0000000..a1427c6 --- /dev/null +++ b/src/MongoDbConnection.php @@ -0,0 +1,559 @@ +config = $config; + $this->reconnect(); + } + + public function getActiveConnection() + { + // TODO: Implement getActiveConnection() method. + if ($this->check()) { + return $this; + } + if (!$this->reconnect()) { + throw new ConnectionException('Connection reconnect failed.'); + } + return $this; + } + + /** + * Reconnect the connection. + */ + public function reconnect(): bool + { + // TODO: Implement reconnect() method. + try { + /** + * http://php.net/manual/zh/mongodb-driver-manager.construct.php + */ + + $username = $this->config['username']; + $password = $this->config['password']; + if (!empty($username) && !empty($password)) { + $uri = sprintf( + 'mongodb://%s:%s@%s:%d/%s', + $username, + $password, + $this->config['host'], + $this->config['port'], + $this->config['db'] + ); + } else { + $uri = sprintf( + 'mongodb://%s:%d/%s', + $this->config['host'], + $this->config['port'], + $this->config['db'] + ); + } + $urlOptions = []; + //数据集 + $replica = isset($this->config['replica']) ? $this->config['replica'] : null; + if ($replica) { + $urlOptions['replicaSet'] = $replica; + } + $this->connection = new Manager($uri, $urlOptions); + } catch (InvalidArgumentException $e) { + throw MongoDBException::managerError('mongodb 连接参数错误:' . $e->getMessage()); + } catch (RuntimeException $e) { + throw MongoDBException::managerError('mongodb uri格式错误:' . $e->getMessage()); + } + $this->lastUseTime = microtime(true); + return true; + } + + /** + * Close the connection. + */ + public function close(): bool + { + // TODO: Implement close() method. + return true; + } + + + /** + * 查询返回结果的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeQueryAll(string $namespace, array $filter = [], array $options = []) + { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + // 查询数据 + $result = []; + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $result[] = $document; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 返回分页数据,默认每页10条 + * + * @param string $namespace + * @param int $limit + * @param int $currentPage + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function execQueryPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = []) + { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + // 查询数据 + $data = []; + $result = []; + + //每次最多返回10条记录 + if (!isset($options['limit']) || (int)$options['limit'] <= 0) { + $options['limit'] = $limit; + } + + if (!isset($options['skip']) || (int)$options['skip'] <= 0) { + $options['skip'] = $currentPage * $limit; + } + + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $data[] = $document; + } + + $result['totalCount'] = $this->count($namespace, $filter); + $result['currentPage'] = $currentPage; + $result['perPage'] = $limit; + $result['list'] = $data; + + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 数据插入 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php + * $data1 = ['title' => 'one']; + * $data2 = ['_id' => 'custom ID', 'title' => 'two']; + * $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']; + * + * @param string $namespace + * @param array $data + * @return bool|string + * @throws MongoDBException + */ + public function insert(string $namespace, array $data = []) + { + try { + $bulk = new BulkWrite(); + $insertId = (string)$bulk->insert($data); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + } catch (\Exception $e) { + $insertId = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $insertId; + } + } + + /** + * 批量数据插入 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php + * $data = [ + * ['title' => 'one'], + * ['_id' => 'custom ID', 'title' => 'two'], + * ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'] + * ]; + * @param string $namespace + * @param array $data + * @return bool|string + * @throws MongoDBException + */ + public function insertAll(string $namespace, array $data = []) + { + try { + $bulk = new BulkWrite(); + foreach ($data as $items) { + $insertId[] = (string)$bulk->insert($items); + } + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + } catch (\Exception $e) { + $insertId = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $insertId; + } + } + + /** + * 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php + * $bulk->update( + * ['x' => 2], + * ['$set' => ['y' => 3]], + * ['multi' => false, 'upsert' => false] + * ); + * + * @param string $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function updateRow(string $namespace, array $filter = [], array $newObj = []): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + + $bulk = new BulkWrite; + $bulk->update( + $filter, + ['$set' => $newObj], + ['multi' => true, 'upsert' => false] + ); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $modifiedCount = $result->getModifiedCount(); + $update = $modifiedCount == 0 ? false : true; + } catch (\Exception $e) { + $update = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $update; + } + } + + /** + * 数据更新, 效果是满足filter的行数据更新成$newObj + * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php + * $bulk->update( + * ['x' => 2], + * [['y' => 3]], + * ['multi' => false, 'upsert' => false] + * ); + * + * @param string $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function updateColumn(string $namespace, array $filter = [], array $newObj = []): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + + $bulk = new BulkWrite; + $bulk->update( + $filter, + ['$set' => $newObj], + ['multi' => false, 'upsert' => false] + ); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $modifiedCount = $result->getModifiedCount(); + $update = $modifiedCount == 1 ? true : false; + } catch (\Exception $e) { + $update = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->release(); + return $update; + } + } + + /** + * 删除数据 + * + * @param string $namespace + * @param array $filter + * @param bool $limit + * @return bool + * @throws MongoDBException + */ + public function delete(string $namespace, array $filter = [], bool $limit = false): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + + $bulk = new BulkWrite; + $bulk->delete($filter, ['limit' => $limit]); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $delete = true; + } catch (\Exception $e) { + $delete = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $delete; + } + } + + /** + * 获取collection 中满足条件的条数 + * + * @param string $namespace + * @param array $filter + * @return bool + * @throws MongoDBException + */ + public function count(string $namespace, array $filter = []) + { + try { + $command = new Command([ + 'count' => $namespace, + 'query' => $filter + ]); + $cursor = $this->connection->executeCommand($this->config['db'], $command); + $count = $cursor->toArray()[0]->n; + return $count; + } catch (\Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $count; + } + } + + + /** + * 获取collection 中满足条件的条数 + * + * @param string $namespace + * @param array $filter + * @return bool + * @throws Exception + * @throws MongoDBException + */ + public function command(string $namespace, array $filter = []) + { + try { + $command = new Command([ + 'aggregate' => $namespace, + 'pipeline' => $filter, + 'cursor' => new \stdClass() + ]); + $cursor = $this->connection->executeCommand($this->config['db'], $command); + $count = $cursor->toArray()[0]; + } catch (\Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $count; + } + } + + /** + * 判断当前的数据库连接是否已经超时 + * + * @return bool + * @throws \MongoDB\Driver\Exception\Exception + * @throws MongoDBException + */ + public function check(): bool + { + try { + $command = new Command(['ping' => 1]); + $this->connection->executeCommand($this->config['db'], $command); + return true; + } catch (\Throwable $e) { + return $this->catchMongoException($e); + } + } + + /** + * @param \Throwable $e + * @return bool + * @throws MongoDBException + */ + private function catchMongoException(\Throwable $e) + { + switch ($e) { + case ($e instanceof InvalidArgumentException): + { + throw MongoDBException::managerError('mongo argument exception: ' . $e->getMessage()); + } + case ($e instanceof AuthenticationException): + { + throw MongoDBException::managerError('mongo数据库连接授权失败:' . $e->getMessage()); + } + case ($e instanceof ConnectionException): + { + /** + * https://cloud.tencent.com/document/product/240/4980 + * 存在连接失败的,那么进行重连 + */ + for ($counts = 1; $counts <= 5; $counts++) { + try { + $this->reconnect(); + } catch (\Exception $e) { + continue; + } + break; + } + return true; + } + case ($e instanceof RuntimeException): + { + throw MongoDBException::managerError('mongo runtime exception: ' . $e->getMessage()); + } + default: + { + throw MongoDBException::managerError('mongo unexpected exception: ' . $e->getMessage()); + } + } + } + + /** + * 查询返回结果的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFindAll(string $namespace, array $filter = [], array $options = []) + { + // 查询数据 + $result = []; + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $result[] = $document; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 查询返回结果的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFindOne(string $namespace, array $filter = [], array $options = []) + { + // 查询数据 + $result = []; + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $result = $document; + break; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } +} diff --git a/src/Pool/MongoDBPool.php b/src/Pool/MongoDBPool.php new file mode 100644 index 0000000..03410fb --- /dev/null +++ b/src/Pool/MongoDBPool.php @@ -0,0 +1,57 @@ +name = $name; + $config = $container->get(ConfigInterface::class); + $key = sprintf('mongodb.%s', $this->name); + if (!$config->has($key)) { + throw new \InvalidArgumentException(sprintf('config[%s] is not exist!', $key)); + } + + $this->config = $config->get($key); + $options = Arr::get($this->config, 'pool', []); + + parent::__construct($container, $options); + } + + /** + * @return string + */ + public function getName(): string + { + return $this->name; + } + + protected function createConnection(): ConnectionInterface + { + return new MongoDbConnection($this->container, $this, $this->config); + } +} diff --git a/src/Pool/PoolFactory.php b/src/Pool/PoolFactory.php new file mode 100644 index 0000000..517c801 --- /dev/null +++ b/src/Pool/PoolFactory.php @@ -0,0 +1,45 @@ +container = $container; + } + + public function getPool(string $name): MongoDBPool + { + if (isset($this->pools[$name])) { + return $this->pools[$name]; + } + + if ($this->container instanceof Container) { + $pool = $this->container->make(MongoDBPool::class, ['name' => $name]); + } else { + $pool = new MongoDBPool($this->container, $name); + } + return $this->pools[$name] = $pool; + } +} diff --git a/src/Publish/mongodb.php b/src/Publish/mongodb.php new file mode 100644 index 0000000..0fc34a3 --- /dev/null +++ b/src/Publish/mongodb.php @@ -0,0 +1,28 @@ + [ + 'username' => env('MONGODB_USERNAME', ''), + 'password' => env('MONGODB_PASSWORD', ''), + 'host' => env('MONGODB_HOST', '127.0.0.1'), + 'port' => env('MONGODB_PORT', 27017), + 'db' => env('MONGODB_DB', 'test'), + 'authMechanism' => 'SCRAM-SHA-256', + //设置复制集,没有不设置 + 'replica' => 'rs0', + 'pool' => [ + 'min_connections' => 1, + 'max_connections' => 100, + 'connect_timeout' => 10.0, + 'wait_timeout' => 3.0, + 'heartbeat' => -1, + 'max_idle_time' => (float)env('MONGODB_MAX_IDLE_TIME', 60), + ], + ], +]; \ No newline at end of file