- I can't find the corresponding `max` or `max_by` function
**This is my code**
Java
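import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SensorReading is my own POJO with the fields id, timestamp and temperature.
public class TransformTest2_Rolling {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "/Users/yinbaidong/个人学习/Flink_t/src/main/resources/wudu.txt";
        DataStream<String> dataSource = env.readTextFile(inputPath);
        // Parse each CSV line into a SensorReading
        DataStream<SensorReading> dataStream = dataSource.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Group by sensor id
        KeyedStream<SensorReading, Tuple> keyStream = dataStream.keyBy("id");
        // Rolling maximum of the temperature field per key
        DataStream<SensorReading> out = keyStream.max("temperature");
        out.print();
        env.execute();
    }
}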
Java output
5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=35.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=35.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=39.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=41.8}
I want to do the same thing in Python:
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode


def f_map():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)
    ds = env.read_text_file("/Users/yinbaidong/PycharmProjects/pythonProject2/wudu.txt")
    new_ds = ds.map(lambda i: {"id": i.split(",")[0], "timestamp": i.split(",")[1], "temperature": i.split(",")[2]}).key_by(
        lambda i: i.get("id")).max("temperature")
    new_ds.print()
    env.execute()

if __name__ == '__main__':
    f_map()
This is the error message from Python:
Traceback (most recent call last):
  File "/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py", line 15, in <module>
    f_map()
  File "/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py", line 10, in f_map
    lambda i: i.get("id")).max("temperature")
AttributeError: 'KeyedStream' object has no attribute 'max'
This is my test data, wudu.txt:
sensor_1,1547718199,35.8
sensor_2,1547718299,35.8
sensor_2,1547718399,34.8
sensor_2,1547718499,35.8
sensor_2,1547718599,36.8
sensor_1,1547718699,41.8
sensor_1,1547718799,39.8
Please help.
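One possible direction, sketched here rather than confirmed against any particular PyFlink release: if `KeyedStream` has no `max` in the installed version, a rolling maximum can probably be expressed with `KeyedStream.reduce`. The block below runs in plain Python so the reducer logic can be checked without a cluster. `parse_line`, `rolling_max`, `rolling_max_by` and `simulate_keyed_reduce` are hypothetical helper names, not PyFlink APIs. It assumes `temperature` is converted to `float`, since the `map` in the question leaves it as a string and strings would compare lexicographically. Judging by the Java output above, `max("temperature")` appears to update only the aggregated field and keep the other fields of the first record for each key, so `rolling_max` mimics that, while `rolling_max_by` keeps the whole record that holds the maximum.

```python
# Sketch: rolling maximum per key written as plain reduce functions.
# Assumption: records are dicts like the ones built in the question,
# but with numeric fields converted so comparisons are numeric.

def parse_line(line):
    """Turn 'sensor_1,1547718199,35.8' into a dict with typed fields."""
    sensor_id, timestamp, temperature = line.split(",")
    return {"id": sensor_id, "timestamp": int(timestamp), "temperature": float(temperature)}


def rolling_max(acc, new):
    """Keep the accumulated record's other fields, update only the temperature."""
    return {**acc, "temperature": max(acc["temperature"], new["temperature"])}


def rolling_max_by(acc, new):
    """Keep the whole record that carries the larger temperature."""
    return new if new["temperature"] > acc["temperature"] else acc


def simulate_keyed_reduce(records, key_fn, reduce_fn):
    """Plain-Python stand-in for key_by(...).reduce(...).

    Emits the running result for every input record, tracked per key.
    """
    state = {}
    out = []
    for rec in records:
        key = key_fn(rec)
        state[key] = rec if key not in state else reduce_fn(state[key], rec)
        out.append(state[key])
    return out


# The test data from wudu.txt in the question.
LINES = [
    "sensor_1,1547718199,35.8",
    "sensor_2,1547718299,35.8",
    "sensor_2,1547718399,34.8",
    "sensor_2,1547718499,35.8",
    "sensor_2,1547718599,36.8",
    "sensor_1,1547718699,41.8",
    "sensor_1,1547718799,39.8",
]

records = [parse_line(line) for line in LINES]
results = simulate_keyed_reduce(records, lambda r: r["id"], rolling_max)
results_by = simulate_keyed_reduce(records, lambda r: r["id"], rolling_max_by)

for r in results:
    print(r)

# In PyFlink the same reducer would be wired in roughly like this (untested here):
#   ds.map(parse_line).key_by(lambda r: r["id"]).reduce(rolling_max).print()
```

Swapping `rolling_max` for `rolling_max_by` in the `reduce` call would give the behaviour of keeping the full record with the highest temperature instead of only updating the field.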