我想要做什么
一旦按照python 演练,我正在尝试修改module.yaml
文件,因此入口和出口不是Protobuf而是String。我并没有真正修改大多数文件,只是module.yaml
尝试配置字符串入口并且greeter.py
不考虑状态或 protobuf 消息,只打印从入口接收的输入。
项目的架构没有改变:
$ tree statefun-walkthrough
statefun-walkthrough
├── Dockerfile
├── docker-compose.yml
├── generator
│ ├── Dockerfile
│ ├── event-generator.py
│ └── messages_pb2.py
├── greeter
│ ├── Dockerfile
│ ├── greeter.py
│ ├── messages.proto
│ ├── messages_pb2.py
│ └── requirements.txt
└── module.yaml
使用的配置文件和python应用程序:
码头工人-compose.yml
version: "2.1"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka-broker:
image: wurstmeister/kafka:2.12-2.0.1
ports:
- "9092:9092"
environment:
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
KAFKA_CREATE_TOPICS: "names:1:1,greetings:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
master: # for flink-statefun
build:
context: .
expose:
- "6123"
ports:
- "8081:8081"
environment:
- ROLE=master
- MASTER_HOST=master
volumes:
- ./checkpoint-dir:/checkpoint-dir
worker: # for flink-statefun
build:
context: .
expose:
- "6121"
- "6122"
depends_on:
- master
- kafka-broker
links:
- "master:master"
- "kafka-broker:kafka-broker"
environment:
- ROLE=worker
- MASTER_HOST=master
volumes:
- ./checkpoint-dir:/checkpoint-dir
python-worker: # greeter application
build:
context: ./greeter
expose:
- "8000"
event-generator: # reading and writting in kafka topic
build:
context: generator
dockerfile: Dockerfile
depends_on:
- kafka-broker
模块.yaml
version: "1.0"
module:
meta:
type: remote
spec:
functions:
- function:
meta:
kind: http
type: example/greeter
spec:
endpoint: http://python-worker:8000/statefun
maxNumBatchRequests: 500
timeout: 2min
ingresses:
- ingress:
meta:
type: statefun.kafka.io/ingress
id: example/names
spec:
address: kafka-broker:9092
consumerGroupId: my-group-id
topics:
- topic: names
valueType: io.statefun.types/string
targets:
- example/greeter
egresses:
- egress:
meta:
type: statefun.kafka.io/egress
id: example/greets
spec:
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
transactionTimeoutMillis: 100000
迎宾员.py
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
from statefun import kafka_egress_record
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, message):
print(type(message), message)
handler = RequestReplyHandler(functions)
#
# Serve the endpoint
#
from flask import request
from flask import make_response
from flask import Flask
app = Flask(__name__)
@app.route('/statefun', methods=['POST'])
def handle():
response_data = handler(request.data)
response = make_response(response_data)
response.headers.set('Content-Type', 'application/octet-stream')
return response
if __name__ == "__main__":
app.run()
错误
运行docker-compose up -d --build
flink master后停止并出现下一个错误:
2022-02-14 18:11:14,795 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StatefulFunctionsClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:256)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:99)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:107)
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
... 9 more
Caused by: java.lang.IllegalStateException: There are no routers defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:76)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 13 more
我不知道这个异常Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.
是否是主要问题以及为什么会发生。