0

我刚刚开始使用 Apache Flink (Scala API),我的问题如下:我正在尝试根据 Flink 站点的一个示例将数据从 Kafka 流式传输到 Apache Flink:

val stream =
  env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))

一切正常,stream.print() 语句在屏幕上显示以下内容:

2018-05-16 10:22:44 AM|1|11|-71.16|40.27

我想使用案例类来加载数据,我尝试过使用

flatMap(p=>p.split("|")) 

但它一次只拆分一个字符的数据。

基本上预期的结果是能够填充案例类的5个字段,如下所示

  field(0)=2018-05-16 10:22:44 AM
  field(1)=1
  field(2)=11
  field(3)=-71.16 
  field(4)=40.27

但它现在正在做:

   field(0) = 2
   field(1) = 0
   field(3) = 1
   field(4) = 8 

ETC...

任何建议将不胜感激。

先感谢您

坦率

4

1 回答 1

3

问题在于String.split. 如果你用 a 调用它String,那么该方法期望它是一个正则表达式。因此,p.split("\\|")这将是您输入数据的正确正则表达式。或者,您也可以调用split指定分隔符的变体p.split('|')。两种解决方案都应该给你想要的结果。

于 2016-05-17T13:53:42.220 回答