使用spring-data-mongodb流式聚合操作的结果

我正在使用spring-data-mongodb,我想使用游标进行聚合操作。

MongoTemplate.stream()获取一个Query,所以我尝试创建Aggregation实例,使用Aggregation.toDbObject()将其转换为DbObject,使用DbObject创建BasicQuery ,然后调用stream()方法。
这将返回一个空光标。

调试spring-data-mongodb代码显示MongoTemplate.stream()使用FindOperation ,这使得thinkspring-data-mongodb不支持流聚合操作。
有没有人能够使用spring-data-mongodb来汇总聚合查询的结果?

为了记录,我可以使用Java mongodb驱动程序,但我更喜欢使用spring-data。

编辑 11月10日 – 添加示例代码:

MatchOperation match = Aggregation.match(Criteria.where("type").ne("AType")); GroupOperation group = Aggregation.group("name", "type"); group = group.push("color").as("colors"); group = group.push("size").as("sizes"); TypedAggregation agg = Aggregation.newAggregation(MyClass.class, Arrays.asList(match, group)); MongoConverter converter = mongoTemplate.getConverter(); MappingContext<? extends MongoPersistentEntity, MongoPersistentProperty> mappingContext = converter.getMappingContext(); QueryMapper queryMapper = new QueryMapper(converter); AggregationOperationContext context = new TypeBasedAggregationOperationContext(MyClass.class, mappingContext, queryMapper); // create a BasicQuery to be used in the stream() method by converting the Aggregation to a DbObject BasicQuery query = new BasicQuery(agg.toDbObject("myClass", context)); // spring-mongo attributes the stream() method to find() operationsm not to aggregate() operations so the stream returns an empty cursor CloseableIterator iter = mongoTemplate.stream(query, MyClass.class); // this is an empty cursor while(iter.hasNext()) { System.out.println(iter.next().getName()); } 

以下代码(不使用stream()方法)返回聚合的预期非空结果:

  AggregationResults result = mongoTemplate.aggregate(agg, "myClass", HashMap.class); 

对于那些仍在努力寻找答案的人:

从spring-data-mongo版本2.0.0.M4开始( AFAIKMongoTemplate得到了一个aggregateStream方法。

所以你可以做到以下几点:

  AggregationOptions aggregationOptions = Aggregation.newAggregationOptions() // this is very important: if you do not set the batch size, you'll get all the objects at once and you might run out of memory if the returning data set is too large .cursorBatchSize(mongoCursorBatchSize) .build(); data = mongoTemplate.aggregateStream(Aggregation.newAggregation( Aggregation.group("person_id").count().as("count")).withOptions(aggregationOptions), collectionName, YourClazz.class);