1

我们有一个Play 2.2.1应用程序,它使用插件jongo来管理 MongoDB 访问和对象映射。这个插件使用一个名为bson4jackson的库来为 Jackson JSON 处理器添加对 BSON 的支持。

我们可以处理数百 MB 的 UTF8 数据,但随机地,我们有一个永不结束的线程,并且占用了 CPU 的一个线程的 100%。使用相同的数据集,错误可能发生也可能不发生。

这是线程的调用堆栈:

application-akka.actor.default-dispatcher-12 [RUNNABLE]
de.undercouch.bson4jackson.io.DynamicOutputBuffer.putUTF8(int, String)
de.undercouch.bson4jackson.io.DynamicOutputBuffer.putUTF8(String)
de.undercouch.bson4jackson.BsonGenerator._writeCString(String)
de.undercouch.bson4jackson.BsonGenerator._writeString(String)
de.undercouch.bson4jackson.BsonGenerator.writeString(String)
com.fasterxml.jackson.databind.ObjectWriter.writeValue(OutputStream, Object)
org.jongo.marshall.jackson.JacksonEngine.marshall(Object)
org.jongo.Insert.marshallDocument(Object)
org.jongo.Insert.createDBObjectToInsert(Object)
org.jongo.Insert.save(Object)
org.jongo.MongoCollection.save(Object)
models.audits.PageEntityCollection.save(PageEntity)
services.audit.PageBean.finish()
services.audit.PageBean.auditChangeState(AuditState)
services.audit.dispatcher.SchedulerAPI.schedule(PageBean)
services.audit.dispatcher.PageAuditManager.startAudit(String, String, String, String,    String)
controllers.HARReceiver.launchAudit(JsonNode, String)
controllers.HARReceiver$1.run()
akka.dispatch.TaskInvocation.run()
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
scala.concurrent.forkjoin.ForkJoinTask.doExec()
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinTask)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool$WorkQueue)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run()

该方法的代码如下: 链接到类

    public int putUTF8(int pos, String s) {
            ByteBuffer minibb = null;

            CharsetEncoder enc = getUTF8Encoder();
            CharBuffer in = CharBuffer.wrap(s);

            int pos2 = pos;
            ByteBuffer bb = getBuffer(pos2);
            int index = pos2 % _bufferSize;
            bb.position(index);

            while (in.remaining() > 0) {
                    CoderResult res = enc.encode(in, bb, true);

                    //flush minibb first
                    if (bb == minibb) {
                            bb.flip();
                            while (bb.remaining() > 0) {
                                    putByte(pos2, bb.get());
                                    ++pos2;
                            }
                    } else {
                            pos2 += bb.position() - index;
                    }

                    if (res.isOverflow()) {
                            if (bb.remaining() > 0) {
                                    //exceeded buffer boundaries; write to a small temporary buffer
                                    if (minibb == null) {
                                            minibb = ByteBuffer.allocate(4);
                                    }
                                    minibb.rewind();
                                    bb = minibb;
                                    index = 0;
                            } else {
                                    bb = getBuffer(pos2);
                                    index = pos2 % _bufferSize;
                                    bb.position(index);
                            }
                    } else if (res.isError()) {
                            try {
                                    res.throwException();
                            } catch (CharacterCodingException e) {
                                    throw new RuntimeException("Could not encode string", e);
                            }
                    }
            }

            adaptSize(pos2);
            return pos2 - pos;
    }

我们发现线程一直在运行并停留在线程上,while (in.remaining() > 0)因为我们看到了对方法的重复调用encode()

我们真的不明白为什么会这样。我们的团队在 I/O 方面没有主要技能,我们很乐意得到问题出在哪里的提示,或者调试方法。

4

0 回答 0