我使用 pyflink 1.11 有以下代码:
import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.datastream.time_characteristic import TimeCharacteristic
from pyflink.table import (
StreamTableEnvironment,
DataTypes,
EnvironmentSettings,
CsvTableSink,
)
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
from pyflink.table.window import Tumble
env_settings = (
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
execution_environment = StreamExecutionEnvironment.get_execution_environment()
execution_environment.set_parallelism(1)
execution_environment.set_stream_time_characteristic(TimeCharacteristic.EventTime)
table_env = StreamTableEnvironment.create(
execution_environment, environment_settings=env_settings
)
statement_set = table_env.create_statement_set()
KAFKA_BROKER_URL = "<URL>"
KAFKA_TOPIC = "user_events"
def get_data_from_kafka_source():
table_env.connect(
Kafka()
.version("universal")
.topic(KAFKA_TOPIC)
.start_from_earliest()
.property("bootstrap.servers", KAFKA_BROKER_URL)
).with_format(
Json()
.fail_on_missing_field(False)
.schema(
DataTypes.ROW(
[
DataTypes.FIELD("event_timestamp", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("event_uid", DataTypes.STRING()),
DataTypes.FIELD("user_id", DataTypes.STRING()),
DataTypes.FIELD("country", DataTypes.STRING()),
]
)
)
).with_schema(
Schema()
.field("event_timestamp", DataTypes.TIMESTAMP(3))
.field("event_uid", DataTypes.STRING())
.field("country", DataTypes.STRING())
.field("user_id", DataTypes.STRING())
.field("rowtime", DataTypes.TIMESTAMP(3))
.rowtime(
Rowtime()
.timestamps_from_field("event_timestamp")
.watermarks_periodic_bounded(60000)
)
).in_append_mode().create_temporary_table(
"user_events"
)
def sink_into_csv():
result_file = "/opt/examples/data/output/output_file.csv"
if os.path.exists(result_file):
os.remove(result_file)
table_env.register_table_sink(
"sink_into_csv",
CsvTableSink(
["country", "count_sessions", "last_timestamp"],
[
DataTypes.STRING(),
DataTypes.DOUBLE(),
DataTypes.TIMESTAMP(3),
],
result_file,
),
)
def run_job():
get_data_from_kafka_source()
sink_into_csv()
table_env.scan("user_events").window(
Tumble.over("5.minutes").on("rowtime").alias("w")
).group_by("country, w").select(
"country AS country, COUNT(1) AS count_sessions, w.end AS last_timestamp"
).insert_into(
"sink_into_csv"
)
table_env.execute("kafka to csv")
if __name__ == "__main__":
run_job()
但我不断收到此错误:
Traceback (most recent call last):
File "code/kafka_to_csv.py", line 227, in <module>
run_job()
File "code/kafka_to_csv.py", line 210, in run_job
"country AS country, COUNT(1) AS count_sessions, w.end AS last_timestamp"
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 907, in select
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o210.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
有人可以帮我理解我做错了什么吗?不确定它是否重要,但它event_timestamp
是"2021-11-03 20:24:46.095000"
格式。
先感谢您!