From 1f64858e167f1f5d3cece18f0e291ed66f5e8d28 Mon Sep 17 00:00:00 2001 From: chenyao Date: Tue, 28 Jul 2020 10:53:39 +0800 Subject: [PATCH] 2.1.2 --- README.md | 24 +- src/Mongodb.php | 125 +++--- src/MongodbConnection.php | 875 +++++++++++++++++++------------------- 3 files changed, 525 insertions(+), 499 deletions(-) diff --git a/README.md b/README.md index 1de4ee5..fceadea 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,11 @@ $where = ['_id' => '1']; $result = $this->$mongodb->findAll('test', $where); ``` +### 分页查询 +```php +$list = $this->$mongodb->findPagination('article', 10, 0, ['author' => $author]); +``` + ### 查询一条数据(_id自动转对象) ```php @@ -80,14 +85,12 @@ $where = ['_id' => '1']; $result = $this->$mongodb->fetchAll('test', $where); ``` -### 分页查询 +### 分页查询(_id自动转对象) ```php $list = $this->$mongodb->fetchPagination('article', 10, 0, ['author' => $author]); ``` -### 新增 - -单个添加 +### 插入一条数据 ```php $insert = [ '_id' => '', @@ -96,7 +99,7 @@ $insert = [ $this->$mongodb->insert('test',$insert); ``` -批量添加 +### 插入批量数据 ```php $insert = [ [ @@ -120,24 +123,19 @@ $this->$mongodb->updateColumn('test', $where,$updateData); // 只更新数据满 $this->$mongodb->updateRow('test',$where,$updateData);// 更新数据满足$where的行的信息成$newObject ``` ### 删除 - ```php $where = ['_id'=>'1112313423']; $all = true; // 为false只删除匹配的一条,true删除多条 $this->$mongodb->delete('test',$where,$all); ``` -### count统计 - +### 统计 ```php $filter = ['isGroup' => "0", 'wechat' => '15584044700']; $count = $this->$mongodb->count('test', $filter); ``` - - -### Command,执行更复杂的mongo命令 - +### 聚合查询 **sql** 和 **mongodb** 关系对比图 | SQL | MongoDb | @@ -172,4 +170,4 @@ $pipeline= [ ]; $count = $this->$mongodb->command('test', $pipeline); -``` \ No newline at end of file +``` diff --git a/src/Mongodb.php b/src/Mongodb.php index 8bc248c..8f5c3e3 100644 --- a/src/Mongodb.php +++ b/src/Mongodb.php @@ -23,19 +23,16 @@ class Mongodb */ protected $factory; - /** * @var string */ protected $poolName = 'default'; - public function __construct(PoolFactory $factory) { $this->factory = $factory; } - /** * The key to identify the connection object in coroutine context. */ @@ -44,7 +41,6 @@ class Mongodb return sprintf('mongodb.connection.%s', $this->poolName); } - private function getConnection() { $connection = null; @@ -59,7 +55,6 @@ class Mongodb return $connection; } - /** * 返回满足filer的一条数据 * @@ -73,7 +68,7 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); return $collection->executeFindOne($namespace, $filter, $options); @@ -82,7 +77,6 @@ class Mongodb } } - /** * 返回满足filer的全部数据 * @@ -96,7 +90,7 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); return $collection->executeFindAll($namespace, $filter, $options); @@ -105,6 +99,29 @@ class Mongodb } } + /** + * 返回满足filer的分页数据 + * + * @param string $namespace + * @param int $limit + * @param int $currentPage + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function findPagination(string $namespace, int $limit, int $currentPage, array $filter = [], array $options = []): array + { + try { + /** + * @var $collection MongodbConnection + */ + $collection = $this->getConnection(); + return $collection->execFindPagination($namespace, $limit, $currentPage, $filter, $options); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } /** * 返回满足filer的一条数据(_id为自动转对象) @@ -119,16 +136,15 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->executeQueryOne($namespace, $filter, $options); + return $collection->executeFetchOne($namespace, $filter, $options); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } } - /** * 返回满足filer的全部数据(_id自动转对象) * @@ -142,10 +158,10 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->executeQueryAll($namespace, $filter, $options); + return $collection->executeFetchAll($namespace, $filter, $options); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } @@ -153,12 +169,8 @@ class Mongodb - - - - /** - * 返回满足filer的分页数据 + * 返回满足filer的分页数据(_id自动转对象) * * @param string $namespace * @param int $limit @@ -172,17 +184,38 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->execQueryPagination($namespace, $limit, $currentPage, $filter, $options); + return $collection->execFetchPagination($namespace, $limit, $currentPage, $filter, $options); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } } /** - * 批量插入 + * 插入一条数据 + * + * @param $namespace + * @param array $data + * @return bool|mixed + * @throws MongoDBException + */ + public function insertOne($namespace, array $data = []) + { + try { + /** + * @var $collection MongodbConnection + */ + $collection = $this->getConnection(); + return $collection->execInsertOne($namespace, $data); + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } + } + + /** + * 插入批量数据 * @param $namespace * @param array $data * @return bool|string @@ -195,36 +228,15 @@ class Mongodb } try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->insertAll($namespace, $data); + return $collection->execInsertAll($namespace, $data); } catch (MongoDBException $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } } - /** - * 数据插入数据库 - * - * @param $namespace - * @param array $data - * @return bool|mixed - * @throws MongoDBException - */ - public function insert($namespace, array $data = []) - { - try { - /** - * @var $collection MongoDBConnection - */ - $collection = $this->getConnection(); - return $collection->insert($namespace, $data); - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } - } - /** * 更新数据满足$filter的行的信息成$newObject * @@ -238,10 +250,10 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->updateRow($namespace, $filter, $newObj); + return $collection->execUpdateRow($namespace, $filter, $newObj); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } @@ -260,10 +272,10 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->updateColumn($namespace, $filter, $newObj); + return $collection->execUpdateColumn($namespace, $filter, $newObj); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } @@ -282,10 +294,10 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->delete($namespace, $filter, $limit); + return $collection->execDelete($namespace, $filter, $limit); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } @@ -303,16 +315,15 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->count($namespace, $filter); + return $collection->execCount($namespace, $filter); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } } - /** * 聚合查询 * @param string $namespace @@ -325,16 +336,12 @@ class Mongodb { try { /** - * @var $collection MongoDBConnection + * @var $collection MongodbConnection */ $collection = $this->getConnection(); - return $collection->command($namespace, $filter); + return $collection->execCommand($namespace, $filter); } catch (\Exception $e) { throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); } } - - - - } diff --git a/src/MongodbConnection.php b/src/MongodbConnection.php index 8838fbb..c772c5a 100644 --- a/src/MongodbConnection.php +++ b/src/MongodbConnection.php @@ -111,433 +111,6 @@ class MongodbConnection extends Connection implements ConnectionInterface return true; } - - /** - * 查询返回结果的一条数据 - * - * @param string $namespace - * @param array $filter - * @param array $options - * @return array - * @throws MongoDBException - */ - public function executeFindOne(string $namespace, array $filter = [], array $options = []) - { - // 查询数据 - $result = []; - try { - $options['limit'] = 1; - $query = new Query($filter, $options); - $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); - foreach ($cursor as $document) { - $result = (array)$document; - break; - } - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $result; - } - } - - - /** - * 查询返回结果的全部数据 - * - * @param string $namespace - * @param array $filter - * @param array $options - * @return array - * @throws MongoDBException - */ - public function executeFindAll(string $namespace, array $filter = [], array $options = []) - { - // 查询数据 - $result = []; - try { - $query = new Query($filter, $options); - $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); - foreach ($cursor as $document) { - $result[] = (array)$document; - } - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $result; - } - } - - - /** - * 查询返回结果的一条数据(_id自动转对象) - * - * @param string $namespace - * @param array $filter - * @param array $options - * @return array - * @throws MongoDBException - */ - public function executeQueryOne(string $namespace, array $filter = [], array $options = []) - { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - // 查询数据 - $result = []; - try { - $options['limit'] = 1; - $query = new Query($filter, $options); - $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); - foreach ($cursor as $document) { - $document = (array)$document; - $document['_id'] = (string)$document['_id']; - $result = $document; - break; - } - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $result; - } - } - - - /** - * 查询返回结果的全部数据(_id自动转对象) - * - * @param string $namespace - * @param array $filter - * @param array $options - * @return array - * @throws MongoDBException - */ - public function executeQueryAll(string $namespace, array $filter = [], array $options = []) - { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - // 查询数据 - $result = []; - try { - $query = new Query($filter, $options); - $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); - foreach ($cursor as $document) { - $document = (array)$document; - $document['_id'] = (string)$document['_id']; - $result[] = $document; - } - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $result; - } - } - - - - - - - - - - - /** - * 返回分页数据,默认每页10条 - * - * @param string $namespace - * @param int $limit - * @param int $currentPage - * @param array $filter - * @param array $options - * @return array - * @throws MongoDBException - */ - public function execQueryPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = []) - { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - // 查询数据 - $data = []; - $result = []; - - //每次最多返回10条记录 - if (!isset($options['limit']) || (int)$options['limit'] <= 0) { - $options['limit'] = $limit; - } - - if (!isset($options['skip']) || (int)$options['skip'] <= 0) { - $options['skip'] = $currentPage * $limit; - } - - try { - $query = new Query($filter, $options); - $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); - - foreach ($cursor as $document) { - $document = (array)$document; - $document['_id'] = (string)$document['_id']; - $data[] = $document; - } - - $result['totalCount'] = $this->count($namespace, $filter); - $result['currentPage'] = $currentPage; - $result['perPage'] = $limit; - $result['list'] = $data; - - } catch (\Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $result; - } - } - - /** - * 数据插入 - * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php - * $data1 = ['title' => 'one']; - * $data2 = ['_id' => 'custom ID', 'title' => 'two']; - * $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']; - * - * @param string $namespace - * @param array $data - * @return bool|string - * @throws MongoDBException - */ - public function insert(string $namespace, array $data = []) - { - try { - $bulk = new BulkWrite(); - $insertId = (string)$bulk->insert($data); - $written = new WriteConcern(WriteConcern::MAJORITY, 1000); - $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); - } catch (\Exception $e) { - $insertId = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $insertId; - } - } - - /** - * 批量数据插入 - * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php - * $data = [ - * ['title' => 'one'], - * ['_id' => 'custom ID', 'title' => 'two'], - * ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'] - * ]; - * @param string $namespace - * @param array $data - * @return bool|string - * @throws MongoDBException - */ - public function insertAll(string $namespace, array $data = []) - { - try { - $bulk = new BulkWrite(); - foreach ($data as $items) { - $insertId[] = (string)$bulk->insert($items); - } - $written = new WriteConcern(WriteConcern::MAJORITY, 1000); - $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); - } catch (\Exception $e) { - $insertId = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $insertId; - } - } - - /** - * 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段 - * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php - * $bulk->update( - * ['x' => 2], - * ['$set' => ['y' => 3]], - * ['multi' => false, 'upsert' => false] - * ); - * - * @param string $namespace - * @param array $filter - * @param array $newObj - * @return bool - * @throws MongoDBException - */ - public function updateRow(string $namespace, array $filter = [], array $newObj = []): bool - { - try { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - - $bulk = new BulkWrite; - $bulk->update( - $filter, - ['$set' => $newObj], - ['multi' => true, 'upsert' => false] - ); - $written = new WriteConcern(WriteConcern::MAJORITY, 1000); - $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); - $modifiedCount = $result->getModifiedCount(); - $update = $modifiedCount == 0 ? false : true; - } catch (\Exception $e) { - $update = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $update; - } - } - - /** - * 数据更新, 效果是满足filter的行数据更新成$newObj - * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php - * $bulk->update( - * ['x' => 2], - * [['y' => 3]], - * ['multi' => false, 'upsert' => false] - * ); - * - * @param string $namespace - * @param array $filter - * @param array $newObj - * @return bool - * @throws MongoDBException - */ - public function updateColumn(string $namespace, array $filter = [], array $newObj = []): bool - { - try { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - - $bulk = new BulkWrite; - $bulk->update( - $filter, - ['$set' => $newObj], - ['multi' => false, 'upsert' => false] - ); - $written = new WriteConcern(WriteConcern::MAJORITY, 1000); - $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); - $modifiedCount = $result->getModifiedCount(); - $update = $modifiedCount == 1 ? true : false; - } catch (\Exception $e) { - $update = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->release(); - return $update; - } - } - - /** - * 删除数据 - * - * @param string $namespace - * @param array $filter - * @param bool $limit - * @return bool - * @throws MongoDBException - */ - public function delete(string $namespace, array $filter = [], bool $limit = false): bool - { - try { - if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { - $filter['_id'] = new ObjectId($filter['_id']); - } - - $bulk = new BulkWrite; - $bulk->delete($filter, ['limit' => $limit]); - $written = new WriteConcern(WriteConcern::MAJORITY, 1000); - $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); - $delete = true; - } catch (\Exception $e) { - $delete = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $delete; - } - } - - /** - * 获取collection 中满足条件的条数 - * - * @param string $namespace - * @param array $filter - * @return bool - * @throws MongoDBException - */ - public function count(string $namespace, array $filter = []) - { - try { - $command = new Command([ - 'count' => $namespace, - 'query' => $filter - ]); - $cursor = $this->connection->executeCommand($this->config['db'], $command); - $count = $cursor->toArray()[0]->n; - return $count; - } catch (\Exception $e) { - $count = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } catch (Exception $e) { - $count = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $count; - } - } - - - /** - * 获取collection 中满足条件的条数 - * - * @param string $namespace - * @param array $filter - * @return bool - * @throws Exception - * @throws MongoDBException - */ - public function command(string $namespace, array $filter = []) - { - try { - $command = new Command([ - 'aggregate' => $namespace, - 'pipeline' => $filter, - 'cursor' => new \stdClass() - ]); - $cursor = $this->connection->executeCommand($this->config['db'], $command); - $count = $cursor->toArray()[0]; - } catch (\Exception $e) { - $count = false; - throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); - } finally { - $this->pool->release($this); - return $count; - } - } - /** * 判断当前的数据库连接是否已经超时 * @@ -599,5 +172,453 @@ class MongodbConnection extends Connection implements ConnectionInterface } } + /** + * 查询返回结果的一条数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFindOne(string $namespace, array $filter = [], array $options = []) + { + // 查询数据 + $result = []; + try { + $options['limit'] = 1; + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $result = (array)$document; + break; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + /** + * 查询返回结果的全部数据 + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFindAll(string $namespace, array $filter = [], array $options = []) + { + // 查询数据 + $result = []; + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $result[] = (array)$document; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 返回分页数据,默认每页10条 + * + * @param string $namespace + * @param int $limit + * @param int $currentPage + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function execFindPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = []) + { + // 查询数据 + $data = []; + $result = []; + //每次最多返回10条记录 + if (!isset($options['limit']) || (int)$options['limit'] <= 0) { + $options['limit'] = $limit; + } + if (!isset($options['skip']) || (int)$options['skip'] <= 0) { + $options['skip'] = $currentPage * $limit; + } + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $data[] = $document; + } + $result['totalCount'] = $this->count($namespace, $filter); + $result['currentPage'] = $currentPage; + $result['perPage'] = $limit; + $result['list'] = $data; + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 查询返回结果的一条数据(_id自动转对象) + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFetchOne(string $namespace, array $filter = [], array $options = []) + { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + // 查询数据 + $result = []; + try { + $options['limit'] = 1; + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $result = $document; + break; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 查询返回结果的全部数据(_id自动转对象) + * + * @param string $namespace + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function executeFetchAll(string $namespace, array $filter = [], array $options = []) + { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + // 查询数据 + $result = []; + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $result[] = $document; + } + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 返回分页数据,默认每页10条(_id自动转对象) + * + * @param string $namespace + * @param int $limit + * @param int $currentPage + * @param array $filter + * @param array $options + * @return array + * @throws MongoDBException + */ + public function execFetchPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = []) + { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + // 查询数据 + $data = []; + $result = []; + //每次最多返回10条记录 + if (!isset($options['limit']) || (int)$options['limit'] <= 0) { + $options['limit'] = $limit; + } + if (!isset($options['skip']) || (int)$options['skip'] <= 0) { + $options['skip'] = $currentPage * $limit; + } + try { + $query = new Query($filter, $options); + $cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query); + foreach ($cursor as $document) { + $document = (array)$document; + $document['_id'] = (string)$document['_id']; + $data[] = $document; + } + $result['totalCount'] = $this->count($namespace, $filter); + $result['currentPage'] = $currentPage; + $result['perPage'] = $limit; + $result['list'] = $data; + } catch (\Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $result; + } + } + + /** + * 插入一条数据 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php + * $data1 = ['title' => 'one']; + * $data2 = ['_id' => 'custom ID', 'title' => 'two']; + * $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']; + * + * @param string $namespace + * @param array $data + * @return bool|string + * @throws MongoDBException + */ + public function execInsertOne(string $namespace, array $data = []) + { + try { + $bulk = new BulkWrite(); + $insertId = (string)$bulk->insert($data); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + } catch (\Exception $e) { + $insertId = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $insertId; + } + } + + /** + * 插入批量数据 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php + * $data = [ + * ['title' => 'one'], + * ['_id' => 'custom ID', 'title' => 'two'], + * ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'] + * ]; + * + * @param string $namespace + * @param array $data + * @return bool|string + * @throws MongoDBException + */ + public function execInsertAll(string $namespace, array $data = []) + { + try { + $bulk = new BulkWrite(); + foreach ($data as $items) { + $insertId[] = (string)$bulk->insert($items); + } + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + } catch (\Exception $e) { + $insertId = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $insertId; + } + } + + /** + * 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段 + * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php + * $bulk->update( + * ['x' => 2], + * ['$set' => ['y' => 3]], + * ['multi' => false, 'upsert' => false] + * ); + * + * @param string $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function execUpdateRow(string $namespace, array $filter = [], array $newObj = []): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + $bulk = new BulkWrite; + $bulk->update( + $filter, + ['$set' => $newObj], + ['multi' => true, 'upsert' => false] + ); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $modifiedCount = $result->getModifiedCount(); + $update = $modifiedCount == 0 ? false : true; + } catch (\Exception $e) { + $update = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $update; + } + } + + /** + * 数据更新, 效果是满足filter的行数据更新成$newObj + * http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php + * $bulk->update( + * ['x' => 2], + * [['y' => 3]], + * ['multi' => false, 'upsert' => false] + * ); + * + * @param string $namespace + * @param array $filter + * @param array $newObj + * @return bool + * @throws MongoDBException + */ + public function execUpdateColumn(string $namespace, array $filter = [], array $newObj = []): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + $bulk = new BulkWrite; + $bulk->update( + $filter, + ['$set' => $newObj], + ['multi' => false, 'upsert' => false] + ); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $modifiedCount = $result->getModifiedCount(); + $update = $modifiedCount == 1 ? true : false; + } catch (\Exception $e) { + $update = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->release(); + return $update; + } + } + + /** + * 删除数据 + * + * @param string $namespace + * @param array $filter + * @param bool $limit + * @return bool + * @throws MongoDBException + */ + public function execDelete(string $namespace, array $filter = [], bool $limit = false): bool + { + try { + if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) { + $filter['_id'] = new ObjectId($filter['_id']); + } + $bulk = new BulkWrite; + $bulk->delete($filter, ['limit' => $limit]); + $written = new WriteConcern(WriteConcern::MAJORITY, 1000); + $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written); + $delete = true; + } catch (\Exception $e) { + $delete = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $delete; + } + } + + /** + * 获取collection 中满足条件的条数 + * + * @param string $namespace + * @param array $filter + * @return bool + * @throws MongoDBException + */ + public function execCount(string $namespace, array $filter = []) + { + try { + $command = new Command([ + 'count' => $namespace, + 'query' => $filter + ]); + $cursor = $this->connection->executeCommand($this->config['db'], $command); + $count = $cursor->toArray()[0]->n; + return $count; + } catch (\Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } catch (Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $count; + } + } + + /** + * 聚合查询 + * + * @param string $namespace + * @param array $filter + * @return bool + * @throws Exception + * @throws MongoDBException + */ + public function execCommand(string $namespace, array $filter = []) + { + try { + $command = new Command([ + 'aggregate' => $namespace, + 'pipeline' => $filter, + 'cursor' => new \stdClass() + ]); + $cursor = $this->connection->executeCommand($this->config['db'], $command); + $count = $cursor->toArray()[0]; + } catch (\Exception $e) { + $count = false; + throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage()); + } finally { + $this->pool->release($this); + return $count; + } + } }