问题标签 [scalapb]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
813 浏览

scala - 使用 Spark 2.0.2、Kafka 源和 scalapb 的结构化流

我正在使用结构化流(Spark 2.0.2)来使用 kafka 消息。使用 scalapb,protobuf 中的消息。我收到以下错误。请帮忙..

线程“主”中的异常 scala.ScalaReflectionException:不是 scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) 处的术语 scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols. scala:84) 在 org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811) 在 org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39) 在org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800) at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39) at org.apache.spark .sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582) 在 org.apache.spark.sql.catalyst。ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 在 scala.collection。 TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)在 scala.collection.immutable.List.flatMap(List.scala:344) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:第583章)spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) 在 org.apache.spark.sql。 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 的编码器$.product(Encoders.scala:27​​4) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala ) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect。 Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)serializerFor(ScalaReflection.scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala: 274) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0 (本机方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498 ) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)serializerFor(ScalaReflection.scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala: 274) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0 (本机方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498 ) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala:27​​4) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun .reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147 )ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala:27​​4) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun .reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147 )newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain .main(AppMain.java:147)newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain .main(AppMain.java:147)com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)

以下是我的代码...

0 投票
2 回答
2630 浏览

scala - 使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错

这是一个 Spark Streaming 应用程序,它使用Proto Buf. 使用scalapb库。我收到以下错误。请帮忙。

以下是我的代码...

0 投票
2 回答
642 浏览

scala - 如何访问数据框 .proto、ScalaPB 中的嵌套字段

以下我的数据框架构

我想输出名称和城市。以下是我的 spark 流应用程序,它输出名称和地址,但我想要输出中的名称和城市。感谢你的帮助。谢谢。

0 投票
0 回答
670 浏览

scala - ScalaPB 和 SBT 着色不起作用

我正在尝试让 https://scalapb.github.io/">ScalaPB 与 Spark 一起使用。为了做到这一点,我需要使用如下代码在 SBT 中隐藏我的协议缓冲区

这在 spark-shell 中不起作用。事实上,它甚至不能在 Scala REPL 中工作。下面转载的是我在 REPL 中的错误。

这是我的 build.sbt 文件的完整副本

和项目/protos.sbt

谁能帮我理解我在这里缺少什么?

谢谢

编辑

为了看看这是否是我要介绍的问题,我下载了这个项目并按原样构建它,只是将 Scala 版本更改为 2.11.8,我得到了同样的错误。

0 投票
1 回答
99 浏览

scala - 为相同的“请求”分组/识别参与者日志

我正在为日志记录问题寻找一种方法或最佳/更好的设计决策。我在集群中使用 Akka actor 来提供后端服务,并在前端使用 Play 来接受 HTTP 请求。我的问题是从让整个应用程序日志可识别的旧问题扩展到相同的 HTTP 请求,这些请求只是使用大多数当前日志记录框架中存在的 MDC,方法是在开头生成一个 UUID 并将其放入上下文中。

我们的数据流示例可能如下所示:

“Http 请求/系统 A” -> “Actor1/Cluster B” -> “Actor2/Cluster C” -> “回复系统 A 并完成请求”

这意味着该过程中至少涉及 3 个独立的系统。我所有的日志都转到 Logstash。我可以从系统 A 的请求开始生成一个 UUID。但是,我希望 UUID 可以被携带/搭载到所有子系统,这些子系统都使用 Protobuf 序列化来相互通信,处理属于的作业到同一个http请求。

我知道我总是可以在我的所有消息中添加一个 id 字段,但这非常难看。

我想知道是否有更好的方法或更好的机制将信息传送到所有其他调用 Akka 系统,而不会给我的业务逻辑处理带来太多噪音?

0 投票
1 回答
641 浏览

scala - ScalaPB 与 Scala.js 和 Scala(jvm) - 存在链接错误

我的 sbt 中有多个子项目,一个是服务器(基于 playframework),另一个是客户端(scala.js),第三个是两者之间以 protobuf(scalapb)形式进行的通信。

现在,这是我的build.sbt

这是plugins.sbt

这是一个proto文件:

编译后我得到Test.class

现在在客户端我尝试:

(当我通过它发送字符串时,websocket 本身工作得很好!)

现在我得到了这个巨大的堆栈跟踪:

我的 ide 找到了一切,但显然我做错了什么。我已经看过https://github.com/thesamet/scalapbjs-test但无济于事。

该行出现问题val msg = Test().withId(1337)

编辑:评论后我改变了build.sbt

