6

我有一个 JSON-RPC 服务,其中一个请求返回一个连续的 JSON 对象流。

IE :

{id:'1'}
{id:'2'}
//30 minutes of no data
{id:'3'}
//...

当然,没有 Content-Length,因为流是无穷无尽的。

我正在使用自定义 TStream 后代来接收和解析数据。但是在内部缓冲数据并且在收到字节TIdHttp之前不会将其传递给我。RecvBufferSize

这导致:

{id:'1'} //received
{id:'2'} //buffered by Indy but not received
//30 minutes of no data
{id:'3'} //this is where Indy commits {id:'2'} to me

显然这不会发生,因为 30 分钟前重要的消息应该在 30 分钟前传递。

我希望 Indy 做套接字所做的事情:如果有可用数据,则读取到 RecvBufferSize 或更少并立即返回。

我发现这个讨论从 2005 年开始,一些可怜的人试图向 Indy 开发人员解释这个问题,但他们不理解他。(读它;这是一个悲伤的景象)

无论如何,他通过编写自定义 IOHandler 后代来解决这个问题,但那是在 2005 年,也许今天有一些现成的解决方案?

4

4 回答 4

4

对我来说听起来像是一个WebSocket任务,因为您的连接不再是面向简单的 HTTP 问题/答案,而是一个内容流。

有关某些代码,请参阅Delphi 的 WebSocket 服务器实现。

至少有一个基于 Indy的,来自 AsmProfiler 的作者。

AFAIK websockets中有两种流:二进制和文本。从 websocket 的角度来看,我怀疑您的 JSON 流是一些文本内容。

另一种选择是使用长池或一些更旧的协议,它们对 rooter 更友好 - 当连接切换到 websockets 模式时,它不再是标准 HTTP,所以一些“明智的”数据包检查工具(在公司网络上) 可能会将其识别为安全攻击(例如 DoS),因此可能会停止连接。

于 2013-03-25T14:54:56.420 回答
2

虽然使用 TCP 流是一种选择,但最后我还是采用了编写自定义TIdIOHandlerStack后代的原始解决方案。

动机是使用 TIdHTTP 我知道什么不起作用,只需要修复它,而切换到较低级别的 TCP 意味着可能会出现新问题。

这是我正在使用的代码,我将在这里讨论关键点。

TIdStreamIoHandler必须继承自TIdIOHandlerStack.

需要重写两个函数:ReadBytesReadStream

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
  AAppend: Boolean = True): integer; virtual;
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1;
  AReadUntilDisconnect: Boolean = False); override;

两者都是修改后的 Indy 函数,可以在IdIOHandler.TIdIOHandler. 在子句ReadByteswhile必须替换为单个ReadFromSource()请求,以便TryReadBytes在一次读取最多 AByteCount 个字节后返回。

基于此,ReadStream必须处理 AByteCount (>0, <0) 和 ReadUntilDisconnect (true, false) 的所有组合以循环读取然后写入从套接字到达的流数据块。

请注意,ReadStream如果套接字中只有部分请求的数据可用,即使在此流版本中也不需要提前终止。它只需要立即将该部分写入流而不是将其缓存在 中FInputBuffer,然后阻塞并等待下一部分数据。

于 2013-04-24T07:22:35.860 回答
2

您不需要编写 IOHandler 后代,TIdTCPClient类已经可以。它公开了一个TIdIOHandler对象,该对象具有从套接字读取的方法。这些 ReadXXX 方法会一直阻塞,直到请求的数据被读取或发生超时。只要连接存在,ReadXXX 就可以循环执行,每当它接收到新的 JSON 对象时,将其传递给应用程序逻辑。

您的示例看起来像所有 JSON 对象只有一行。然而,JSON 对象可以是多行的,在这种情况下,客户端代码需要知道它们是如何分开的。


更新:在“流式”HTTP JSON Web 服务的类似 Stackoverflow 问题(针对 .Net)中,最受好评的解决方案使用较低级别的 TCP 客户端而不是 HTTP 客户端:Reading data from an open HTTP stream

于 2013-03-25T14:25:16.620 回答
0

在分块编码传输模式下传输的数据包内容之前实际上有一个长度数据。idhttp 的 IOhandler 使用这个长度数据,一包一包地读取流。最小有意义的单位是一个数据包,所以不需要从一个数据包中一个一个地读取字符,也不需要改变IOHandler的功能。唯一的问题是 idhttp 不会停止将流数据转到下一步,因为流数据无穷无尽:没有结束数据包。所以解决方案是使用 idhttp onwork 事件触发从流中读取并将流位置设置为零以避免溢出。像这样:

    //add a event handler to idhttp    
    IdHTTP.OnWork :=  IdHTTPWork;


    procedure  TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    begin
         .....
         ResponseStringStream.Position :=0; 
         s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten
         ResponseStringStream.Clear;
         ... 
    end;

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
var
 begin
    .....    
    source := RatesWorker.RatesURL+'EUR_USD';  
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream);  
 end;

然而,使用 Tstream 的自定义 write() 函数可能是这种需求的更好解决方案。

于 2016-03-01T03:15:26.523 回答