1

我有一个订阅 JMS 主题的应用程序(Scala 2.10.3、Akka 2.3.1、Camel 2.13.0),并在特定文件可供下载时通过 JMS 消息得到通知。每个 JMS 消息都包含文件的名称和路径,可通过 SFTP 收集。

然后,我希望能够通过 SFTP获取文件,但仅获取我们已收到 JMS 消息的文件(以避免我们可能获取正在写入的文件的问题)。

我想要一个适合 Akka Camel 和消费者模型的解决方案。我已经阅读了用于 SFTP 端点的file2ftp2的 Camel 选项,但我需要以下帮助:

  • 如何定义可以通过 &filter=... 参数在 endpointUri 字符串中引用的类/对象?我希望能够更新过滤器对象,以便每次消费者轮询文件列表时,都会应用更新的过滤器列表。

  • 如何定义自定义 IdempotentRepository,以允许缓存大小大于默认值 1000?

我的 SFTP Consumer Actor 目前看起来像这样(一些值被编辑了......):

class SftpConsumer extends Actor with ActorLogging with Consumer {
  val host = ...
  val username = ...
  val keyFile = ...
  def endpointUri: String = s"sftp://${host}?username=${username}&privateKeyFile=${keyFile}&idempotent=true"
4

1 回答 1

3

filteridempotentRepository参数需要引用注册表中的对象(按名称)。

对于过滤器,您需要创建一个扩展org.apache.camel.component.file.GenericFileFilter的类的对象。

对于过滤器和/或idempotentRepository,您需要创建一个注册表,将注册表分配给 Camel 上下文,并将这些对象注册到注册表,例如

// define a class which extends GenericFileFilter[T], and which
// filters for files which are explicitly allowed via include()
class MyFileFilter extends GenericFileFilter[File] {
  var include = Set[String]()
  def accept(file: GenericFile[File]) = include.contains(file.getFileName)
  def include(filename: String) = include = include + filename
  def exclude(filename: String) = include = include - filename
}

// Create a filter and a registry, add a mapping for the file filter to
// the registry, and assign the registry to the camel context
val myFileFilter = new MyFileFilter()
val simpleRegistry = new SimpleRegistry()
simpleRegistry.put("myFilter", myFileFilter )
camel.context.setRegistry(simpleRegistry);

// create a memory-based idempotent repository with a custom cache size
val cacheSize = 2500
val myRepository = MemoryIdempotentRepository.memoryIdempotentRepository(cacheSize)
simpleRegistry.put("myRepository", myRepository)

// adjust the endpointUri to include the &filter= and &idempotentRepository= parameters
def endpointUri: String = s"sftp://${host}?username=${username}...&idempotent=true&idempotentRepository=#myRepository&filter=#myFilter"
于 2014-08-01T04:06:58.190 回答