我有一个独立的 Spark 在我的计算机上的虚拟机上运行。Spark Streaming 从 Kafka 获取数据,将其保存到 HBase 表中,然后对其进行处理并将结果保存到另一个表中。
Spark Batch 在处理结果表中查询最新条目,并使用其中的数据来确定要从未处理数据表中查询哪些数据。批处理作业有一个无限的 while 循环,使批处理在完成后重新启动。它和流式作业都将调度程序设置为公平。
我有一个客户端应用程序,它通过首先将生成的信息流式传输到 Kafka,然后为流式传输层启动一个单独的线程,然后在一定延迟后为批处理启动一个单独的线程,以正确的顺序运行所有这些事情。
我的问题是流运行并且没有抱怨,使用 3 个提供的内核中的 2 个,但是当批处理作业开始时,流说它正在运行,但是 HBase 表清楚地显示,虽然批处理作业正在写入他们的表,流媒体作业不写任何东西。此外,当这一切发生时,流式传输日志会暂停。
我设置要运行的线程的方式是这样的:
Runnable batch = new Runnable() {
@Override
public void run() {
try {
Lambda.startBatch(lowBoundary, highBoundary);
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread batchThread = new Thread(batch);
batchThread.start();
批处理和流式处理的启动是通过 ProcessBuilder 完成的,如下所示:
public static void startBatch(String low, String high) throws Exception {
// Specify executable path
String sparkSubmit = "/home/lambda/Spark/bin/spark-submit";
// Describe the process to be run
ProcessBuilder batch = new ProcessBuilder(sparkSubmit,
"--class", "batch.Batch", "--master",
"spark://dissertation:7077",
"/home/lambda/Downloads/Lambda/target/lambda-1.0-jar-with-dependencies.jar",
low, high);
// Start the batch layer
batch.start();
}
有谁知道为什么会这样?我怀疑这只是 Spark 没有像我希望的那样管理任务,但不知道该怎么做。