This commit is contained in:
chenyao 2020-07-28 10:53:39 +08:00
parent 080e811055
commit 1f64858e16
3 changed files with 525 additions and 499 deletions

View File

@ -66,6 +66,11 @@ $where = ['_id' => '1'];
$result = $this->$mongodb->findAll('test', $where);
```
### 分页查询
```php
$list = $this->$mongodb->findPagination('article', 10, 0, ['author' => $author]);
```
### 查询一条数据_id自动转对象
```php
@ -80,14 +85,12 @@ $where = ['_id' => '1'];
$result = $this->$mongodb->fetchAll('test', $where);
```
### 分页查询
### 分页查询_id自动转对象
```php
$list = $this->$mongodb->fetchPagination('article', 10, 0, ['author' => $author]);
```
### 新增
单个添加
### 插入一条数据
```php
$insert = [
'_id' => '',
@ -96,7 +99,7 @@ $insert = [
$this->$mongodb->insert('test',$insert);
```
批量添加
### 插入批量数据
```php
$insert = [
[
@ -120,24 +123,19 @@ $this->$mongodb->updateColumn('test', $where,$updateData); // 只更新数据满
$this->$mongodb->updateRow('test',$where,$updateData);// 更新数据满足$where的行的信息成$newObject
```
### 删除
```php
$where = ['_id'=>'1112313423'];
$all = true; // 为false只删除匹配的一条true删除多条
$this->$mongodb->delete('test',$where,$all);
```
### count统计
### 统计
```php
$filter = ['isGroup' => "0", 'wechat' => '15584044700'];
$count = $this->$mongodb->count('test', $filter);
```
### Command执行更复杂的mongo命令
### 聚合查询
**sql** 和 **mongodb** 关系对比图
| SQL | MongoDb |
@ -172,4 +170,4 @@ $pipeline= [
];
$count = $this->$mongodb->command('test', $pipeline);
```
```

View File

@ -23,19 +23,16 @@ class Mongodb
*/
protected $factory;
/**
* @var string
*/
protected $poolName = 'default';
public function __construct(PoolFactory $factory)
{
$this->factory = $factory;
}
/**
* The key to identify the connection object in coroutine context.
*/
@ -44,7 +41,6 @@ class Mongodb
return sprintf('mongodb.connection.%s', $this->poolName);
}
private function getConnection()
{
$connection = null;
@ -59,7 +55,6 @@ class Mongodb
return $connection;
}
/**
* 返回满足filer的一条数据
*
@ -73,7 +68,7 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->executeFindOne($namespace, $filter, $options);
@ -82,7 +77,6 @@ class Mongodb
}
}
/**
* 返回满足filer的全部数据
*
@ -96,7 +90,7 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->executeFindAll($namespace, $filter, $options);
@ -105,6 +99,29 @@ class Mongodb
}
}
/**
* 返回满足filer的分页数据
*
* @param string $namespace
* @param int $limit
* @param int $currentPage
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function findPagination(string $namespace, int $limit, int $currentPage, array $filter = [], array $options = []): array
{
try {
/**
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->execFindPagination($namespace, $limit, $currentPage, $filter, $options);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 返回满足filer的一条数据_id为自动转对象
@ -119,16 +136,15 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->executeQueryOne($namespace, $filter, $options);
return $collection->executeFetchOne($namespace, $filter, $options);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 返回满足filer的全部数据_id自动转对象
*
@ -142,10 +158,10 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->executeQueryAll($namespace, $filter, $options);
return $collection->executeFetchAll($namespace, $filter, $options);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
@ -153,12 +169,8 @@ class Mongodb
/**
* 返回满足filer的分页数据
* 返回满足filer的分页数据_id自动转对象
*
* @param string $namespace
* @param int $limit
@ -172,17 +184,38 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->execQueryPagination($namespace, $limit, $currentPage, $filter, $options);
return $collection->execFetchPagination($namespace, $limit, $currentPage, $filter, $options);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 批量插入
* 插入一条数据
*
* @param $namespace
* @param array $data
* @return bool|mixed
* @throws MongoDBException
*/
public function insertOne($namespace, array $data = [])
{
try {
/**
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->execInsertOne($namespace, $data);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 插入批量数据
* @param $namespace
* @param array $data
* @return bool|string
@ -195,36 +228,15 @@ class Mongodb
}
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->insertAll($namespace, $data);
return $collection->execInsertAll($namespace, $data);
} catch (MongoDBException $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 数据插入数据库
*
* @param $namespace
* @param array $data
* @return bool|mixed
* @throws MongoDBException
*/
public function insert($namespace, array $data = [])
{
try {
/**
* @var $collection MongoDBConnection
*/
$collection = $this->getConnection();
return $collection->insert($namespace, $data);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 更新数据满足$filter的行的信息成$newObject
*
@ -238,10 +250,10 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->updateRow($namespace, $filter, $newObj);
return $collection->execUpdateRow($namespace, $filter, $newObj);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
@ -260,10 +272,10 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->updateColumn($namespace, $filter, $newObj);
return $collection->execUpdateColumn($namespace, $filter, $newObj);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
@ -282,10 +294,10 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->delete($namespace, $filter, $limit);
return $collection->execDelete($namespace, $filter, $limit);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
@ -303,16 +315,15 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->count($namespace, $filter);
return $collection->execCount($namespace, $filter);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
/**
* 聚合查询
* @param string $namespace
@ -325,16 +336,12 @@ class Mongodb
{
try {
/**
* @var $collection MongoDBConnection
* @var $collection MongodbConnection
*/
$collection = $this->getConnection();
return $collection->command($namespace, $filter);
return $collection->execCommand($namespace, $filter);
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
}
}
}

View File

@ -111,433 +111,6 @@ class MongodbConnection extends Connection implements ConnectionInterface
return true;
}
/**
* 查询返回结果的一条数据
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFindOne(string $namespace, array $filter = [], array $options = [])
{
// 查询数据
$result = [];
try {
$options['limit'] = 1;
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$result = (array)$document;
break;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的全部数据
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFindAll(string $namespace, array $filter = [], array $options = [])
{
// 查询数据
$result = [];
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$result[] = (array)$document;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的一条数据_id自动转对象
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeQueryOne(string $namespace, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$result = [];
try {
$options['limit'] = 1;
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$result = $document;
break;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的全部数据_id自动转对象
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeQueryAll(string $namespace, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$result = [];
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$result[] = $document;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 返回分页数据默认每页10条
*
* @param string $namespace
* @param int $limit
* @param int $currentPage
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function execQueryPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$data = [];
$result = [];
//每次最多返回10条记录
if (!isset($options['limit']) || (int)$options['limit'] <= 0) {
$options['limit'] = $limit;
}
if (!isset($options['skip']) || (int)$options['skip'] <= 0) {
$options['skip'] = $currentPage * $limit;
}
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$data[] = $document;
}
$result['totalCount'] = $this->count($namespace, $filter);
$result['currentPage'] = $currentPage;
$result['perPage'] = $limit;
$result['list'] = $data;
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 数据插入
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
* $data1 = ['title' => 'one'];
* $data2 = ['_id' => 'custom ID', 'title' => 'two'];
* $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'];
*
* @param string $namespace
* @param array $data
* @return bool|string
* @throws MongoDBException
*/
public function insert(string $namespace, array $data = [])
{
try {
$bulk = new BulkWrite();
$insertId = (string)$bulk->insert($data);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
} catch (\Exception $e) {
$insertId = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $insertId;
}
}
/**
* 批量数据插入
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
* $data = [
* ['title' => 'one'],
* ['_id' => 'custom ID', 'title' => 'two'],
* ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']
* ];
* @param string $namespace
* @param array $data
* @return bool|string
* @throws MongoDBException
*/
public function insertAll(string $namespace, array $data = [])
{
try {
$bulk = new BulkWrite();
foreach ($data as $items) {
$insertId[] = (string)$bulk->insert($items);
}
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
} catch (\Exception $e) {
$insertId = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $insertId;
}
}
/**
* 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
* $bulk->update(
* ['x' => 2],
* ['$set' => ['y' => 3]],
* ['multi' => false, 'upsert' => false]
* );
*
* @param string $namespace
* @param array $filter
* @param array $newObj
* @return bool
* @throws MongoDBException
*/
public function updateRow(string $namespace, array $filter = [], array $newObj = []): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->update(
$filter,
['$set' => $newObj],
['multi' => true, 'upsert' => false]
);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$modifiedCount = $result->getModifiedCount();
$update = $modifiedCount == 0 ? false : true;
} catch (\Exception $e) {
$update = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $update;
}
}
/**
* 数据更新, 效果是满足filter的行数据更新成$newObj
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
* $bulk->update(
* ['x' => 2],
* [['y' => 3]],
* ['multi' => false, 'upsert' => false]
* );
*
* @param string $namespace
* @param array $filter
* @param array $newObj
* @return bool
* @throws MongoDBException
*/
public function updateColumn(string $namespace, array $filter = [], array $newObj = []): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->update(
$filter,
['$set' => $newObj],
['multi' => false, 'upsert' => false]
);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$modifiedCount = $result->getModifiedCount();
$update = $modifiedCount == 1 ? true : false;
} catch (\Exception $e) {
$update = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->release();
return $update;
}
}
/**
* 删除数据
*
* @param string $namespace
* @param array $filter
* @param bool $limit
* @return bool
* @throws MongoDBException
*/
public function delete(string $namespace, array $filter = [], bool $limit = false): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->delete($filter, ['limit' => $limit]);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$delete = true;
} catch (\Exception $e) {
$delete = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $delete;
}
}
/**
* 获取collection 中满足条件的条数
*
* @param string $namespace
* @param array $filter
* @return bool
* @throws MongoDBException
*/
public function count(string $namespace, array $filter = [])
{
try {
$command = new Command([
'count' => $namespace,
'query' => $filter
]);
$cursor = $this->connection->executeCommand($this->config['db'], $command);
$count = $cursor->toArray()[0]->n;
return $count;
} catch (\Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $count;
}
}
/**
* 获取collection 中满足条件的条数
*
* @param string $namespace
* @param array $filter
* @return bool
* @throws Exception
* @throws MongoDBException
*/
public function command(string $namespace, array $filter = [])
{
try {
$command = new Command([
'aggregate' => $namespace,
'pipeline' => $filter,
'cursor' => new \stdClass()
]);
$cursor = $this->connection->executeCommand($this->config['db'], $command);
$count = $cursor->toArray()[0];
} catch (\Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $count;
}
}
/**
* 判断当前的数据库连接是否已经超时
*
@ -599,5 +172,453 @@ class MongodbConnection extends Connection implements ConnectionInterface
}
}
/**
* 查询返回结果的一条数据
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFindOne(string $namespace, array $filter = [], array $options = [])
{
// 查询数据
$result = [];
try {
$options['limit'] = 1;
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$result = (array)$document;
break;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的全部数据
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFindAll(string $namespace, array $filter = [], array $options = [])
{
// 查询数据
$result = [];
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$result[] = (array)$document;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 返回分页数据默认每页10条
*
* @param string $namespace
* @param int $limit
* @param int $currentPage
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function execFindPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = [])
{
// 查询数据
$data = [];
$result = [];
//每次最多返回10条记录
if (!isset($options['limit']) || (int)$options['limit'] <= 0) {
$options['limit'] = $limit;
}
if (!isset($options['skip']) || (int)$options['skip'] <= 0) {
$options['skip'] = $currentPage * $limit;
}
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$data[] = $document;
}
$result['totalCount'] = $this->count($namespace, $filter);
$result['currentPage'] = $currentPage;
$result['perPage'] = $limit;
$result['list'] = $data;
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的一条数据_id自动转对象
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFetchOne(string $namespace, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$result = [];
try {
$options['limit'] = 1;
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$result = $document;
break;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 查询返回结果的全部数据_id自动转对象
*
* @param string $namespace
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function executeFetchAll(string $namespace, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$result = [];
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$result[] = $document;
}
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 返回分页数据默认每页10条_id自动转对象
*
* @param string $namespace
* @param int $limit
* @param int $currentPage
* @param array $filter
* @param array $options
* @return array
* @throws MongoDBException
*/
public function execFetchPagination(string $namespace, int $limit = 10, int $currentPage = 0, array $filter = [], array $options = [])
{
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
// 查询数据
$data = [];
$result = [];
//每次最多返回10条记录
if (!isset($options['limit']) || (int)$options['limit'] <= 0) {
$options['limit'] = $limit;
}
if (!isset($options['skip']) || (int)$options['skip'] <= 0) {
$options['skip'] = $currentPage * $limit;
}
try {
$query = new Query($filter, $options);
$cursor = $this->connection->executeQuery($this->config['db'] . '.' . $namespace, $query);
foreach ($cursor as $document) {
$document = (array)$document;
$document['_id'] = (string)$document['_id'];
$data[] = $document;
}
$result['totalCount'] = $this->count($namespace, $filter);
$result['currentPage'] = $currentPage;
$result['perPage'] = $limit;
$result['list'] = $data;
} catch (\Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $result;
}
}
/**
* 插入一条数据
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
* $data1 = ['title' => 'one'];
* $data2 = ['_id' => 'custom ID', 'title' => 'two'];
* $data3 = ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three'];
*
* @param string $namespace
* @param array $data
* @return bool|string
* @throws MongoDBException
*/
public function execInsertOne(string $namespace, array $data = [])
{
try {
$bulk = new BulkWrite();
$insertId = (string)$bulk->insert($data);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
} catch (\Exception $e) {
$insertId = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $insertId;
}
}
/**
* 插入批量数据
* http://php.net/manual/zh/mongodb-driver-bulkwrite.insert.php
* $data = [
* ['title' => 'one'],
* ['_id' => 'custom ID', 'title' => 'two'],
* ['_id' => new MongoDB\BSON\ObjectId, 'title' => 'three']
* ];
*
* @param string $namespace
* @param array $data
* @return bool|string
* @throws MongoDBException
*/
public function execInsertAll(string $namespace, array $data = [])
{
try {
$bulk = new BulkWrite();
foreach ($data as $items) {
$insertId[] = (string)$bulk->insert($items);
}
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
} catch (\Exception $e) {
$insertId = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $insertId;
}
}
/**
* 数据更新,效果是满足filter的行,只更新$newObj中的$set出现的字段
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
* $bulk->update(
* ['x' => 2],
* ['$set' => ['y' => 3]],
* ['multi' => false, 'upsert' => false]
* );
*
* @param string $namespace
* @param array $filter
* @param array $newObj
* @return bool
* @throws MongoDBException
*/
public function execUpdateRow(string $namespace, array $filter = [], array $newObj = []): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->update(
$filter,
['$set' => $newObj],
['multi' => true, 'upsert' => false]
);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$modifiedCount = $result->getModifiedCount();
$update = $modifiedCount == 0 ? false : true;
} catch (\Exception $e) {
$update = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $update;
}
}
/**
* 数据更新, 效果是满足filter的行数据更新成$newObj
* http://php.net/manual/zh/mongodb-driver-bulkwrite.update.php
* $bulk->update(
* ['x' => 2],
* [['y' => 3]],
* ['multi' => false, 'upsert' => false]
* );
*
* @param string $namespace
* @param array $filter
* @param array $newObj
* @return bool
* @throws MongoDBException
*/
public function execUpdateColumn(string $namespace, array $filter = [], array $newObj = []): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->update(
$filter,
['$set' => $newObj],
['multi' => false, 'upsert' => false]
);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$result = $this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$modifiedCount = $result->getModifiedCount();
$update = $modifiedCount == 1 ? true : false;
} catch (\Exception $e) {
$update = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->release();
return $update;
}
}
/**
* 删除数据
*
* @param string $namespace
* @param array $filter
* @param bool $limit
* @return bool
* @throws MongoDBException
*/
public function execDelete(string $namespace, array $filter = [], bool $limit = false): bool
{
try {
if (!empty($filter['_id']) && !($filter['_id'] instanceof ObjectId)) {
$filter['_id'] = new ObjectId($filter['_id']);
}
$bulk = new BulkWrite;
$bulk->delete($filter, ['limit' => $limit]);
$written = new WriteConcern(WriteConcern::MAJORITY, 1000);
$this->connection->executeBulkWrite($this->config['db'] . '.' . $namespace, $bulk, $written);
$delete = true;
} catch (\Exception $e) {
$delete = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $delete;
}
}
/**
* 获取collection 中满足条件的条数
*
* @param string $namespace
* @param array $filter
* @return bool
* @throws MongoDBException
*/
public function execCount(string $namespace, array $filter = [])
{
try {
$command = new Command([
'count' => $namespace,
'query' => $filter
]);
$cursor = $this->connection->executeCommand($this->config['db'], $command);
$count = $cursor->toArray()[0]->n;
return $count;
} catch (\Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} catch (Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $count;
}
}
/**
* 聚合查询
*
* @param string $namespace
* @param array $filter
* @return bool
* @throws Exception
* @throws MongoDBException
*/
public function execCommand(string $namespace, array $filter = [])
{
try {
$command = new Command([
'aggregate' => $namespace,
'pipeline' => $filter,
'cursor' => new \stdClass()
]);
$cursor = $this->connection->executeCommand($this->config['db'], $command);
$count = $cursor->toArray()[0];
} catch (\Exception $e) {
$count = false;
throw new MongoDBException($e->getFile() . $e->getLine() . $e->getMessage());
} finally {
$this->pool->release($this);
return $count;
}
}
}