0

我们正在为 Deno 创建一个 Kafka 客户端。我们正在尝试以与 KafkaJS 相同的方式实现我们缓冲区的 Encoding/Decoding(编码/解码)。我们已经取得了一定的成功,但偶尔我们会收到以下错误:

error: Uncaught (in promise) RangeError: Offset is outside the bounds of the DataView
return new DataView(this.buffer, this.byteOffset, this.byteLength).getInt32( ^
    at DataView.getInt32 (&lt;anonymous&gt;)
    at Buffer.readInt32BE (https://deno.land/std@0.76.0/node/buffer.ts:365:72)
    at Decoder.readInt32 (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:60:31)
    at Decoder.readBytes (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:146:31)
    at V0Decoder (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:13:18)
    at decodeMessage (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:26:14)
    at default (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:49:19)
    at EntryDecoder (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:91:10)
    at default (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:22:23)
    at decodePartition (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/src/testConsumer.ts:137:21)
[E] [daem] app crashed - waiting for file changes before starting ...

错误来自这段代码的最后一行,与我们编写解码函数的方式有关。我试图弄清楚我们需要改变什么来消除这种情况,但我对编码和解码缓冲区不太熟悉。任何帮助将不胜感激!

    import { Encoder } from '../protocol/encoder.js';
    import request from '../protocol/request.js';
    import { readAll, writeAll } from 'https://deno.land/std@0.105.0/io/util.ts';
    import { Decoder } from '../protocol/decoder.js';
    import MessageSetDecoder from '../protocol/messageSet/decoder.js';
    import { Buffer } from 'https://deno.land/std@0.76.0/node/buffer.ts';

    
    //type imports
    import {
      Broker,
      IHeaders,
      Message,
      PartitionOffset,
      ProducerBatch,
      TopicMessages,
      TopicOffsets,
    } from '../index.d.ts';
    
    //*****here is the actual function
    export default async function func() {
      //step 0 - intiate connection
      const conn = await Deno.connect({
        hostname: 'localhost',
        port: 9093,
        transport: 'tcp',
      });
      console.log('Connected', conn);
    
      //step 00 - type everything
      interface topicDataType {
        topicData: Array<{
          topic: string;
          partitions: Array<{
            partition: number;
            firstSequence?: number;
            messages: Message[];
          }>;
        }>;
      }
    
      //step 1 - big encode method - encodes entire message
      const consumeMessage = ({
        replicaId,
        maxWaitTime,
        minBytes,
        topics,
      }: any) => ({
        apiKey: 1,
        apiVersion: 0,
        apiName: 'Fetch',
        encode: async () => {
          return new Encoder()
            .writeInt32(replicaId)
            .writeInt32(maxWaitTime)
            .writeInt32(minBytes)
            .writeArray(topics.map(encodeTopic));
        },
      });
    
      //step 2a - topic encode method - encodes topic and partitions
      const encodeTopic = ({ topic, partitions }: any) => {
        return new Encoder()
          .writeString(topic)
          .writeArray(partitions.map(encodePartition));
      };
    
      //step 2b - partition encoder - nested in 2a
      const encodePartition = ({ partition, fetchOffset, maxBytes }: any) => {
        return new Encoder()
          .writeInt32(partition)
          .writeInt64(fetchOffset)
          .writeInt32(maxBytes);
      };
    
      //step 3 - create topic data
      const td = [
        {
          topic: 'sams-topic',
          partitions: [
            {
              partition: 1,
              fetchOffset: '0',
              //maxBytes: 2048
              maxBytes: 2048,
            },
          ],
        },
      ];
    
      //step 4 - use big encoder to make a message
      const message = consumeMessage({
        replicaId: -1,
        maxWaitTime: 100000,
        minBytes: 1,
        topics: td,
      });
    
      //step 5 - create request
      const consumeRequest = await request({
        correlationId: 1,
        clientId: 'my-app',
        request: message,
      });
    
    
      const decodePartition = async (decoder: any) => ({
        partition: decoder.readInt32(),
        errorCode: decoder.readInt16(),
        highWatermark: decoder.readInt64().toString(),
        messages: await MessageSetDecoder(decoder),
      });
    
      //takes in above
      const decodeResponse = async (decoder: any) => ({
        topicName: decoder.readString(),
        partitions: await decoder.readArrayAsync(decodePartition),
      });
    
      //takes in above
      const decode = async (rawData: any) => {
        const decoder = new Decoder(rawData);
        decoder.offset = 8;
    
        const responses = await decoder.readArrayAsync(decodeResponse);
    
        return {
          responses,
        };
      };
    
      //step 6 - send it
      //**ENCODING AND SENDING */
    
      const dcd = new TextDecoder();


      const writer = await writeAll(conn, consu

meRequest.buf);
      const response = new Uint8Array(512);
    
      //**GETTING RESPONSE */
      await conn.read(response);
      console.log('response:', response);
    
      /**DECODING RESPONSE */
      const newBuff = await new Buffer(response);
      console.log('new buff', newBuff);
      const decoded = await decode(newBuff);
    
    }
    
    func(); // kick off the consumer immediately on module load (fire-and-forget; rejections are unhandled)
    
    
4

0 回答 0