1

当我将生产者设置放入我的代码中时,我一次又一次地遇到问题。当我没有它时,一切正常。下面给出了包含所有代码的文件单个文件,我正在尝试将文件写入 kafka 流。并得到这个编译错误。

package somePackage

import java.nio.file.Paths

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorSystem, Cancellable, Props}
import akka.kafka.ProducerSettings
import akka.serialization.ByteArraySerializer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future


// Lines Producer
class LinesProducer (implicit mat: Materializer) extends Actor with ActorLogging {
  import LinesProducerCompanion._

  override def preStart():Unit = {
    log.info("Not doing anything in PreStart!")
  }

  def receive = {
    case Start =>
      log.info("LinesProducer Started.")
      val future:Future[Done] = FileIO.fromPath(Paths.get("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv"))
        .map( line => {new ProducerRecord[Array[Byte],ByteString]("genomes0", line)})
        .runWith( Sink.ignore )
  }

  override def postStop():Unit = {
    log.info("Doing nothing in postStop!")
  }
}

object LinesProducerCompanion {
  val props = Props[LinesProducer]
  case object Start
  case object Stop
}

object Application extends App {

  implicit val system:ActorSystem = ActorSystem("some")
  implicit val materializer:ActorMaterializer= ActorMaterializer()
  implicit val executor = system.dispatcher
  val LinesProducer = system.actorOf(LinesProducerCompanion.props, "LinesProducer")
  val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092")
  LinesProducer ! LinesProducerCompanion.Start

  // This example app will ping pong 3 times and thereafter terminate the ActorSystem -
  // see counter logic in PingActor
  //system.awaitTermination()
}

错误是

info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to project (in build file:/C:/Users/tnkteja/Documents/GitHub/scala-immersion-program/miniproject-1/)
[info] Compiling 1 Scala source to C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\target\scala-2.12\classes...
[error] C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\src\main\scala\Application.scala:29: overloaded method value apply with alternatives:
[error]   [K, V](config: com.typesafe.config.Config, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V
])akka.kafka.ProducerSettings[K,V] <and>
[error]   [K, V](system: akka.actor.ActorSystem, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V])ak
ka.kafka.ProducerSettings[K,V] <and>
[error]   [K, V](config: com.typesafe.config.Config, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serializati
on.Serializer[V]])akka.kafka.ProducerSettings[K,V] <and>
[error]   [K, V](system: akka.actor.ActorSystem, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serialization.S
erializer[V]])akka.kafka.ProducerSettings[K,V]
[error]  cannot be applied to (akka.actor.ActorSystem, akka.serialization.ByteArraySerializer, org.apache.kafka.common.serialization.StringSerializer)
[error]       val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092")
[error]                             ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 8 s, completed 6 May, 2017 10:42:15 AM
4

2 回答 2

1

您是否尝试过使用 KafkaByteArraySerializer代替?

import org.apache.kafka.common.serialization.ByteArraySerializer

Akka 的ByteArraySerializer()实例构造函数已被弃用。

于 2017-05-06T17:53:14.750 回答
0

这段代码解决了这个问题。

包 com.miniproject1

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


// Futures need execution context to reuse allocated thread pools
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import scala.io.Source.fromFile


class LinesProducer(implicit mat: Materializer) extends Actor with ActorLogging {
  import LinesProducerCompanion._

  override def preStart(): Unit = {
    super.preStart()
    log.info("Not doing anything in PreStart!")
  }

  override def receive: Receive = {
    case Start => {

      val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers("192.168.99.100:9092")

      log.info("Initializing writer")

      val kafkaSink = Producer.plainSink(producerSettings)

      //
      val done: Future[Done] = Source.fromIterator(() => fromFile("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv").getLines().drop(1))
        .map(line => {new ProducerRecord[Array[Byte], String]("genomes0", line)})
        .runWith(kafkaSink)

      done.onComplete({
        success =>
          log.info("Writing to kafka Complete!")
          context.stop(self)
      })

      done.onFailure {
        case ex =>
          log.info("*********************Stopping********************")
          context.stop(self)
      }
    }
  }
}

object LinesProducerCompanion{
  val props = Props[LinesProducer]
  case object Start
}
于 2017-05-08T15:24:57.837 回答