我刚刚开始使用 Apache Flink (Scala API),我的问题如下:我正在尝试根据 Flink 站点的一个示例将数据从 Kafka 流式传输到 Apache Flink:
val stream =
env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))
一切正常,stream.print() 语句在屏幕上显示以下内容:
2018-05-16 10:22:44 AM|1|11|-71.16|40.27
我想使用案例类来加载数据,我尝试过使用
flatMap(p=>p.split("|"))
但它一次只拆分一个字符的数据。
基本上预期的结果是能够填充案例类的5个字段,如下所示
field(0)=2018-05-16 10:22:44 AM
field(1)=1
field(2)=11
field(3)=-71.16
field(4)=40.27
但它现在正在做:
field(0) = 2
field(1) = 0
field(3) = 1
field(4) = 8
ETC...
任何建议将不胜感激。
先感谢您
坦率