0

我有一个作为箭头记录批次流的结果集,我使用 reader.read_chunk() 来获取批次,我已将批次推送到批次数组并将其转换为字节数组作为响应。因为这里是代码

def getBatchStreambytes(_):
   reader = client.do_get(flight_info.endpoints[0].ticket, options)
            print('[INFO] Reading query results from Dremio Server ')
            batches = [] 
            while True:
                     try:
                            batch, metadata = reader.read_chunk()
                            print(batch.num_rows)
                            batches.append(batch)

                except Exception as exception:
                        break

        data = pa.Table.from_batches(batches)
        sink = pa.BufferOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, data.schema)
        writer.write_table(data)
        writer.close()
        #print(reader.read_pandas())

        return  sink.getvalue().to_pybytes()

由于每个批次的 API 响应需要更多时间,如何将批次作为响应发送,以便 api 以迭代批次进行响应,我如何发送批次块。这是我从飞行服务器收到的批次列表的屏幕截图 在此处输入图像描述

4

0 回答 0