I have the below query that runs fine in mongo shell. I have to use this from my spring boot java application.
db.UserGames.aggregate([
{ $match: {$and: [{timestamp: {$gte : ISODate("2021-12-14T09:34:51.000Z"), $lte: ISODate("2022-12-31T09:34:51.000Z")}}]}},
{ $sort: {timestamp: 1 }},
{ $group: {
_id: {"userId" : "$userId", "username" : "$username"},
docs: { $push: { win: "$win", timestamp: "$timestamp" } }
}
},
{$project: {_id:1, docs:1, noOfGames: {$size: "$docs"}}},
{$match: {"noOfGames": {$gt: 1}}},
{$project: {_id:1, docs:1}},
{$addFields: { losingStreak : {
$function: {
body: function(docs) {
let longestStreak = 0;
let currentStreak = 0;
docs.forEach(element => {
if(element.win < 0) {
currentStreak = currentStreak + 1;
if(longestStreak < currentStreak) {
longestStreak = currentStreak;
}
} else {
currentStreak = 0;
}
});
return longestStreak;
},
args: ["$docs"],
lang: "js"
}}}},
{$project: {_id:1, losingStreak:1}},
{$match: {"losingStreak": {$gt: 0}}},
{ $sort: {"losingStreak": -1 }}
], { "allowDiskUse" : true })
I have used ReactiveMongoRepository and @Aggregation annotation as below:
public interface MyRepository extends ReactiveMongoRepository<UserGameDetails, Object> {
@Meta(allowDiskUse = true)
@Aggregation(pipeline = {
"{ $match: {$and: [{'timestamp': {$gte : ?0, $lte: ?1 } } ] } }",
"{ $sort: {'timestamp' : 1 }}",
"{ $group: { _id: {'userId' : '$userId', 'username' : '$username'},"
+ " docs: { $push: { 'win': '$win', 'timestamp': '$timestamp' } } } }",
"{ $project: {_id : 1, docs : 1, noOfGames: {$size: '$docs'}}}",
"{ $match: {'noOfGames': { $gt: ?2 } } }",
"{ $project: {_id : 1, docs : 1 } }",
"{ $addFields: { losingStreak : { $function: { "
+ "body: function(docs) { "
+ "let longestStreak = 0; "
+ "let currentStreak = 0; "
+ "docs.forEach(element => { "
+ "if(element.win < 0) { "
+ "currentStreak = currentStreak + 1; "
+ "if(longestStreak < currentStreak) { "
+ "longestStreak = currentStreak; "
+ "} "
+ "} else { "
+ "currentStreak = 0; "
+ "} });"
+ " return longestStreak; "
+ "}, "
+ "args: ['$docs'], "
+ "lang: 'js' }}}}",
"{ $project: {_id : 1, losingStreak : 1 } }",
"{ $match: {'losingStreak' : { $gt : ?2 } } }",
"{ $sort: {'losingStreak': -1 } }"})
Mono<List<UsersScores>> getLosingStreakForAllGames(LocalDateTime startTime, LocalDateTime endTime, int minValue);
}
But it fails with below error. This error is coming from the java script function inside the query. How can I run this query from my Java Application?
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.bson.json.JsonParseException: JSON reader was expecting a name but found '('.
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a name but found '('.
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.readBsonType(ParameterBindingJsonReader.java:186)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:227)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:66)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:229)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:66)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:229)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:66)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:229)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.captureExpressionDependencies(ParameterBindingDocumentCodec.java:202)
at org.springframework.data.mongodb.repository.query.ReactiveStringBasedAggregation.computePipelineStage(ReactiveStringBasedAggregation.java:141)
at org.springframework.data.mongodb.repository.query.ReactiveStringBasedAggregation.lambda$computePipeline$2(ReactiveStringBasedAggregation.java:132)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
at com.dangalgames.leaderboard.service.LeaderboardJobManagerImpl.lambda$1(LeaderboardJobManagerImpl.java:49)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at com.dangalgames.leaderboard.service.LeaderboardJobManagerImpl.lambda$0(LeaderboardJobManagerImpl.java:49)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at com.mongodb.reactivestreams.client.internal.BatchCursorFlux.lambda$recurseCursor$4(BatchCursorFlux.java:98)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$sinkToCallback$30(MongoOperationPublisher.java:549)
at com.mongodb.internal.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:167)
at com.mongodb.reactivestreams.client.internal.BatchCursor.lambda$next$0(BatchCursor.java:35)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
at com.mongodb.reactivestreams.client.internal.BatchCursorFlux.recurseCursor(BatchCursorFlux.java:104)
at com.mongodb.reactivestreams.client.internal.BatchCursorFlux.lambda$subscribe$0(BatchCursorFlux.java:56)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$sinkToCallback$30(MongoOperationPublisher.java:549)
at com.mongodb.reactivestreams.client.internal.OperationExecutorImpl.lambda$execute$2(OperationExecutorImpl.java:92)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:95)
at com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier$RetryingCallback.onResult(RetryingAsyncCallbackSupplier.java:114)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:95)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:95)
at com.mongodb.internal.operation.FindOperation$2.onResult(FindOperation.java:762)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$transformingReadCallback$10(CommandOperationHelper.java:329)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:280)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:84)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:687)
at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:159)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:526)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:503)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:815)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:782)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:650)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:647)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:562)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:647)
at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:86)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:772)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:757)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:650)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:647)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:439)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: org.bson.json.JsonParseException: JSON reader was expecting ':' but found ')'.
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.readBsonType(ParameterBindingJsonReader.java:191)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:679)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
at org.bson.AbstractBsonReader.readString(AbstractBsonReader.java:456)
at org.bson.codecs.StringCodec.decode(StringCodec.java:80)
at org.bson.codecs.StringCodec.decode(StringCodec.java:31)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:235)
... 129 common frames omitted
Suppressed: org.bson.json.JsonParseException: JSON reader was expecting a name but found '{'.
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.readBsonType(ParameterBindingJsonReader.java:186)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:679)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
at org.bson.AbstractBsonReader.readString(AbstractBsonReader.java:456)
at org.bson.codecs.StringCodec.decode(StringCodec.java:80)
at org.bson.codecs.StringCodec.decode(StringCodec.java:31)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:235)
... 126 common frames omitted
Suppressed: org.bson.json.JsonParseException: JSON reader was expecting ':' but found 'longestStreak'.
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.readBsonType(ParameterBindingJsonReader.java:191)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:679)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
at org.bson.AbstractBsonReader.readString(AbstractBsonReader.java:456)
at org.bson.codecs.StringCodec.decode(StringCodec.java:80)
at org.bson.codecs.StringCodec.decode(StringCodec.java:31)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:235)
... 123 common frames omitted
Suppressed: org.bson.json.JsonParseException: Invalid JSON input. Position: 88. Character: '='.
at org.springframework.data.mongodb.util.json.JsonScanner.nextToken(JsonScanner.java:118)
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.popToken(ParameterBindingJsonReader.java:732)
at org.springframework.data.mongodb.util.json.ParameterBindingJsonReader.readBsonType(ParameterBindingJsonReader.java:171)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:679)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
at org.bson.AbstractBsonReader.readString(AbstractBsonReader.java:456)
at org.bson.codecs.StringCodec.decode(StringCodec.java:80)
at org.bson.codecs.StringCodec.decode(StringCodec.java:31)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.readValue(ParameterBindingDocumentCodec.java:360)
at org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec.decode(ParameterBindingDocumentCodec.java:235)
... 120 common frames omitted
Sample Document:
{
"_id" : ObjectId("61efcb757995cb2a0a408ca4"),
"userId" : UUID("ae45122e-3bb8-40cf-af59-7362ecfa4d1e"),
"username" : "testUser1",
"userAvatar" : "test avatar",
"game" : 8,
"rake" : 43.0,
"win" : -5121.0,
"entryFee" : 34.0,
"score" : 56,
"timestamp" : ISODate("2022-01-25T10:05:41.725Z")
}