我们正在为 Deno 创建一个 Kafka 客户端。我们正在尝试以与 KafkaJS 相同的方式实现 Endocindg/Decoding 我们的缓冲区。我们已经取得了一定的成功,但偶尔我们会收到以下错误:
错误:未捕获(承诺中) RangeError:偏移量超出了 DataView 的范围
return new DataView(this.buffer, this.byteOffset, this.byteLength).getInt32( ^ at DataView.getInt32 () at Buffer.readInt32BE (https://deno.land/std@0.76.0/node/buffer.ts :365:72) 在 Decoder.readInt32 (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:60:31) 在 Decoder.readBytes (file :///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:146:31) 在 V0Decoder (file:///C:/Users/wesge/Desktop/ Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:13:18) 在 decodeMessage (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol /message/decoder.js:26:14) 默认 (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:49:19)在 EntryDecoder (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:91:10) 默认(file:///C:/Users/wesge/Desktop/Coding/Projects/ OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:22:23) 在 decodePartition (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/src/testConsumer.ts :137:21) [E] [daem] 应用程序崩溃 - 在开始之前等待文件更改...
错误来自这段代码的最后一行,与我们编写解码函数的方式有关。我试图弄清楚我们需要改变什么来消除这种情况,但我对编码和解码缓冲区不太熟悉。任何帮助将不胜感激!
import { Encoder } from '../protocol/encoder.js';
import request from '../protocol/request.js';
import { readAll, writeAll } from 'https://deno.land/std@0.105.0/io/util.ts';
import { Decoder } from '../protocol/decoder.js';
import MessageSetDecoder from '../protocol/messageSet/decoder.js';
import { Buffer } from 'https://deno.land/std@0.76.0/node/buffer.ts';
//type imports
import {
Broker,
IHeaders,
Message,
PartitionOffset,
ProducerBatch,
TopicMessages,
TopicOffsets,
} from '../index.d.ts';
//*****here is the actual function
export default async function func() {
//step 0 - intiate connection
const conn = await Deno.connect({
hostname: 'localhost',
port: 9093,
transport: 'tcp',
});
console.log('Connected', conn);
//step 00 - type everything
interface topicDataType {
topicData: Array<{
topic: string;
partitions: Array<{
partition: number;
firstSequence?: number;
messages: Message[];
}>;
}>;
}
//step 1 - big encode method - encodes entire message
const consumeMessage = ({
replicaId,
maxWaitTime,
minBytes,
topics,
}: any) => ({
apiKey: 1,
apiVersion: 0,
apiName: 'Fetch',
encode: async () => {
return new Encoder()
.writeInt32(replicaId)
.writeInt32(maxWaitTime)
.writeInt32(minBytes)
.writeArray(topics.map(encodeTopic));
},
});
//step 2a - topic encode method - encodes topic and partitions
const encodeTopic = ({ topic, partitions }: any) => {
return new Encoder()
.writeString(topic)
.writeArray(partitions.map(encodePartition));
};
//step 2b - partition encoder - nested in 2a
const encodePartition = ({ partition, fetchOffset, maxBytes }: any) => {
return new Encoder()
.writeInt32(partition)
.writeInt64(fetchOffset)
.writeInt32(maxBytes);
};
//step 3 - create topic data
const td = [
{
topic: 'sams-topic',
partitions: [
{
partition: 1,
fetchOffset: '0',
//maxBytes: 2048
maxBytes: 2048,
},
],
},
];
//step 4 - use big encoder to make a message
const message = consumeMessage({
replicaId: -1,
maxWaitTime: 100000,
minBytes: 1,
topics: td,
});
//step 5 - create request
const consumeRequest = await request({
correlationId: 1,
clientId: 'my-app',
request: message,
});
const decodePartition = async (decoder: any) => ({
partition: decoder.readInt32(),
errorCode: decoder.readInt16(),
highWatermark: decoder.readInt64().toString(),
messages: await MessageSetDecoder(decoder),
});
//takes in above
const decodeResponse = async (decoder: any) => ({
topicName: decoder.readString(),
partitions: await decoder.readArrayAsync(decodePartition),
});
//takes in above
const decode = async (rawData: any) => {
const decoder = new Decoder(rawData);
decoder.offset = 8;
const responses = await decoder.readArrayAsync(decodeResponse);
return {
responses,
};
};
//step 6 - send it
//**ENCODING AND SENDING */
const dcd = new TextDecoder();
const writer = await writeAll(conn, consu
meRequest.buf);
const response = new Uint8Array(512);
//**GETTING RESPONSE */
await conn.read(response);
console.log('response:', response);
/**DECODING RESPONSE */
const newBuff = await new Buffer(response);
console.log('new buff', newBuff);
const decoded = await decode(newBuff);
}
func();