1

编辑:这基本上是一个“如何在 Java 中正确实现数据流引擎”的问题,我觉得这不能用一个答案来充分回答(就像问“如何正确实现 ORM 层”并让某人写出Hibernate的细节或其他东西),所以认为这个问题“关闭”。

有没有一种优雅的方式来为 Java 中的动态数据流建模?通过数据流,我的意思是有各种类型的任务,这些任务可以任意“连接”,这样当一个任务完成时,后继任务将使用完成的任务输出作为输入并行执行,或者当多个任务完成时,它们的输出在后续任务中聚合(请参阅基于流的编程)。通过动态,我的意思是任务完成时后继任务的类型和数量取决于该已完成任务的输出,因此例如,如果任务 A 具有特定输出,则可能会生成任务 B,但如果具有特定输出,则可能会生成任务 C不同的输出。另一种说法是,每个任务(或一组任务)负责确定下一个任务是什么。

用于渲染网页的示例数据流:我有任务类型:文件下载器、HTML/CSS 渲染器、HTML 解析器/DOM 构建器、图像渲染器、JavaScript 解析器、JavaScript 解释器。

  • HTML 文件的文件下载器任务
    • HTML 解析器/DOM 构建器任务
      • 每个嵌入文件/链接的文件下载器任务
        • 如果是图像,图像渲染器
        • 如果是外部 JavaScript,JavaScript 解析器
          • JavaScript 解释器
        • 否则,只需存储在 HTML 解析器任务中的某个 var/field 中
      • 每个嵌入式脚本的 JavaScript 解析器
        • JavaScript 解释器
      • 等待上述任务完成,然后是 HTML/CSS 渲染器(显然不是最佳或完全正确,但这很简单)

我并不是说解决方案需要一些全面的框架(事实上,越接近 JDK API 越好),我绝对不想要像 Spring Web Flow 或一些声明性标记或其他 DSL 这样重量级的东西.

更具体地说,我试图想出一种在 Java 中使用 Callables、Executors、ExecutorCompletionServices 以及可能还有各种同步器类(如 Semaphore 或 CountDownLatch)来对此进行建模的好方法。有几个用例和要求:

  1. 不要对任务将在哪个执行者上运行做出任何假设。事实上,为了简化,假设只有一个执行者。它可以是一个固定的线程池执行器,所以一个简单的实现可能会导致死锁(例如,想象一个任务提交另一个任务然后阻塞直到该子任务完成,现在想象这些任务中的几个用完所有线程)。
  2. 为简化起见,假设数据不是在任务之间流式传输(任务输出->后续任务输入)——完成任务和后续任务不必同时存在,因此后续任务的输入数据不会被前面的任务(因为它已经完成了)。
  3. 数据流“引擎”应该能够处理的只有几个操作:
    1. 一个任务可以排队更多任务的机制
    2. 在完成所有必需的输入任务之前,后继任务不会排队的机制
    3. 主线程(或其他不受执行程序管理的线程)阻塞直到流程完成的机制
    4. 一种机制,主线程(或其他不受执行程序管理的线程)阻塞直到某些任务完成
  4. 由于数据流是动态的(取决于任务的输入/状态),这些机制的激活应该发生在任务代码中,例如,Callable 中的代码本身负责将更多的 Callables 排队。
  5. 数据流“内部”不应该暴露给任务(Callables)本身——只有上面列出的操作应该对任务可用。
  6. 请注意,所有任务的数据类型不一定相同,例如文件下载任务可能接受文件作为输入,但会输出字符串。
  7. 如果一个任务抛出一个未捕获的异常(表明一些致命错误,需要停止所有数据流处理),它必须尽快传播到启动数据流的线程并取消所有任务(或者像致命错误处理程序这样更奇特的东西)。
  8. 任务应尽快启动。这与之前的要求一起应该排除简单的 Future 轮询 + Thread.sleep()。
  9. 作为奖励,我希望数据流引擎本身在每次任务完成时或自上次任务完成后 X 时间内没有完成时执行一些操作(如日志记录)。就像是:ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }

是否有直接的方法可以使用 Java 并发 API 完成所有这些操作?或者,如果无论 JDK 中有什么可用的东西都会变得复杂,那么是否有一个轻量级的库可以满足要求?我已经有一个适合我的特定用例的部分解决方案(它在某种程度上作弊,因为我使用了两个执行程序,而且你知道,它与我上面给出的网络浏览器示例完全无关),但我'希望看到一个更通用和优雅的解决方案。

4

2 回答 2

1

如何定义接口,例如:

interface Task extends Callable {
  boolean isReady();
}

然后,您的“数据流引擎”将只需要管理任务对象的集合,即允许将新的任务对象排队等待执行并允许查询给定任务的状态(因此上面的接口可能需要扩展以包含 id 和/或类型)。当任务完成时(当然是引擎启动时),引擎必须只查询任何未启动的任务以查看它们现在是否准备就绪,如果是,则将它们传递给在执行程序上运行。正如您所提到的,任何日志记录等也可以在那时完成。

可能有帮助的另一件事是使用 Guice ( http://code.google.com/p/google-guice/ ) 或类似的轻量级 DI 框架来帮助正确连接所有对象(例如,确保正确的执行程序类型被创建,并确保可以为需要访问数据流引擎的任务(例如,为了他们的 isReady 方法或为了排队其他任务)提供一个实例,而不会引入复杂的循环关系。

HTH,但如果我错过了任何关键方面,请发表评论......保罗。

于 2010-05-19T11:32:14.337 回答
0

查看https://github.com/rfqu/df4j — 一个简单但功能强大的数据流库。如果它缺少一些所需的功能,可以很容易地添加它们。

于 2011-09-22T15:00:15.050 回答