I've been trying to see if FluentD is a viable method for my company to send data to Kinesis through. I installed FLuentD using the td-agent rpm pack on an Amazon EC2 instance. There aren't a lot of parameters for how to send the data, just that the TCP or in_forward plugins might be the best fit. I started with the TCP one but I've never really done anything with TCP before so sorry if I make some novel mistakes.
I've been hitting my head against the wall, trying to send TCP requests and see them put to Kinesis. Kinesis is showing that there have been no writes. I thought at first I wasn't sending enough data to trigger a write so I've tried sending tons of data at it and no such luck. Googling the problem has brought me no answers as there was only one result seemed viable and it was a question on some site - netdownload - that had no answers.
FluentD configuration:
<match beacon.test>
type kinesis
stream_name test_stream
region us-east-1
random_partition_key true
debug true
</match>
<source>
type tcp
tag beacon.test
format json
log_level debug
</source>
CONNECT_TO_ADDR is a constant defined as the Public IP of my ec2-instance. Code to send TCP data:
define('TCP_PORT', 5170);
for($i = 0; $i < 312500000; $i++)
{
Send(array('id' => $i, 'value' => 'testing123'));
}
function Send($obj)
{
try
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if($socket === false)
{
throw new Exception(GetSocketError("Unable to create a socket"));
}
if(socket_connect($socket, CONNECT_TO_ADDR, TCP_PORT))
{
$message = json_encode($obj, JSON_FORCE_OBJECT);
if($message === false)
{
throw new Exception("Unable to create JSON object: " . json_last_error_msg());
}
$totalByteCount = strlen($message);
$wrote = socket_send($socket, $message, $totalByteCount, MSG_EOF);
if($wrote === false)
{
throw new Exception(GetSocketError("Unable to write to socket"));
}
echo "Wrote $wrote/$totalByteCount bytes successfully!\n";
}
else
{
throw new Exception(GetSocketError("Unable to connect to socket"));
}
}
catch(Exception $ex)
{
if(isset($socket) && $socket !== false)
{
socket_shutdown($socket);
socket_close($socket);
}
throw $ex;
}
}
Output from the FluentD log:
D, [2015-08-11T21:17:00.244426 #9848] DEBUG -- : [Aws::Kinesis::Client 200 >0.130176 0 retries] describe_stream(stream_name:"test_stream")
2015-08-11 21:17:00 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2015-08-11 21:17:00 +0000 [info]: listening dRuby uri="druby://127.0.0.1:24230" >object="Engine" 2015-08-11 21:17:00 +0000 [debug]: listening tcp socket on 0.0.0.0:5170
2015-08-12 03:28:01 +0000 [info]: force flushing buffered events