我在 Spark 上使用 ScalaPB 运行服务器/客户端时遇到问题。
当我使用“sbt run”运行我的代码时,它完全可以正常工作。我想使用 spark 运行这段代码,因为接下来我会导入我的 spark 模型来预测一些标签。但是当我将我的 jar 提交给 spark 时,他们给了我这样的错误。
Exception in thread "main" io.grpc.ManagedChannelProvider$ProviderNotFoundException:
No functional server found. Try adding a dependency on the grpc-netty artifact
这是我的 build.sbt
scalaVersion := "2.11.7"
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
val scalapbVersion =
scalapb.compiler.Version.scalapbVersion
val grpcJavaVersion =
scalapb.compiler.Version.grpcJavaVersion
libraryDependencies ++= Seq(
// protobuf
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
//for grpc
"io.grpc" % "grpc-netty" % grpcJavaVersion ,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
使用阴影仍然不起作用
assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadegoogle.@1").inAll)
这是我的主要
import java.util.logging.Logger
import io.grpc.{Server, ServerBuilder}
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.sql.SparkSession
import testproto.test.{Email, EmailLabel, RouteGuideGrpc}
import scala.concurrent.{ExecutionContext, Future}
object HelloWorldServer {
private val logger = Logger.getLogger(classOf[HelloWorldServer].getName)
def main(args: Array[String]): Unit = {
val server = new HelloWorldServer(ExecutionContext.global)
server.start()
server.blockUntilShutdown()
}
private val port = 50051
}
class HelloWorldServer(executionContext: ExecutionContext) {
self =>
private[this] var server: Server = null
private def start(): Unit = {
server = ServerBuilder.forPort(HelloWorldServer.port).addService(RouteGuideGrpc.bindService(new RouteGuideImpl, executionContext)).build.start
HelloWorldServer.logger.info("Server started, listening on " + HelloWorldServer.port)
sys.addShutdownHook {
System.err.println("*** shutting down gRPC server since JVM is shutting down")
self.stop()
System.err.println("*** server shut down")
}
}
private def stop(): Unit = {
if (server != null) {
server.shutdown()
}
}
private def blockUntilShutdown(): Unit = {
if (server != null) {
server.awaitTermination()
}
}
private class RouteGuideImpl extends RouteGuideGrpc.RouteGuide {
override def getLabel(request: Email): Future[EmailLabel] = {
val replay = EmailLabel(emailId = request.emailId, label = "aaaaa")
Future.successful(replay)
}
}
}
谢谢