首先,我对 Akka 没有经验,所以我自己调试它真的很糟糕。我尝试了此处的示例,并且发布消息有效(这意味着凭据有效),但没有发出任何消息。服务帐户被授予所有权限。
我的代码看起来像这样,基本上和例子中的一模一样:
package com.example.google.pubsub
import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import scala.concurrent.duration._
import scala.concurrent.Future
object SubscriberMain extends App {
println("#### SUBSCRIBER ####")
val privateKey: PrivateKey = {
import java.io.FileInputStream
val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
val privateKey = credential.getServiceAccountPrivateKey
privateKey
}
val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
val projectId = "weirdproject"
val apiKey = "xxxx"
val subscription = "somesubscription"
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)
subscriptionSource
.map { message =>
val data = message.message.data
println(s"received a message: $data")
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
}
我发现它akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic
从未被执行,这似乎是没有获取消息的原因。