I am new to Flume NG. I have to write a program that transfers a text file to another program (an agent). I know I need the agent's details, i.e. host IP, port number, etc., and that a source, a sink, and a channel must be defined. I just want to transfer a log file to a server. My client code is as follows:

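import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyClient {

  private RpcClient client;
  private String hostname;
  private int port;

  // Connect to the Flume agent listening at hostname:port.
  public void init(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
  }

  // Wrap the string in a Flume event and send it to the agent.
  public void sendDataToFlume(String data) {
    Event event = EventBuilder.withBody(data, StandardCharsets.UTF_8);
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // Delivery failed: drop the broken client and reconnect.
      // Note that the failed event itself is not resent.
      client.close();
      client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
  }

  // Release the RPC connection.
  public void cleanUp() {
    client.close();
  }
}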

The code above can only send String data to the specified process, but I have to send files. Also, please tell me whether the source, channel, and sink have to be written on the server, and if so, how to configure and write these three. Please help me, and give a small sample of a source, sink, and channel.

1 Answer

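Actually, you only need to install a Flume client (agent) on each node. You then supply a configuration file that describes its behavior. For example, if your node reads a file (reading each new line and sending it to the channel as an event) and sends the file contents over an RPC socket, your configuration would look like this: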

  # sources/sinks/channels list
  <Agent>.sources = <Name Source1>
  <Agent>.sinks = <Name Sink1>
  <Agent>.channels = <Name Channel1>
  # Channel attribution to a source (a source can feed several channels: "channels")
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel attribution to a sink (a sink reads from exactly one channel: "channel")
  <Agent>.sinks.<Name Sink1>.channel = <Name Channel1>
  # Configuration (sources, channels and sinks)
  # Source properties : <Name Source1>
  <Agent>.sources.<Name Source1>.type = exec
  <Agent>.sources.<Name Source1>.command = tail -F test
  # Channel properties : <Name Channel1>
  <Agent>.channels.<Name Channel1>.type = memory
  <Agent>.channels.<Name Channel1>.capacity = 1000
  <Agent>.channels.<Name Channel1>.transactionCapacity = 1000
  # Sink properties : <Name Sink1>
  <Agent>.sinks.<Name Sink1>.type = avro
  <Agent>.sinks.<Name Sink1>.hostname = <HOST NAME or IP>
  <Agent>.sinks.<Name Sink1>.port = <PORT NUMBER>

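Then you have to set up an agent that reads from an avro source on the same port and processes the events in whatever way you want to store them. I hope it helps ;)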
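
As a rough illustration of that second agent, the receiving side could be configured along these lines. This is only a sketch under assumptions: the agent name `collector`, the component names `avroIn`, `mem` and `toDisk`, the bind address, and the output directory `/var/log/flume` are all made up for the example, and `file_roll` is just one possible way to store the events.

```properties
# Hypothetical names: agent "collector", source "avroIn", channel "mem", sink "toDisk"
collector.sources = avroIn
collector.channels = mem
collector.sinks = toDisk

# Avro source: listens for events sent by the avro sink of the first agent.
# The port must match the <PORT NUMBER> used by that sink.
collector.sources.avroIn.type = avro
collector.sources.avroIn.bind = 0.0.0.0
collector.sources.avroIn.port = <PORT NUMBER>
collector.sources.avroIn.channels = mem

# In-memory channel buffering events between source and sink
collector.channels.mem.type = memory
collector.channels.mem.capacity = 1000
collector.channels.mem.transactionCapacity = 100

# File roll sink: writes events to files in a local directory (assumed path)
collector.sinks.toDisk.type = file_roll
collector.sinks.toDisk.sink.directory = /var/log/flume
collector.sinks.toDisk.channel = mem
```

The agent would then be started with the `flume-ng agent` command, passing this file via `--conf-file` and the agent name via `--name collector`. Since `RpcClientFactory.getDefaultInstance` creates an Avro RPC client, the Java client from the question should also be able to send its events to an avro source like this one.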

Answered on 2013-10-01T09:58:55.943