I can't find a PyFlink function corresponding to max or max_by

**This is my message**
Java

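import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SensorReading is my own POJO with the fields id, timestamp and temperature (not shown here).
public class TransformTest2_Rolling {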
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "/Users/yinbaidong/个人学习/Flink_t/src/main/resources/wudu.txt";
        DataStream<String> dataSource = env.readTextFile(inputPath);
        // Parse each CSV line into a SensorReading(id, timestamp, temperature)
        DataStream<SensorReading> datastream = dataSource.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Group by sensor id, then keep a rolling maximum of the temperature per key
        KeyedStream<SensorReading, Tuple> keyStream = datastream.keyBy("id");
        DataStream<SensorReading> out = keyStream.max("temperature");
        out.print();
        env.execute();

    }
}

Java result

5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=35.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=35.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=39.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
2> SensorReading{id=sensor_2',timestamp=1547718499,temperature=36.8}
5> SensorReading{id=sensor_1',timestamp=1547718199,temperature=41.8}

I want to do the same thing in Python

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode


def f_map():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)
    ds = env.read_text_file("/Users/yinbaidong/PycharmProjects/pythonProject2/wudu.txt")
    new_ds = ds.map(lambda i: {"id": i.split(",")[0], "timestamp": i.split(",")[1], "temperature": i.split(",")[2]}).key_by(
        lambda i: i.get("id")).max("temperature")
    new_ds.print()
    env.execute()

if __name__ == '__main__':
    f_map()

Python error message

Traceback (most recent call last):
  File "/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py", line 15, in <module>
    f_map()
  File "/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py", line 10, in f_map
    lambda i: i.get("id")).max("temperature")
AttributeError: 'KeyedStream' object has no attribute 'max'

This is my test data, wudu.txt

sensor_1,1547718199,35.8
sensor_2,1547718299,35.8
sensor_2,1547718399,34.8
sensor_2,1547718499,35.8
sensor_2,1547718599,36.8
sensor_1,1547718699,41.8
sensor_1,1547718799,39.8

Please help.
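
For reference, one possible workaround is sketched here. It assumes the installed PyFlink version exposes `reduce` on `KeyedStream` but not `max` or `max_by`, which is what the AttributeError suggests. Newer releases may provide these aggregations directly, so that is worth checking first. The helper names `parse_line`, `rolling_max`, `rolling_max_by`, `simulate_keyed_reduce` and `run_job` are hypothetical and not part of PyFlink. `rolling_max` is meant to approximate what Java's `max("temperature")` appears to do (update only the aggregated field), and `rolling_max_by` to approximate `maxBy` (keep the whole record); both are assumptions about the Java semantics, not a confirmed equivalence.

```python
def parse_line(line):
    """Turn 'sensor_1,1547718199,35.8' into a dict with typed fields."""
    sensor_id, timestamp, temperature = line.strip().split(",")
    return {"id": sensor_id, "timestamp": int(timestamp), "temperature": float(temperature)}


def rolling_max(acc, new):
    """Assumed to mirror Java max("temperature"): only the temperature field is updated."""
    return {**acc, "temperature": max(acc["temperature"], new["temperature"])}


def rolling_max_by(acc, new):
    """Assumed to mirror maxBy: keep the whole record holding the largest temperature."""
    return new if new["temperature"] > acc["temperature"] else acc


def simulate_keyed_reduce(lines, reducer):
    """Hypothetical local helper: emulates a keyed rolling reduce without Flink."""
    state, emitted = {}, []
    for record in map(parse_line, lines):
        key = record["id"]
        # First record for a key is emitted as-is; later ones are folded into the state.
        state[key] = reducer(state[key], record) if key in state else record
        emitted.append(state[key])
    return emitted


def run_job(path):
    # Imported lazily so the functions above can be tried without a Flink installation.
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.read_text_file(path)
    # reduce() stands in for the missing max(); pass rolling_max_by for maxBy-like behaviour.
    ds.map(parse_line).key_by(lambda r: r["id"]).reduce(rolling_max).print()
    env.execute()
```

Calling `run_job` with the path to wudu.txt would run the pipeline, while `simulate_keyed_reduce` allows the two reducers to be checked locally against the test data before submitting a job. Parsing the temperature as a float matters here, because comparing the raw strings would order them lexicographically rather than numerically.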