现在现在既play不能client也不能解决原型类:(

(我也知道多余的PB.targets in Compile...,我只是认为共享在那里可能不起作用,所以我再次将它添加到两个不同的设置中)

0 投票
3 回答
290 浏览

scala - 在运行时以优雅的方式处理不同消息类型的负载

为了能够处理大量不同的请求类型,我创建了一个.proto这样的文件:

我添加了,typeId以便知道实际protobuf bytes代表什么。(自我描述)

现在我的问题是以优雅的方式处理不同的“具体类型”。(注意:如果我简单地使用类似switch-case的方法,一切正常!)

我想到了这样的解决方案:

1)具有不同处理程序必须实现的特征,例如:

2) 提供某种注册表来查询给定消息的正确处理程序:

3)如果有请求进来,它会识别自己,所以我们需要另一个 Mapper:

4)如果有请求进来,处理它:

现在,直到这里一切看起来都很好,花花公子,但是这条线打破了我的整个世界:

它导致:

类型不匹配; 发现:com.trueaccord.scalapb.GeneratedMessage 与 Product 与 com.trueaccord.scalapb.Message [_ >: tld.test.proto.Message.Test 与 tld.test.proto.Message.Other <: com.trueaccord.scalapb。 GeneratedMessage with Product] with com.trueaccord.lenses.Updatable[_ >: tld.test.proto.Message.Other with tld.test.proto.Message.Test <: com.trueaccord.scalapb.GeneratedMessage with Product]{def companion :可序列化} 需要:_1

据我了解-如果有误,请在此处纠正我-编译器无法确定此处actualRequest是否可以由处理程序“处理”。这意味着它缺乏这样的知识,即actualRequest它肯定存在于其中,而且它mapper也存在一个handler

它基本上是人类可以获得的隐含知识,但编译器无法推断。

那么,话虽这么说,我怎样才能优雅地克服这种情况呢?

0 投票
0 回答
41 浏览

scala - scalapb 在使用普通测试时没有返回频道

我正在尝试使用带有 scalaPB 的 gRpc 构建客户端。根据文档,我应该能够运行以下命令来创建频道:

但是当我无法识别构建方法时,这是因为以下返回的是 Any 而不是 ManageChannelBuilder

0 投票
1 回答
382 浏览

protocol-buffers - 使用任何类型提高 Protobuf 性能

使用 protobuf 是否可以提高性能?目前,我有几个服务只是传递数据而不做任何转换。使用 Any 类型可以节省序列化/反序列化数据所花费的 cpu 周期吗?

0 投票
0 回答
1009 浏览

scala - 将 Play Framework 2.6 与 gRPC 和 Netty 集成

在我写这篇文章的时候,Play Framework 是v2.6.0-M4。由于 Netty 冲突,该框架的 v2.5 版本难以与 gRPC 一起使用(请参阅此 stackoverflow 答案)。

我开始研究 gRPC 和 protobufs。已经从 Play Framework 2.5 > 2.6.0-M4 移植了一个项目,以期待实际发布。目前我对 gRPC 的集成有一些疑问。我想知道如何让 gRPC 服务器与 Play Framework 一起工作。我知道v2.6 切换到 Akka HTTP 服务器而不是 Netty,而我grpc-netty在 sbt 中使用依赖项,所以也许我必须再次将项目切换到 Netty(这里是如何)。

出于测试目的,我创建了一个快速而肮脏的GrpcServer.scala类,它启动一个带有 GrpcServer 侦听的线程。我设法用 gRPC 添加ScalaPB并生成/编译我的 protobufs。它可以完美地与一个小型测试 NodeJS 应用程序通信,但我必须独立于主项目启动这个服务器应用程序:

将 gRPC 集成到 Play 框架中的可能解决方案

现在对于 Play Framework v2.6 中的真正集成,我正在寻找建议。这是我可以做的一些事情:

  • 创建一个模块并在 Play Framework 启动时启动 gRPC 服务器,如此 stackoverflow answer中所述。这意味着我们在现有服务器旁边的不同端口上运行 gRPC 服务器(Play Framework 2.6 中的 Akka HTTP 服务器)
  • 创建一个 Scala 命令并使其长时间运行。因此,当我们在服务器上启动应用程序时,我们始终确保启动运行 gRPC 服务器的命令。
  • 在 Play Framework v2.6 中从 Akka HTTP 切换到 Netty,并将 gRPC 与现有的 Netty 服务器紧密集成,因此它可以连接到现有的 Netty 服务器,而不是我们自己创建服务器。我想要这个解决方案,但不知道如何处理它。它肯定会避免运行两个单独的 http 堆栈。

任何关于干净集成的提示/想法都是有帮助的,因为没有太多关于 Play Framework 和 gRPC 的可用信息,除了之前的 2.5 版本中存在问题......