0

下面是我的 Node.js 代码。我有一个用于请求/响应的 API。首先,我发出请求 http://localhost:3000/number1;随后启动一个消费者,它从 kafka 主题的一个分区“receive”消费消息,然后我尝试找到 id = number1 的消息。之后,我想用这个值向用户返回响应。所以我创建了如下的消费者:

// Connection options for the kafka-node client.
// `const` added: the original assignment created an implicit global.
const options = {
    kafkaHost: 'kafka:9092'
};
const client_node = new kafka_node.KafkaClient(options);

// Plain Consumer pinned to a single topic/partition, reading from offset 0.
const Consumer = kafka_node.Consumer;
const consumer_node = new Consumer(
  client_node,
  [
    { topic: 'receive.kafka.entities', partition: 0 , offset: 0}
  ],
  {
    // Offsets are never committed: every run re-reads the partition from the start.
    autoCommit: false,
    fetchMaxWaitMs: 100,
    // NOTE(review): for the plain Consumer, `fromOffset` is documented as a
    // boolean; the string 'earliest' belongs to ConsumerGroup options — confirm
    // against the kafka-node version in use.
    fromOffset: 'earliest' ,
    groupId: 'kafka-node-group',
    asyncPush: false,
   }
);


/**
 * Scans the pinned partition for the entity whose id matches `back2`
 * (captured from the enclosing request scope) and invokes `callback`
 * exactly once: with the matching fullDocument, or with the sentinel
 * string "1" when the end of the partition is reached without a match.
 *
 * Fixes over the original:
 *  - The id-match is now evaluated BEFORE the end-of-partition check;
 *    previously a match carried by the last message was discarded and the
 *    callback fired with the stale sentinel.
 *  - A `done` guard prevents the 'message' handler (which keeps firing)
 *    from invoking the callback more than once.
 *  - Dropped the pointless `async` on a handler that never awaits.
 *
 * @param {(result: object|string) => void} callback - receives the match or "1".
 */
const read = (callback) => {
    let result = "1"; // sentinel: "1" means "not found yet"
    let done = false;
    consumer_node.on('message', function (message) {
        if (done) return; // callback already delivered; ignore further messages

        // Message value is double-encoded JSON: envelope -> payload -> fullDocument.
        const envelope = JSON.parse(message.value);
        const payload = JSON.parse(envelope.payload);
        const entity = payload.fullDocument;

        // Check the current message for a match first, so a hit in the final
        // message of the partition is not lost.
        if (entity.id === back2) {
            result = entity;
        }

        // highWaterOffset is one past the last message; reaching it means the
        // whole partition has been scanned.
        const lastOffset = message.highWaterOffset - 1;
        if (message.offset >= lastOffset || result !== "1") {
            done = true;
            return callback(result);
        }
    });
};

// 404 payload returned when the requested entity never appears on the topic.
// `const` added (never reassigned); trailing spaces in the message preserved
// byte-for-byte from the original.
const error = {
   id: "The entity " + back2 + " not found "
 };

read((data) => {
  // NOTE(review): closing the consumer also tears down the shared KafkaClient,
  // which is module-level here — presumably this is why the SECOND request
  // fails with "Broker not available (loadMetadataForTopics)". Creating the
  // client/consumer per request (or never closing the shared one) should fix
  // it — confirm against the request flow.
  consumer_node.close(true, function (message) {
    // `data` is either the found fullDocument object or the "1" sentinel;
    // strict comparison is behavior-identical for those two values.
    if (data !== "1") {
      res.status(200).send(data);
    } else {
      res.status(404).send(error);
    }
  });
});

如果我尝试提出一个连续的请求,在第一个请求之后我会得到一个响应:

{
    "message": "Broker not available (loadMetadataForTopics)"
}

我的 docker-compose 文件 1 如下:

# Single-node zookeeper for the kafka broker below.
zookeeper:
      image: confluentinc/cp-zookeeper:5.4.1
      container_name: stellio-zookeeper
      ports:
        - 2181:2181
      environment:
        ZOOKEEPER_SERVER_ID: 1
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
      networks:
        - default
        - localnet

# Single kafka broker. The Node app reaches it as kafka:9092 over the shared
# external network (kafka_network / my-proxy-net-kafka).
kafka:
      image: confluentinc/cp-enterprise-kafka:latest
      container_name: kafka
      ports:
        - 9092:9092
        - 9101:9101
      environment:
        # NOTE(review): EXTERNAL advertises localhost:29092, but port 29092 is
        # not in the published ports above and there is no KAFKA_LISTENERS entry
        # binding an EXTERNAL listener — host-side clients using 29092 would not
        # connect. Verify whether the EXTERNAL listener is actually needed.
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_BROKER_ID: 1
        KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
        # Replication factor 1 is only valid for a single-broker setup like this.
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
        KAFKA_JMX_PORT: 9101
        KAFKA_JMX_HOSTNAME: localhost
        CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
        CONFLUENT_METRICS_ENABLE: 'true'
        CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' 
        KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 
      depends_on:
        - zookeeper
      networks:
        - default
        - localnet
        - my-proxy-net-kafka

networks:
    default: # this network (app2)
        driver: bridge
    # Pre-existing network shared with the app's compose file (file 2 below).
    my-proxy-net-kafka:
        external:
            name: kafka_network

docker-compose 文件 2 如下:

# The Node.js API container; source is bind-mounted and run under nodemon.
app:
        container_name: docker-node
        hostname: docker-node
        restart: always
        build: .
        command: nodemon /usr/src/app/index.js
        networks:
            - default
            - proxynet-kafka
        ports:
            - '3000:3000'
        volumes:
            - .:/usr/src/app

networks:
    default:
        driver: bridge
    # Creates the kafka_network that the kafka compose file attaches to as
    # an external network — this file must be brought up first.
    proxynet-kafka:
        name: kafka_network

为什么会这样?你能帮我解决这个问题吗?

[如果您想了解更多信息,请随时问我:)]

4

0 回答 0