1

我正在尝试使用 Apache Camel Mina 作为我的套接字服务器来接收字节流。我正在使用 Apache Camel 2.12.1,这是我的简单路线:

<route id="retriever">
  <from uri="mina2:tcp://127.0.0.1:5555?sync=false" />
  <convertBodyTo type="java.lang.String" />
  <to uri="file:temp/out" />
</route>

我可以完美地启动路由并使用 telnet 发送数据。当我使用简单的 Java 测试客户端发送数据时,我的问题就出现了:

byte[] myData = {0x34, 0x12, 0x25, 0x34};
Socket socket = new Socket("127.0.0.1", 5555);
OutputStream os = socket.getOutputStream();
os.write(myData, 0, myData.length);
os.flush();
socket.close(); 

使用此客户端时,我在任何地方都没有收到任何异常,但数据没有进入骆驼路线。我一直在尝试实现自己的编解码器,并检查 MINA 是否正在接收数据,但我不确定对于这种简单的情况是否需要特殊的编解码器。我只想检索字节数组并保存它。

所以我的问题是:我做错了什么?为什么默认的 mina2 编解码器不适用于我的场景?我是否错过了 mina 端点中的任何特殊选项来允许这样做?

谢谢!

4

1 回答 1

2

如果您的意思是 TextLineCodecFactory 那么您应该检查源代码。此编解码器的解码器使用分隔符(框架正在运行的操作系统的新行)。

TextLineCodecFactory , TextLineDecoder , LineDelimiter

检查解码器。特别检查这部分

226     private void decodeAuto(Context ctx, IoSession session, IoBuffer in, ProtocolDecoderOutput out)
227             throws CharacterCodingException, ProtocolDecoderException {
228         int matchCount = ctx.getMatchCount();
229 
230         // Try to find a match
231         int oldPos = in.position();
232         int oldLimit = in.limit();
233 
234         while (in.hasRemaining()) {
235             byte b = in.get();
236             boolean matched = false;
237 
238             switch (b) {
239             case '\r':
240                 // Might be Mac, but we don't auto-detect Mac EOL
241                 // to avoid confusion.
242                 matchCount++;
243                 break;
244 
245             case '\n':
246                 // UNIX
247                 matchCount++;
248                 matched = true;
249                 break;
250 
251             default:
252                 matchCount = 0;
253             }
254 
255             if (matched) {
256                 // Found a match.
257                 int pos = in.position();
258                 in.limit(pos);
259                 in.position(oldPos);
260 
261                 ctx.append(in);
262 
263                 in.limit(oldLimit);
264                 in.position(pos);
265 
266                 if (ctx.getOverflowPosition() == 0) {
267                     IoBuffer buf = ctx.getBuffer();
268                     buf.flip();
269                     buf.limit(buf.limit() - matchCount);
270 
271                     try {
272                         byte[] data = new byte[buf.limit()];
273                         buf.get(data);
274                         CharsetDecoder decoder = ctx.getDecoder();
275 
276                         CharBuffer buffer = decoder.decode(ByteBuffer.wrap(data));
277                         String str = new String(buffer.array());
278                         writeText(session, str, out);
279                     } finally {
280                         buf.clear();
281                     }
282                 } else {
283                     int overflowPosition = ctx.getOverflowPosition();
284                     ctx.reset();
285                     throw new RecoverableProtocolDecoderException("Line is too long: " + overflowPosition);
286                 }
287 
288                 oldPos = pos;
289                 matchCount = 0;
290             }
291         }
292 
293         // Put remainder to buf.
294         in.position(oldPos);
295         ctx.append(in);
296 
297         ctx.setMatchCount(matchCount);
298     }

而这部分

180     public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
181         Context ctx = getContext(session);
182 
183         if (LineDelimiter.AUTO.equals(delimiter)) {
184             decodeAuto(ctx, session, in, out);
185         } else {
186             decodeNormal(ctx, session, in, out);
187         }
188     }

当它填充 IoBuffer 时,它使用一个新行作为分隔符,所以如果你不添加一个,那么它将继续等待它。尚未对此进行测试,但我相信这是问题所在。只需尝试在最后发送一个带有新行的字符串。将其转换为字节,看看会发生什么。

如果要传输数据,则必须使用某种协议来设置发送器和接收器将使用的规则,以便设置和结束通信。

于 2013-12-05T21:45:53.547 回答