我已经设置了一个 Flume 服务,它可以监视 Netcat 或使用 Exec 作为源来跟踪日志,诸如此类。我使用 Memory 作为通道,并将 Avro 作为接收器(文档中指定了 Thrift,但似乎在 Flume 1.3 或 1.4 中不起作用)
我已经设置了一个 C# 套接字服务器来接收消息,并且我得到了一个字节流。如果我使用 Encoding.UTF8.GetString(buffer) 读取它们,那么我可以看到如下内容:
"\0\0\0\0\0\0\0\0\00�����Tt������5\ne\0�����Tt������5\ne\0\0appendBatch\0\0�\0�127.0.0.1 - - [12/Nov/2013:22:42:50 +0000] \"GET /docs/appdev/index.html HTTP/1.1\" 200 7645\0�127.0.0.1 - - [12/Nov/2013:22:44:07 +0000] \"GET /docs/appdev/introduction.html HTTP/1.1\" 200 8619\0�127.0.0.1 - - [12/Nov/2013:22:44:09 +0000] \"GET /docs/appdev/installation.html HTTP/1.1\" 200 9045\0�127.0.0.1 - - [12/Nov/2013:22:44:12 +0000] \"GET /docs/appdev/deployment.html HTTP/1.1\" 200 18800\0�127.0.0.1 - - [12/Nov/2013:22:49:07 +0000] \"GET /docs/appdev/source.html HTTP/1.1\" 200 24554\0�127.0.0.1 - - [12/Nov/2013:22:50:38 +0000] \"GET /docs/appdev/processes.html HTTP/1.1\" 200 30743\0�127.0.0.1 - - [12/Nov/2013:22:51:39 +0000] \"GET /docs/appdev/sample/ HTTP/1.1\" 200 1852\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /sample HTTP/1.1\" 404 963\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /favicon.ico HTTP/1.1\" 200 21630\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:23:02:13 +0000] \"GET /sample HTTP/1.1\" 404 963\0"
所以很明显我正在获取数据,但我想正确地反序列化它而不是进行某种正则表达式提取。我可以看到有一个官方的 Avro C# 库,还有一个带有反序列化库的 Microsoft Hadoop 库。我创建了一个本地对象来反序列化:
[DataContract]
public class AvroEvent
{
[DataMember]
public byte[] Body { get; set; }
}
并尝试反序列化:
client = serverSocket.EndAccept(result);
var myNetworkStream = new NetworkStream(client);
myNetworkStream.Read(buffer, 0, size);
var avro = new AvroSerializer(typeof(AvroEvent));
var deser = avro.Deserialize(myNetworkStream);
然后我得到这个错误:
System.InvalidOperationException was unhandled
HResult=-2146233079
Message=Unexpected number of bytes.
Source=Microsoft.Hadoop.Avro
我几乎肯定会以错误的方式处理这一切,而且我敢肯定人们会告诉我不要使用 C#,但我在 Google 上几乎没有资源,所以如果其他人真的有这样做并指出我正确的方向,我将不胜感激
托比