1

我知道我做错了什么,mutable.ListBuffer但我不知道如何解决它(以及对问题的正确解释)。

我简化了下面的代码以重现该行为。

我基本上是在尝试并行运行函数以在处理我的第一个列表时将元素添加到列表中。我最终“失去”了元素。

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}


import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global


object MyTestObject {

  var listBufferOfInts = new ListBuffer[Int]() // files that are processed

  def runFunction(): Int = {
    listBufferOfInts = new ListBuffer[Int]()
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts ++= List(elem)
  }
}

MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()

返回:

res0: Int = 937
res1: Int = 992
res2: Int = 997

显然,我希望1000一直返回。如何修复我的代码以保持“架构”但让我的 ListBuffer“同步”?

4

3 回答 3

4

我不知道您所说的简化了它的确切问题是什么,但是您仍然有明显的竞争条件,多个线程修改了一个可变集合,这非常糟糕。正如其他答案指出的那样,您需要一些锁定,以便只有一个线程可以同时修改集合。如果您的计算量很大,则以同步方式将结果附加到缓冲区不应显着影响性能,但如有疑问,请始终进行测量。

但是不需要同步,您可以做其他事情,而不需要 vars 和可变状态。让每个Future返回您的部分结果,然后将它们合并到一个列表中,实际上Future.traverse就是这样做的。

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

def runFunction: Int = {
  val inputListOfInts = 1 to 1000

  val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
    Future { 
      // some heavy calculations on i
      i * 4
    }
  }

  val listOfInts = Await.result(fut, Duration.Inf)
  listOfInts.size
}

Future.traverse已经为您提供了一个包含所有结果的不可变列表,无需将它们附加到可变缓冲区。不用说,你总会1000回来的。

@ List.fill(10000)(runFunction).exists(_ != 1000) 
res18: Boolean = false
于 2016-05-19T08:36:48.837 回答
1

我不确定上面是否显示了您正在尝试正确执行的操作。也许问题在于您实际上正在共享一个您在 runFunction 中重新初始化的 var ListBuffer。

当我把它拿出来时,我收集了所有我期望正确的事件:

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }

import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global

object BrokenTestObject extends App {

  var listBufferOfInts = ( new ListBuffer[Int]() )

  def runFunction(): Int = {
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts.append( elem )
  }

  BrokenTestObject.runFunction()
  BrokenTestObject.runFunction()
  BrokenTestObject.runFunction()

  println(s"collected ${listBufferOfInts.length} elements")
}

如果您确实有同步问题,您可以使用以下内容:

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }

import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global

class WrappedListBuffer(val lb: ListBuffer[Int]) {
  def append(i: Int) {
    this.synchronized {
      lb.append(i)
    }
  }
}

object MyTestObject extends App {

  var listBufferOfInts = new WrappedListBuffer( new ListBuffer[Int]() )

  def runFunction(): Int = {
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.lb.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts.append( elem )
  }

  MyTestObject.runFunction()
  MyTestObject.runFunction()
  MyTestObject.runFunction()

  println(s"collected ${listBufferOfInts.lb.size} elements")
}
于 2016-05-19T00:51:04.497 回答
0

改变

listBufferOfInts ++= List(elem)

synchronized {
    listBufferOfInts ++= List(elem)
}

让它起作用。可能会成为性能问题?我仍然对解释感兴趣,也许是更好的做事方式!

于 2016-05-19T00:46:44.123 回答