0

我是 Akka Streams 和 gRPC 的新手,我正在尝试构建一个端点,客户端发送一个请求,服务器发送多个响应。

这是我的protobuf

syntax = "proto3";

option java_multiple_files = true;
option java_package = "customer.service.proto";

service CustomerService {

  rpc CreateCustomer(CustomerRequest) returns (stream CustomerResponse) {}

}

message CustomerRequest {
  string customerId = 1;
  string customerName = 2;
}

message CustomerResponse {
  enum Status {
    No_Customer = 0;
    Creating_Customer = 1;
    Customer_Created = 2;
  }

  string customerId = 1;
  Status status = 2;
}

我试图通过发送客户请求来实现这一点,然后服务器将首先检查并响应 No_Customer,然后它会发送 Creating_Customer,最后服务器会说 Customer_Created。

我不知道从哪里开始实施它,找了几个小时但仍然一无所知,如果有人能指出我正确的方向,我将非常感激。

4

1 回答 1

2

开始的地方是Akka gRPC 文档,特别是服务WalkThrough。让样本在一个干净的项目中工作非常简单。

相关的服务器示例方法是这样的:

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  println(s"sayHello to ${in.name} with stream of chars...")
  Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
}

现在的问题是创建一个Source返回正确结果的方法,但这取决于您计划如何实现服务器,因此很难回答。查看Akka Streams 文档以获取各种选项。

客户端代码更简单,只需调用示例中runForeach返回Source的:CreateCustomer

def runStreamingReplyExample(): Unit = {
  val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
  val done: Future[Done] =
    responseStream.runForeach(reply => println(s"got streaming reply: ${reply.message}"))

  done.onComplete {
    case Success(_) =>
      println("streamingReply done")
    case Failure(e) =>
      println(s"Error streamingReply: $e")
  }
}
于 2021-04-28T14:48:32.390 回答