0

I know in the first place that this question is dummy, yet I could not find a regarding documentation. Which is the way to have Apache camel scan a table on a remote HBase?

Apache camel's documentation doesn't help a lot. It only works when hbase is locally installed.

Thx in advance!

EDIT:

I tried the example of @cexbrayat, yet I am getting the following exception.

8071 [Camel (camel-1) thread #1 - stream://in] ERROR org.apache.camel.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-1344429030688-0-1 on ExchangeId: ID-1344429030688-0-2). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException
java.lang.NullPointerException
    at org.apache.camel.component.hbase.HBaseProducer.process(HBaseProducer.java:105)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:120)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:292)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:115)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:330)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
    at org.apache.camel.component.stream.StreamConsumer.processLine(StreamConsumer.java:198)
    at org.apache.camel.component.stream.StreamConsumer.readFromStream(StreamConsumer.java:159)
    at org.apache.camel.component.stream.StreamConsumer.run(StreamConsumer.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
4

1 回答 1

5

您必须在 xml 文件中将 hbase 配置为 hbase-site.xml。您将在官方 hbase文档中找到方法。

然后您将能够在 Camel 中使用此配置文件:为此,您将在 camel-hbase 组件源中找到一个示例

使用 java DSL,路由将如下所示:

public class FournisseurRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:scan")
            .to("hbase:mytable?operation=CamelHBaseScan")
            .log("${body}");
    }
}
于 2012-08-08T07:17:20.420 回答