我遇到了一种情况,我们需要使用普通的 gRPC 客户端(通过grpc.aio
API)与Arrow Flight gRPC 服务器通信。
DoGet
调用确实到达了服务器,我们收到了 响应FlightData
。如果我们对FlightgRPC
定义的理解是正确的,那么响应会包含一条flatbuffers
消息,该消息可以以某种方式解码为RecordBatch
.
以下是客户端代码,
import asyncio
import pathlib
import grpc
import pyarrow as pa
import pyarrow.flight as pf
import flight_pb2, flight_pb2_grpc
async def main():
ticket = pf.Ticket("tick")
sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
stub = flight_pb2_grpc.FlightServiceStub(channel)
async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
assert type(data) is flight_pb2.FlightData
print(data)
# How to convert data into a RecordBatch?
asyncio.run(main())
目前,我们坚持解码FlightData
响应的最后一步。
问题有两个,
- 是否有一些现有的设施形式
pyarrow.flight
可以用来解码该类型的 pythongrpc
对象?FlightData
- 如果 #1 是不可能的,还有哪些其他选项可以解码内容并从头开始
FlightData
重建 a ?RecordBatch
这里的主要兴趣是使用普通gRPC客户端的 AsyncIO。据说,这对于当前版本的 Arrow Flight gRPC 客户端是不可行的。