4

我的 spark 应用程序中有一个从 MySQL 数据库加载数据的方法。该方法看起来像这样。

trait DataManager {

val session: SparkSession

def loadFromDatabase(input: Input): DataFrame = {
            session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
              input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
    }
}

jdbc该方法除了执行方法并从数据库加载数据之外什么都不做。我该如何测试这种方法?标准方法是创建一个对象的模拟,session它是SparkSession. 但由于SparkSession有一个私有构造函数,我无法使用 ScalaMock 模拟它。

这里的主要问题是我的函数是一个纯粹的副作用函数(副作用是从关系数据库中提取数据),鉴于我有模拟问题,我如何对这个函数进行单元测试SparkSession

那么有什么方法可以模拟SparkSession或比模拟测试这种方法更好的方法吗?

4

2 回答 2

1

在您的情况下,我建议不要模拟 SparkSession。这或多或少会模拟整个功能(无论如何您都可以这样做)。如果您想测试此功能,我的建议是运行嵌入式数据库(如H2)并使用真正的 SparkSession。为此,您需要将 SparkSession 提供给您的DataManager.

未经测试的草图:

你的代码:

class DataManager (session: SparkSession) {
         def loadFromDatabase(input: Input): DataFrame = {
            session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
            input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
         }
    }

你的测试用例:

class DataManagerTest extends FunSuite with BeforeAndAfter {
  override def beforeAll() {
    Connection conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", "");
    // your insert statements goes here
    conn.close()
  }

  test ("should load data from database") {
    val dm = DataManager(SparkSession.builder().getOrCreate())
    val input = Input(jdbcUrl = "jdbc:h2:~/test", selectQuery="SELECT whateveryounedd FROM whereeveryouputit ")
    val expectedData = dm.loadFromDatabase(input)
    assert(//expectedData)
  }
}
于 2018-03-26T07:32:19.547 回答
1

您可以使用 mockito scala 来模拟 SparkSession,如本文所示。

于 2019-11-01T01:42:16.947 回答