0

我有一个 Spark 应用程序,它从文件作为 RDD 接收数据并将其发送到另一个服务(MyService)。处理方案如下所示:

object Sender {

def handle(myService: MyService) = {
    val rdd = getRdd()
    rdd.foreachPartition(partition => {
        partition.foreach(it =>
            val myData = new MyData(it)
            myService.send(myData))
        })
    }
}

MyService 看起来像这样:

class MyService() extends Serializable {
    def send(data: MyData) = {
        //do something
    }
}

在我的单元测试中,我尝试做这样的事情:

val myServiceMock = mock[MyService]
val data = new MyData()
Sender.handle(myServiceMock)
verify(myserviceMock).send(eqTo(data))

但是当 Spark 将数据从驱动程序传递给执行程序时,它会被序列化,实际上,它是新的 MyServiceMock 对象。我得到了通缉但没有被调用实际上,与这个模拟的交互为零。

有没有专门的工具来测试这个案例?

4

2 回答 2

0

这里的问题是如果你想使用一个 mocking 框架来查看是否有一些方法你必须考虑到几件事情:

  • 是的,您正在评估在驱动程序中创建的对象实例,因此评估该实例没有意义。您需要评估正在执行程序中创建的实例。

  • 为了在执行程序中进行检查,您需要将验证放在 mapPartitions 函数中。我认为这是不可能的,因为模拟框架不会完全可序列化。

  • 也许有可能将服务模拟实例声明为瞬态。它将在每个执行程序中创建一个模拟实例,以便您可以在mapPartitions函数中使用 verify 方法。

于 2021-03-29T11:55:36.763 回答
0

我解决了这个问题,如下所述。

  1. 我更改了 handle() 方法,该方法现在将分区作为参数。它看起来像这样:
object Sender {

def handle(myService: MyService, partition: Iterator[MyData]) = {
    partition.foreach(it =>
        val myData = new MyData(it)
        myService.send(myData))
}
  1. 在我的测试方法中,我做了这样的事情:
import org.mockito.ArgumentMatchersSugar.eqTo
import org.mockito.Mockito.{mock, verify, withSettings}
import mypackage.MyService

class SenderTest extends org.scalatest.FunSuite {
    test("send") {
        val testRdd = getTestRdd()
        testRdd.foreachPartition(partition => {
            val testData = new MyData()
            val myServiceMock = mock[MyService](classOf[MyService], withSettings.serializable())
            Sender.handle(myServiceMock, partition)
            verify(myServiceMock).send(eqTo(testData))
        }
    }
}

这里的关键是在我创建模拟以使其可序列化时使用 org.mockito.Mockito 中的withSettings.serializable()参数。

于 2021-04-01T13:52:15.283 回答