4

我正在编写一段代码,当缓冲区(列表)增长到一定大小时,它将填充 mongoDB 集合。

import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  def add(info: DBObject = null) {
    if (info != null) buffer += info

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      mongoCol.insert(buffer.toList)
      buffer.clear
      println("adding a batch")
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }
}

但是,当我运行以下代码时,scala 偶尔会在“mongoCol.insert(buffer.toList)”行上抛出“ConcurrentModificationException”。我很确定它与“mongoCol.insert”有关。我想知道代码是否有任何根本性的错误。或者我应该使用 Akka 的“atomic {...}”之类的东西来避免这个问题。

这是完整的堆栈跟踪:

PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
    at com.mongodb.DBCollection.insert(DBCollection.java:85)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
    at PopulateDB.add(PopulateDB.scala:14)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
    at scala.actors.ReactorTask.run(ReactorTask.scala:34)
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
    at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
    at PopulateDB.resumeReceiver(PopulateDB.scala:5)
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
    at PopulateDB.searchMailbox(PopulateDB.scala:5)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.ReactorTask.run(ReactorTask.scala:36)
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

谢谢,德里克

4

2 回答 2

4

DBObject不是线程安全的;您正在发送一个 DBObject 与您的演员消息。稍后可能会再次对其进行修改,这将导致并发修改问题。

我建议首先尝试clone()在 DBObject 进入演员时使用它,然后将其放入缓冲区中。它只是一个浅拷贝,但至少应该足以在支持 DBObject 上的键的 LinkedHashMap 上引起并发修改问题(借助 LHM,它保持有序)。

我会尝试:

  def act() {
    loop {
      react {
        case info: DBObject => add(info.clone())

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }

如果这不起作用,请在将 DBObject 发送到 Actor 后查看您正在修改的其他任何地方。

于 2011-04-24T21:00:09.333 回答
1

为什么class在下面?

class PopulateDB extends Actor

你有多个PupulateDB演员吗?我希望object PopulateDB extends Actor,这样一个演员就可以集中精力完成这项任务。

除此之外,问题似乎出在 casbah 或 mongodb 本身。

于 2011-04-24T00:12:58.090 回答