我有一个 Spark 应用程序,它从文件作为 RDD 接收数据并将其发送到另一个服务(MyService)。处理方案如下所示:
object Sender {
def handle(myService: MyService) = {
val rdd = getRdd()
rdd.foreachPartition(partition => {
partition.foreach(it =>
val myData = new MyData(it)
myService.send(myData))
})
}
}
MyService 看起来像这样:
class MyService() extends Serializable {
def send(data: MyData) = {
//do something
}
}
在我的单元测试中,我尝试做这样的事情:
val myServiceMock = mock[MyService]
val data = new MyData()
Sender.handle(myServiceMock)
verify(myserviceMock).send(eqTo(data))
但是当 Spark 将数据从驱动程序传递给执行程序时,它会被序列化,实际上,它是新的 MyServiceMock 对象。我得到了通缉但没有被调用,实际上,与这个模拟的交互为零。
有没有专门的工具来测试这个案例?