我正在使用简单的 requests.post() 模块连接到服务器并接收数据
ack = requests.post('<ip>/get_data, data=data, timeout=10.0, verify=False)
下面是服务器中的 get_data() 方法,它使用 zmq 轮询器接收数据,直到获得最终响应。
def get_data():
req = json.loads(self.params.get('data'))
wire = wiring.Wire("indexing_data_pool", zmq_context=g.zmq_context)
try:
close_immediately = True
poll_agent = zmq.Poller()
poll_agent.register(wire.socket, zmq.POLLIN)
wire.send(req)
iteration = 1
while True:
socks = dict(poll_agent.poll())
if wire.socket in socks and socks[wire.socket] == zmq.POLLIN:
res = gevent.with_timeout(10, wire.recv, timeout_value=None)
if res.get('final'):
log.warn('Last Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
break
else:
log.warn('Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
next_req = {'search_id': req['search_id'], 'seen_version':res.get('response_version')}
wire.send(next_req)
iteration +=1
finally:
wire.close(immediate=close_immediately)
poll_agent.unregister(wire.socket)
在服务器端,获得的响应记录如下:
2017-08-28_07:17:55.43370 WARNING: Iteration: 1, Length of Rows: 100
2017-08-28_07:17:55.44269 WARNING: Iteration: 2, Length of Rows: 100
2017-08-28_07:17:55.44894 WARNING: Iteration: 3, Length of Rows: 100
2017-08-28_07:17:55.45742 WARNING: Iteration: 4, Length of Rows: 100
2017-08-28_07:17:55.46327 WARNING: Iteration: 5, Length of Rows: 100
2017-08-28_07:17:55.46687 WARNING: Iteration: 6, Length of Rows: 100
2017-08-28_07:17:55.47074 WARNING: Iteration: 7, Length of Rows: 100
2017-08-28_07:17:55.47658 WARNING: Iteration: 8, Length of Rows: 100
2017-08-28_07:17:55.48385 WARNING: Last Iteration: 9, Length of Rows: 75
所以,我假设我在服务器端实现的 zmq poller 工作得很好。但是,我很想知道如何将这 9 次数据迭代发回给请求的客户端?
PS我想在客户端连续接收数据。您可能会建议在某处附加每批响应并将最终响应发送回客户端。当响应太大时这是不可行的(请求客户端会超时)