0

我正在使用 Alpakka 解析 csv 文件。版本 "com.lightbend.akka" %% "akka-stream-alpakka-csv" % 0.20 我有带有未封闭报价的 csv 文件。

email
test@emample.com
"test@emample.com
test@emample.com
test@emample.com

我想跳过坏行然后继续,但我的流量正在下降。

我正在使用 supervisorStrategy Supervision.Resume,但它不起作用。

找到未关闭的报价时流失败。

有没有办法解决这个问题?

我的代码:

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)
4

1 回答 1

0

目前CsvParsing.lineScanner()不支持监管策略。您可以选择另一个符号作为行扫描器的引号字符CsvParsing.lineScanner(quoteChar = '\'')。然后,您将获得未闭合的双引号作为解析结果的一部分:

Map(email -> "test, country -> test, name -> test) Map(email -> test, country -> test, name -> test) Map(email -> test, country -> test, name -> test)

于 2018-11-30T06:44:05.247 回答