0

我遇到了一种情况,我们需要使用普通的 gRPC 客户端(通过grpc.aioAPI)与Arrow Flight gRPC 服务器通信。

DoGet调用确实到达了服务器,我们收到了 响应FlightData。如果我们对FlightgRPC定义的理解是正确的,那么响应会包含一条flatbuffers消息,该消息可以以某种方式解码为RecordBatch.

以下是客户端代码,

import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import flight_pb2, flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?

asyncio.run(main())

目前,我们坚持解码FlightData响应的最后一步。

问题有两个,

  1. 是否有一些现有的设施形式pyarrow.flight可以用来解码该类型的 pythongrpc对象?FlightData
  2. 如果 #1 是不可能的,还有哪些其他选项可以解码内容并从头开始FlightData重建 a ?RecordBatch

这里的主要兴趣是使用普通gRPC客户端的 AsyncIO。据说,这对于当前版本的 Arrow Flight gRPC 客户端是不可行的。

4

1 回答 1

2

pyarrow.flight 中确实没有为此公开任何实用程序。

除了其他内容之外,ArrowData 还包含 Arrow IPC 标头和正文。因此,您可以改为使用pyarrow.ipc. 这是一个例子:

import asyncio
import pathlib
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import Flight_pb2, Flight_pb2_grpc

async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
            # 4 bytes: Need IPC continuation token
            token = b'\xff\xff\xff\xff'
            # 4 bytes: message length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # This should work but is unimplemented
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())

asyncio.run(main())

服务器:

import pyarrow.flight as flight
import pyarrow as pa

class TestServer(flight.FlightServerBase):
    def do_get(self, context, ticket):
        table = pa.table([[1,2,3,4]], names=["a"])
        return flight.RecordBatchStream(table)

TestServer("grpc://localhost:1234").serve()

有一些关于异步 Flight API 的讨论,如果您想加入,请加入dev@ 邮件列表。

于 2022-01-08T02:49:14.410 回答