I've been trying to see whether FluentD is a viable way for my company to send data to Kinesis. I installed FluentD using the td-agent RPM package on an Amazon EC2 instance. There isn't much guidance on how to send the data, only that the TCP or in_forward input plugins might be the best fit. I started with the TCP one, but I've never really done anything with TCP before, so sorry if I make some novice mistakes.

I've been banging my head against the wall, trying to send TCP requests and see them written to Kinesis. Kinesis shows that there have been no writes. At first I thought I wasn't sending enough data to trigger a write, so I tried sending tons of data at it, with no luck. Googling the problem has brought me no answers; the only result that seemed relevant was a question on some site (netdownload) that had no answers.

FluentD configuration:

<match beacon.test>
  type kinesis
  stream_name test_stream
  region us-east-1
  random_partition_key true
  debug true
</match>

<source>
  type tcp
  tag beacon.test
  format json
  log_level debug
</source>

CONNECT_TO_ADDR is a constant defined as the public IP of my EC2 instance. Code to send the TCP data:

define('TCP_PORT', 5170);

for($i = 0; $i < 312500000; $i++)
{
    Send(array('id' => $i, 'value' => 'testing123'));
}

function Send($obj)
{
    try
    {
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        if($socket === false)
        {
            throw new Exception(GetSocketError("Unable to create a socket"));
        }

        if(socket_connect($socket, CONNECT_TO_ADDR, TCP_PORT))
        {
            $message = json_encode($obj, JSON_FORCE_OBJECT);

            if($message === false)
            {
                throw new Exception("Unable to create JSON object: " . json_last_error_msg());
            }

            $totalByteCount = strlen($message);

            $wrote = socket_send($socket, $message, $totalByteCount, MSG_EOF);

            if($wrote === false)
            {
                throw new Exception(GetSocketError("Unable to write to socket"));
            }

            echo "Wrote $wrote/$totalByteCount bytes successfully!\n";

        }
        else
        {
            throw new Exception(GetSocketError("Unable to connect to socket"));
        }
    }
    catch(Exception $ex)
    {
        if(isset($socket) && $socket !== false)
        {
            socket_shutdown($socket);
            socket_close($socket);
        }

        throw $ex;
    }
}

Output from the FluentD log:

D, [2015-08-11T21:17:00.244426 #9848] DEBUG -- : [Aws::Kinesis::Client 200 0.130176 0 retries] describe_stream(stream_name:"test_stream")

2015-08-11 21:17:00 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2015-08-11 21:17:00 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
2015-08-11 21:17:00 +0000 [debug]: listening tcp socket on 0.0.0.0:5170
2015-08-12 03:28:01 +0000 [info]: force flushing buffered events

1 Answer

How about adding copy + stdout output for debugging? That way you can confirm whether your PHP application is actually sending data to your Fluentd.

<match beacon.test>
  type copy
  <store>
    type stdout
  </store>
  <store>
    type kinesis
    stream_name test_stream
    region us-east-1
    random_partition_key true
    debug true
    flush_interval 10s
  </store>
</match>

In addition, adding the "flush_interval 10s" option will make the buffer flush more frequently.
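
One more thing that may be worth checking on the client side: as far as I know, the tcp input splits incoming data into records on a delimiter (a newline by default), and the PHP code in the question sends JSON with no trailing newline and never closes the socket on the success path. Below is a minimal sketch, not a definitive fix, that writes newline-terminated JSON over a single connection. The helper names encodeLine and sendRecords are hypothetical, and the newline delimiter is an assumption about the in_tcp defaults that you should check against the documentation for your Fluentd version.

```php
<?php
// Hypothetical helper: encode one record as a single newline-terminated JSON line.
// Assumption: the Fluentd tcp input treats "\n" as the record delimiter.
function encodeLine(array $record)
{
    $json = json_encode($record, JSON_FORCE_OBJECT);
    if ($json === false) {
        throw new RuntimeException('JSON encoding failed: ' . json_last_error_msg());
    }
    return $json . "\n";
}

// Hypothetical helper: send all records over one connection, then close it.
// Returns the number of records written.
function sendRecords($host, $port, array $records)
{
    $stream = @stream_socket_client("tcp://$host:$port", $errno, $errstr, 5);
    if ($stream === false) {
        throw new RuntimeException("Unable to connect: $errstr ($errno)");
    }

    $sent = 0;
    try {
        foreach ($records as $record) {
            $line = encodeLine($record);
            // fwrite may write fewer bytes than requested, so loop until the line is out.
            for ($written = 0; $written < strlen($line); $written += $n) {
                $n = fwrite($stream, substr($line, $written));
                if ($n === false || $n === 0) {
                    throw new RuntimeException('Unable to write to socket');
                }
            }
            $sent++;
        }
    } finally {
        // Always release the socket, on success as well as on failure.
        fclose($stream);
    }
    return $sent;
}

// Example call (CONNECT_TO_ADDR and TCP_PORT as defined in the question):
// sendRecords(CONNECT_TO_ADDR, TCP_PORT, array(array('id' => 1, 'value' => 'testing123')));
```

With the stdout store in place, each record sent this way should show up in the td-agent log if Fluentd is receiving it. Reusing one connection also avoids opening a new socket for every record in a long loop.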

Answered 2015-08-15T06:28:36.617