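I am struggling to read messages from the destination channel that my Spring Cloud Stream processor writes to.
I have the GCP emulator for Pub/Sub running in a Testcontainers container. I am able to successfully send messages to the input queue/topic (sorry, I come from a JMS background and am not sure of the correct GCP Pub/Sub terminology).
Spring Cloud Stream configuration: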
spring.cloud.gcp.pubsub.emulator-host=localhost:8085
spring.cloud.gcp.pubsub.project-id=project-treeline
spring.cloud.gcp.project-id=project-treeline
spring.cloud.stream.bindings.input.destination=split
spring.cloud.stream.bindings.output.destination=amount
spring.cloud.stream.gcp.pubsub.bindings.amount.consumer.auto-create-resources=true
Test case:
@ExtendWith(SpringExtension.class)
@SpringBootTest
@Testcontainers
@ActiveProfiles({"test", "gcp"})
@Slf4j
public class SpringProcessorTest {

    private static final int SUCCESS_EXPECTED_SPLIT_MSG_COUNT = 3;

    @Autowired
    private PubSubTemplate template;

    @Autowired
    private PlaylistRepository playlistRepo;

    @Autowired
    private PlaylistEntryRepository playlistEntryRepo;

    @Value("${spring.cloud.stream.bindings.input.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String outputTopic;

    @Test
    public void successfullyProcessPlaylist() throws Exception {
        final Long playlistId = playlistRepo.findByFileUri(TestDataConst.PLAYLIST_URI).getId();
        playlistEntryRepo.findByPlaylist(playlistId).forEach(c -> template.publish(inputTopic, c.getId().toString()));
        Thread.sleep(4000);
        final List<AcknowledgeablePubsubMessage> msgs = template.pull("output", 3, Boolean.TRUE);
        assertNotNull(msgs);
        assertEquals(SUCCESS_EXPECTED_SPLIT_MSG_COUNT, msgs.size());
    }
}
Instead of template.pull I also tried template.subscribe, but I still have the same problem. Console log:
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist
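A note on the terminology, since the error mentions a subscription: in Pub/Sub, publishers write to a topic, while consumers read from a named subscription attached to that topic, so a pull has to be addressed to a subscription name. The sketch below is only a toy in-memory model of that relationship, not the real Pub/Sub or Spring Cloud GCP API. The class ToyPubSub and the subscription name "amount-test-sub" are hypothetical and exist only for illustration; "amount" and "output" are the names from the configuration and test above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of topics and subscriptions. NOT the real Pub/Sub client API.
class ToyPubSub {
    // topic name -> names of the subscriptions attached to it
    private final Map<String, List<String>> topics = new HashMap<>();
    // subscription name -> messages waiting to be pulled
    private final Map<String, Deque<String>> subscriptions = new HashMap<>();

    void createTopic(String topic) {
        topics.putIfAbsent(topic, new ArrayList<>());
    }

    void createSubscription(String subscription, String topic) {
        List<String> attached = topics.get(topic);
        if (attached == null) {
            throw new IllegalStateException("NOT_FOUND: Topic does not exist: " + topic);
        }
        if (subscriptions.containsKey(subscription)) {
            return; // already created, nothing to do
        }
        attached.add(subscription);
        subscriptions.put(subscription, new ArrayDeque<>());
    }

    // A published message is copied to every subscription that exists at publish time.
    void publish(String topic, String message) {
        List<String> attached = topics.get(topic);
        if (attached == null) {
            throw new IllegalStateException("NOT_FOUND: Topic does not exist: " + topic);
        }
        for (String subscription : attached) {
            subscriptions.get(subscription).add(message);
        }
    }

    // Pulling is addressed by subscription name, never by topic or channel name.
    List<String> pull(String subscription, int maxMessages) {
        Deque<String> queue = subscriptions.get(subscription);
        if (queue == null) {
            throw new IllegalStateException("NOT_FOUND: Subscription does not exist: " + subscription);
        }
        List<String> out = new ArrayList<>();
        while (out.size() < maxMessages && !queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        ToyPubSub broker = new ToyPubSub();
        broker.createTopic("amount");
        broker.createSubscription("amount-test-sub", "amount"); // hypothetical name
        broker.publish("amount", "split-1");
        System.out.println(broker.pull("amount-test-sub", 3)); // prints [split-1]
        try {
            broker.pull("output", 3); // "output" is a channel name, not a subscription
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints NOT_FOUND: Subscription does not exist: output
        }
    }
}
```

If the real emulator behaves like this model, the test would need a subscription on the "amount" topic to exist before the processor publishes, and the pull would have to name that subscription rather than "output". This is an assumption about the cause, not a confirmed diagnosis.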
The logs do show that the events from the test code are being sent and that the messages are being handled by the processor:
14:15:24.274 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [7], name: [Robb Flynn], receiving split: [25,000000]
14:15:24.274 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [7,000000]
14:15:24.274 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [5,833333]
14:15:24.279 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [12], name: [Logan Mader], receiving split: [25,000000]
14:15:24.279 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [7,000000]
14:15:24.279 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [8], name: [Adam Deuce], receiving split: [25,000000]
14:15:24.283 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [7,000000]
14:15:24.286 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [10], name: [Chris Kontos], receiving split: [25,000000]
14:15:24.286 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [26,333333]
14:15:24.286 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [4], name: [Kerry King], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [5], name: [Gary Holt], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [3], name: [Jeff Hanneman], receiving split: [5,833333]
14:15:24.297 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [21,666667]
14:15:24.302 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [21,666667]
14:15:24.308 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [21,666667]
14:15:24.319 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=7), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=12), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=8), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=10)], headers={id=dfcb46c4-7cbc-05b1-0030-219dd378141f, timestamp=1566389724287}]
14:15:24.320 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=7), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=12), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=2), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=13), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=14), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=15)], headers={id=387b1f11-14b9-34c2-0a60-054eda20dfa3, timestamp=1566389724293}]
14:15:24.320 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=7), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=12), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=2), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=4), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=5), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=3), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=13), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=14), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=15)], headers={id=539cd992-168f-5731-ea9d-c9212f0656ee, timestamp=1566389724309}]
14:15:24.390 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.390 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.391 [gcp-pubsub-subscriber2] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d696eab-b2d1-4d6c-bb45-9385c4567cf3, timestamp=1566389724391}]
14:15:24.391 [gcp-pubsub-subscriber1] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=3e02b7af-cca7-9172-398f-b1bbc4245a00, timestamp=1566389724391}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=6cf9bda7-2270-8892-0e2e-e402ea85f88a, timestamp=1566389724394}]
14:15:24.398 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.399 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=52e2d9ff-2553-0f20-cd6c-1a9d6135f3cf, contentType=application/json, timestamp=1566389724166}]
14:15:24.400 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=b6808f8b-0169-93e7-64b1-25d6875c14da, contentType=application/json, timestamp=1566389724166}]
14:15:24.401 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=f6869d9f-e0ea-1581-e42b-57986624861e, contentType=application/json, timestamp=1566389724166}]
14:15:24.439 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 5
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 4
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 6
I would appreciate any help.