7

我正在努力使用 Python 和请求访问流式 API。

API 内容:“我们启用了流式端点,以使用持久的 HTTP 套接字连接来请求报价和交易数据。来自 API 的流式数据包括发出经过身份验证的 HTTP 请求并保持 HTTP 套接字打开以持续接收数据。”

我是如何尝试访问数据的:

s = requests.Session()
def streaming(symbols):
    url = 'https://stream.tradeking.com/v1/market/quotes.json'
    payload = {'symbols': ','.join(symbols)}
    return s.get(url, params=payload, stream=True)  
r = streaming(['AAPL', 'GOOG'])

此处的请求文档显示了两件有趣的事情: 使用生成器/迭代器来处理在数据字段中传递的分块数据。对于流数据,它建议使用如下代码:

for line in r.iter_lines():
    print(line)

两者似乎都不起作用,虽然我不知道在生成器函数中放什么,因为这个例子不清楚。使用 r.iter_lines(),我得到输出:"b'{"status":"connected"}{"status":disconnected"}'"

我可以访问标头,响应是 HTTP 200,但无法获取有效数据,或者找到有关如何在 python 中访问流式 HTTP 数据的明确示例。任何帮助,将不胜感激。API 建议使用 Jetty for Java 来保持流打开,但我不确定如何在 Python 中执行此操作。

标头:{'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}

4

3 回答 3

10

正如verbsintransit 所说,您需要解决身份验证问题,但是可以使用以下示例解决流式传输问题:

s = requests.Session()

def streaming(symbols):
    payload = {'symbols': ','.join(symbols)}
    headers = {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}
    req = requests.Request("GET",'https://stream.tradeking.com/v1/market/quotes.json',
                           headers=headers,
                           params=payload).prepare()

    resp = s.send(req, stream=True)

    for line in resp.iter_lines():
        if line:
            yield line


def read_stream():

    for line in streaming(['AAPL', 'GOOG']):
        print line


read_stream()

if line:条件是检查它是line实际消息还是只是连接保持活动状态。

于 2013-07-24T00:17:30.140 回答
3

不确定您是否发现了这一点,但 TradeKing 不会在他们的 JSON blob 之间添加换行符。因此,您必须使用 iter_content 逐字节获取它,将该字节附加到缓冲区,尝试解码缓冲区,成功清除缓冲区并生成结果对象。:(

于 2014-02-20T18:48:02.070 回答
0
import requests
from requests_oauthlib import OAuth1


def streaming(symbols):
    consumer_key     = '***'
    consumer_secret  = '***'
    access_token     = '***'
    access_secret    = '***'

    auth = OAuth1(consumer_key,
        client_secret = consumer_secret,
        resource_owner_key = access_token,
        resource_owner_secret = access_secret)
        
    payload = {'symbols': ','.join(symbols)}
    resp = requests.Session().request("GET",'https://stream.tradeking.com/v1/market/quotes.json',stream=True,auth=auth,params=payload)
    # resp.raise_for_status()
    
    for chunk in resp.iter_content(chunk_size=1):
        if chunk:
            yield chunk.decode('utf8')

#try this
for line in streaming(['AAPL', 'GOOG']):
    print(line)
于 2020-10-13T17:50:31.340 回答