What I am trying to do

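After following the Python walkthrough, I am trying to modify the module.yaml file so that the ingress and egress use String instead of Protobuf. I have left most files unchanged. Only two differ: module.yaml, which now tries to configure a string ingress, and greeter.py, which ignores state and protobuf messages and just prints the input it receives from the ingress.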

The project structure has not changed:

$ tree statefun-walkthrough
statefun-walkthrough
├── Dockerfile
├── docker-compose.yml
├── generator
│   ├── Dockerfile
│   ├── event-generator.py
│   └── messages_pb2.py
├── greeter
│   ├── Dockerfile
│   ├── greeter.py
│   ├── messages.proto
│   ├── messages_pb2.py
│   └── requirements.txt
└── module.yaml

The configuration files and Python application used:

docker-compose.yml


version: "2.1"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka-broker:
    image: wurstmeister/kafka:2.12-2.0.1
    ports:
      - "9092:9092"
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_CREATE_TOPICS: "names:1:1,greetings:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  master: # for flink-statefun
    build:
      context: .
    expose:
      - "6123"
    ports:
      - "8081:8081"
    environment:
      - ROLE=master
      - MASTER_HOST=master
    volumes:
      - ./checkpoint-dir:/checkpoint-dir
  worker: # for flink-statefun
    build:
      context: .
    expose:
      - "6121"
      - "6122"
    depends_on:
      - master
      - kafka-broker
    links:
      - "master:master"
      - "kafka-broker:kafka-broker"
    environment:
      - ROLE=worker
      - MASTER_HOST=master
    volumes:
      - ./checkpoint-dir:/checkpoint-dir
  python-worker: # greeter application
    build:
      context: ./greeter
    expose:
      - "8000"
  event-generator: # reading and writing in kafka topic
    build:
      context: generator
      dockerfile: Dockerfile
    depends_on:
      - kafka-broker

module.yaml

version: "1.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/greeter
          spec:
            endpoint: http://python-worker:8000/statefun
            maxNumBatchRequests: 500
            timeout: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/ingress
            id: example/names
          spec:
            address: kafka-broker:9092
            consumerGroupId: my-group-id
            topics:
              - topic: names
                valueType: io.statefun.types/string
                targets:
                  - example/greeter
    egresses:
      - egress:
          meta:
            type: statefun.kafka.io/egress
            id: example/greets
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000

greeter.py

from statefun import StatefulFunctions
from statefun import RequestReplyHandler
from statefun import kafka_egress_record

functions = StatefulFunctions()


@functions.bind("example/greeter")
def greet(context, message):
    print(type(message), message)


handler = RequestReplyHandler(functions)

#
# Serve the endpoint
#

from flask import request
from flask import make_response
from flask import Flask

app = Flask(__name__)


@app.route('/statefun', methods=['POST'])
def handle():
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response


if __name__ == "__main__":
    app.run()

Error

After running docker-compose up -d --build, the Flink master stops with the following error:

2022-02-14 18:11:14,795 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StatefulFunctionsClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:256)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
        at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:99)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
        at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
        at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
        ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
        at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:107)
        at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
        at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
        ... 9 more
Caused by: java.lang.IllegalStateException: There are no routers defined.
        at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
        at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:76)
        at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 13 more

I don't know whether this exception, Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined., is the main problem, or why it occurs.
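
In a Java stack trace, the last "Caused by:" entry is the innermost exception, so one way to check which exception is the underlying one is to pull that line out of the log. The following is only a minimal sketch: the helper name root_cause and the shortened sample string are made up for illustration and are not part of the walkthrough or of StateFun.

```python
def root_cause(trace: str) -> str:
    """Return the innermost exception line of a Java stack trace.

    Hypothetical helper: the last 'Caused by:' entry is the deepest cause.
    If there is none, fall back to the first line that is not a stack frame.
    """
    prefix = "Caused by: "
    lines = [line.strip() for line in trace.splitlines()]
    causes = [line[len(prefix):] for line in lines if line.startswith(prefix)]
    if causes:
        return causes[-1]
    for line in lines:
        # Skip blank lines, "at ..." frames and "... N more" markers.
        if line and not line.startswith("at ") and not line.startswith("..."):
            return line
    return ""


# Shortened excerpt of the trace above, used only as sample input.
sample = """\
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.
        ... 9 more
Caused by: java.lang.IllegalStateException: There are no routers defined.
        at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
        ... 13 more
"""

print(root_cause(sample))
```

On the trace above this points at the java.lang.IllegalStateException raised in StatefulFunctionsUniverseValidator.validate, which suggests the ProgramInvocationException is only a wrapper around it. It does not explain why no routers are defined.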
