下面是我的 Node.js 代码。我有一个用于请求/响应的 API。首先,我发出一个请求 http://localhost:3000/number1,随后启动一个消费者,它从只有一个分区的 Kafka “接收”(receive)主题中消费消息,然后我尝试找到 id = number1 的消息。最后,我想把找到的值作为响应返回给用户。因此我创建了如下的消费者:
// Connection settings for the kafka-node client.
// Declared with `const`: the bare assignment created an implicit global,
// which throws in strict mode / ES modules and leaks between requests.
const options = {
  kafkaHost: 'kafka:9092',
};
const client_node = new kafka_node.KafkaClient(options);
const Consumer = kafka_node.Consumer;

// Consumer for partition 0 of 'receive.kafka.entities', with the payload
// requesting offset 0 so the partition is read from its beginning.
// NOTE(review): kafka-node's plain Consumer documents `fromOffset` as a
// boolean; the string below is only truthy — confirm against the library docs.
const consumer_node = new Consumer(
  client_node,
  [{ topic: 'receive.kafka.entities', partition: 0, offset: 0 }],
  {
    // Offsets are not committed automatically while scanning.
    autoCommit: false,
    fetchMaxWaitMs: 100,
    fromOffset: 'earliest',
    groupId: 'kafka-node-group',
    asyncPush: false,
  }
);
/**
 * Scans the messages emitted by `consumer_node` for the entity whose id
 * equals `back2` and reports the outcome exactly once.
 *
 * Each message value is expected to be JSON whose `payload` field is itself a
 * JSON string containing `fullDocument`.
 *
 * NOTE: if no 'message' event is ever emitted (e.g. an empty partition), the
 * callback is never invoked; callers that need a bound should add a timeout.
 *
 * @param {function(*): void} callback - Called once with the matching
 *   `fullDocument`, or with the sentinel string "1" when the last available
 *   message has been seen without a match.
 */
const read = (callback) => {
  const NOT_FOUND = "1";
  // Capture the emitter so the listener is removed from the same object it
  // was attached to.
  const source = consumer_node;
  let settled = false;

  // Reports the result once and detaches the listener so later messages
  // cannot trigger a second response.
  const settle = (result) => {
    if (settled) {
      return;
    }
    settled = true;
    source.removeListener('message', onMessage);
    callback(result);
  };

  // Decodes the doubly-encoded message; returns undefined when it cannot be
  // parsed so that one bad record does not leave the request hanging.
  const extractDocument = (message) => {
    try {
      const envelope = JSON.parse(message.value);
      const change = JSON.parse(envelope.payload);
      return change.fullDocument;
    } catch (err) {
      console.error(`Skipping unparsable message at offset ${message.offset}:`, err);
      return undefined;
    }
  };

  const onMessage = (message) => {
    if (settled) {
      return;
    }

    // Compare the id BEFORE the end-of-partition check; otherwise a match in
    // the very last message would be reported as "not found".
    const doc = extractDocument(message);
    if (doc !== undefined && doc !== null && doc.id === back2) {
      settle(doc);
      return;
    }

    // highWaterOffset is one past the last available offset, so reaching
    // highWaterOffset - 1 means there is nothing left to scan.
    if (message.offset >= message.highWaterOffset - 1) {
      settle(NOT_FOUND);
    }
  };

  source.on('message', onMessage);
};
// Body of the 404 response sent when no message with the requested id exists.
const error = {
  id: `The entity ${back2} not found `,
};

// Look the entity up, close the consumer, then answer the HTTP request.
// NOTE(review): kafka-node's Consumer#close appears to close the underlying
// client as well. If `client_node` is created once and reused across
// requests, every request after the first would then fail with
// "Broker not available" — confirm, and either create the client per request
// or keep the consumer open.
read((data) => {
  consumer_node.close(true, (closeErr) => {
    // The close callback's first argument is presumably an error (if any);
    // log it instead of silently dropping it, but still answer the request.
    if (closeErr) {
      console.error('Failed to close the Kafka consumer:', closeErr);
    }

    // "1" is the sentinel `read` passes when nothing matched.
    if (data !== "1") {
      res.status(200).send(data);
    } else {
      res.status(404).send(error);
    }
  });
});
如果我连续发出多个请求,那么在第一个请求之后,后续的请求会得到如下响应:
{
"message": "Broker not available (loadMetadataForTopics)"
}
我的第一个 docker-compose 文件如下:
# Compose file 1: ZooKeeper plus a single Kafka broker.
# Nesting restored; service definitions live under the `services:` root.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    container_name: stellio-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - default
      - localnet

  kafka:
    # NOTE(review): `latest` is unpinned while ZooKeeper is pinned to 5.4.1;
    # consider pinning a matching version.
    image: confluentinc/cp-enterprise-kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
      - "9101:9101"
      # The EXTERNAL listener is advertised as localhost:29092, so that port
      # must be published for clients running on the host.
      - "29092:29092"
    environment:
      # INTERNAL is for containers on the shared network (kafka:9092);
      # EXTERNAL is for clients on the Docker host (localhost:29092).
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      # Single broker, so the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on:
      - zookeeper
    networks:
      - default
      - localnet
      - my-proxy-net-kafka

networks:
  default: # this network (app2)
    driver: bridge
  # Both services join `localnet`, so it has to be declared here.
  # NOTE(review): assumed to be a plain bridge network — adjust if it is
  # defined differently elsewhere.
  localnet:
    driver: bridge
  # Pre-existing network shared with the Node.js application's compose project.
  my-proxy-net-kafka:
    external:
      name: kafka_network
我的第二个 docker-compose 文件如下:
# Compose file 2: the Node.js application.
# Nesting restored; the service definition lives under the `services:` root.
services:
  app:
    container_name: docker-node
    hostname: docker-node
    restart: always
    build: .
    command: nodemon /usr/src/app/index.js
    networks:
      - default
      # Shared network through which the app reaches the broker at kafka:9092.
      - proxynet-kafka
    ports:
      - '3000:3000'
    volumes:
      # Mount the project directory so nodemon picks up source changes.
      - .:/usr/src/app

networks:
  default:
    driver: bridge
  # Named `kafka_network`; the Kafka compose file references a network of the
  # same name as external.
  proxynet-kafka:
    name: kafka_network
为什么会这样?你能帮我解决这个问题吗?
[如果您想了解更多信息,请随时问我:)]