0

从 Alpakka PubSub 库运行订阅源时,我收到了可能的编码数据。

@Singleton
class Consumer @Inject()(config: Configuration, credentialsService: google.creds.Service)(implicit actorSystem: ActorSystem) {

  implicit val m: ActorMaterializer = ActorMaterializer.create(actorSystem)
  val logger = Logger(this.getClass)
  val subName: String = config.get[String]("google.pubsub.subname")
  val credentials: Credentials = credentialsService.getCredentials
  val pubSubConfig = PubSubConfig(credentials.projectId, credentials.clientEmail, credentials.privateKey)

  val subSource: Source[ReceivedMessage, NotUsed] = GooglePubSub.subscribe(subName, pubSubConfig)
  val ackSink: Sink[AcknowledgeRequest, Future[Done]] = GooglePubSub.acknowledge(subName, pubSubConfig)

  val computeGraph = Flow[ReceivedMessage].map {
    x =>
      logger.info(x.message.data)
      x
  }

  val ackGraph = Flow.fromFunction((msgs: Seq[ReceivedMessage]) => AcknowledgeRequest(msgs.map(_.ackId).toList))

  subSource
    .via(computeGraph)
    .groupedWithin(10, 5.minutes)
    .via(ackGraph)
    .to(ackSink)
    .run()
}

我从 PubSub 控制台发布消息。我希望我的测试消息会出现,但是在发布时test我会收到dGVzdA==. 这是预期的结果吗?我在导入私钥时遇到问题,这可能是它的结果吗?

消费者热切地与 Guice 绑定在一起。

4

1 回答 1

3

通过 REST API 接收的数据将采用base64 编码。我的猜测是使用 REST API的 Alpakka Pub/Sub 库没有正确解码接收到的数据。看起来他们也有一个使用 GRPC Pub/Sub 客户端作为底层的库,可能不会受到这个缺陷的影响?您还可以直接使用 Scala 中的Cloud Pub/Sub Java 客户端库

于 2019-04-23T20:27:35.370 回答