
I'm a beginner with the PyFlink framework, and I'd like to know whether it can handle my use case.

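I need to create a tumbling window and apply a Python UDF (a scikit-learn clustering model) to it. The use case: every 30 seconds, I want to apply my UDF to the previous 30 seconds of data.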
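To make the intended semantics concrete, here is a minimal plain-Python sketch of what I mean by a 30-second tumbling window over a non-keyed stream. It is not PyFlink code: the names `window_start`, `tumbling_windows`, `apply_per_window` and the sample `events` are hypothetical, and a simple `mean` stands in for the scikit-learn model so the example stays self-contained.

```python
from collections import defaultdict

WINDOW_MS = 30_000  # 30-second tumbling window, in milliseconds


def window_start(timestamp_ms, size_ms=WINDOW_MS):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def tumbling_windows(events, size_ms=WINDOW_MS):
    """Group (timestamp_ms, value) pairs into non-overlapping windows."""
    windows = defaultdict(list)
    for timestamp_ms, value in events:
        windows[window_start(timestamp_ms, size_ms)].append(value)
    # Sort by window start so results come out in time order.
    return dict(sorted(windows.items()))


def apply_per_window(events, func, size_ms=WINDOW_MS):
    """Apply func once to the full contents of each window."""
    grouped = tumbling_windows(events, size_ms)
    return {start: func(values) for start, values in grouped.items()}


def mean(values):
    # Hypothetical stand-in for the real UDF (the clustering model).
    return sum(values) / len(values)


# Hypothetical sample data: (event time in ms, freq value).
events = [(1_000, 2.0), (29_999, 4.0), (30_000, 10.0), (61_500, 7.0)]
result = apply_per_window(events, mean)
print(result)
```

Every element falls into exactly one window, regardless of any key, and the function sees all elements of that window at once. That is the behaviour I am trying to get from the PyFlink DataStream API.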

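So far I can consume data from Kafka as a stream, but I haven't managed to create a 30-second window on a non-keyed stream with the Python API.

Do you know of any examples that match my use case? Do you know whether the PyFlink API allows this?

Here is my first attempt: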

from pyflink.common import Row
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.common import Duration

import time

from utils.selector import Selector
from utils.timestampAssigner import KafkaRowTimestampAssigner

# 1. create a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///flink-sql-connector-kafka_2.11-1.14.0.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW_NAMED(["labelId","freq","timestamp"],[Types.STRING(),Types.DOUBLE(),Types.STRING()])).build()


kafka_consumer = FlinkKafkaConsumer(
    topics='events',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092'})



# watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
#     .with_timestamp_assigner(KafkaRowTimestampAssigner())

ds = env.add_source(kafka_consumer)
ds.print()
ds = ds.windowAll()
# ds.print()

env.execute()


WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/dorian/dataScience/pyflink/pyflink_env/lib/python3.6/site-packages/pyflink/lib/flink-dist_2.11-1.14.0.jar) to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "/home/dorian/dataScience/pyflink/project/__main__.py", line 35, in <module>
    ds = ds.windowAll()
AttributeError: 'DataStream' object has no attribute 'windowAll'

Thanks
