我们有一个Play 2.2.1应用程序,它使用插件jongo来管理 MongoDB 访问和对象映射。这个插件使用一个名为bson4jackson的库来为 Jackson JSON 处理器添加对 BSON 的支持。
我们可以处理数百 MB 的 UTF8 数据,但随机地,我们有一个永不结束的线程,并且占用了 CPU 的一个线程的 100%。使用相同的数据集,错误可能发生也可能不发生。
这是线程的调用堆栈:
application-akka.actor.default-dispatcher-12 [RUNNABLE]
de.undercouch.bson4jackson.io.DynamicOutputBuffer.putUTF8(int, String)
de.undercouch.bson4jackson.io.DynamicOutputBuffer.putUTF8(String)
de.undercouch.bson4jackson.BsonGenerator._writeCString(String)
de.undercouch.bson4jackson.BsonGenerator._writeString(String)
de.undercouch.bson4jackson.BsonGenerator.writeString(String)
com.fasterxml.jackson.databind.ObjectWriter.writeValue(OutputStream, Object)
org.jongo.marshall.jackson.JacksonEngine.marshall(Object)
org.jongo.Insert.marshallDocument(Object)
org.jongo.Insert.createDBObjectToInsert(Object)
org.jongo.Insert.save(Object)
org.jongo.MongoCollection.save(Object)
models.audits.PageEntityCollection.save(PageEntity)
services.audit.PageBean.finish()
services.audit.PageBean.auditChangeState(AuditState)
services.audit.dispatcher.SchedulerAPI.schedule(PageBean)
services.audit.dispatcher.PageAuditManager.startAudit(String, String, String, String, String)
controllers.HARReceiver.launchAudit(JsonNode, String)
controllers.HARReceiver$1.run()
akka.dispatch.TaskInvocation.run()
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
scala.concurrent.forkjoin.ForkJoinTask.doExec()
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinTask)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool$WorkQueue)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
该方法的代码如下: 链接到类
public int putUTF8(int pos, String s) {
ByteBuffer minibb = null;
CharsetEncoder enc = getUTF8Encoder();
CharBuffer in = CharBuffer.wrap(s);
int pos2 = pos;
ByteBuffer bb = getBuffer(pos2);
int index = pos2 % _bufferSize;
bb.position(index);
while (in.remaining() > 0) {
CoderResult res = enc.encode(in, bb, true);
//flush minibb first
if (bb == minibb) {
bb.flip();
while (bb.remaining() > 0) {
putByte(pos2, bb.get());
++pos2;
}
} else {
pos2 += bb.position() - index;
}
if (res.isOverflow()) {
if (bb.remaining() > 0) {
//exceeded buffer boundaries; write to a small temporary buffer
if (minibb == null) {
minibb = ByteBuffer.allocate(4);
}
minibb.rewind();
bb = minibb;
index = 0;
} else {
bb = getBuffer(pos2);
index = pos2 % _bufferSize;
bb.position(index);
}
} else if (res.isError()) {
try {
res.throwException();
} catch (CharacterCodingException e) {
throw new RuntimeException("Could not encode string", e);
}
}
}
adaptSize(pos2);
return pos2 - pos;
}
我们发现线程一直在运行并停留在线程上,while (in.remaining() > 0)
因为我们看到了对方法的重复调用encode()
。
我们真的不明白为什么会这样。我们的团队在 I/O 方面没有主要技能,我们很乐意得到问题出在哪里的提示,或者调试方法。