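I have a result set that arrives as a stream of Arrow record batches. I use reader.read_chunk() to fetch each batch, push the batches into a list, and then convert the whole list to a byte array that I return as the response. Here is the code: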
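import pyarrow as pa

# client, flight_info and options are created elsewhere (not shown)
def getBatchStreambytes(_):
    reader = client.do_get(flight_info.endpoints[0].ticket, options)
    print('[INFO] Reading query results from Dremio Server')
    batches = []
    while True:
        try:
            # Each chunk unpacks into a record batch and its app metadata
            batch, metadata = reader.read_chunk()
            print(batch.num_rows)
            batches.append(batch)
        except StopIteration:
            # read_chunk() raises StopIteration once the stream is exhausted
            break
    # Collect every batch into one table, then serialize it as a single IPC stream
    data = pa.Table.from_batches(batches)
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, data.schema)
    writer.write_table(data)
    writer.close()
    # print(reader.read_pandas())
    return sink.getvalue().to_pybytes()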
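Because all batches have to be collected first, the API takes longer to respond with every additional batch. How can I send the batches as the response so that the API responds batch by batch? How do I send the data in batch chunks? Here is a screenshot of the list of batches I receive from the Flight server: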