1

与默认示例不同,我们有用于从 csv、tsv 等导入行的 firehose,我们是否有一个这样我们可以从数据库导入记录并插入到 druid 中?有什么想法吗?

这就是我的想法 -

"firehose": {
    "type" : "database",
        "datasource" : {
                 "connectURI" : "jdbc:mysql://localhost:3306/test",
                 "user" : "druid",
                 "password" : "xyz123"
        },
        "query" : "select * from table"
        "frequency" : "P1M"
}

我们可以扩展它以通过 jndi 数据源和其他一些数据源获得连接。这种实现有什么问题吗?

4

1 回答 1

-1
这个想法怎么样?它是用于 jdbc 摄取的自定义 firehose。
在这种情况下,只支持一次查询摄取。
https://github.com/sirpkt/druid/tree/jdbc_firehose/extensions-contrib/jdbc-firehose
这是代码片段。使用 DBI 库尝试从现有数据库服务器获取结果集。
  public Firehose connect(final MapInputRowParser parser) throws IOException, ParseException, IllegalArgumentException
  {
    if (columns != null) {
      verifyParserSpec(parser.getParseSpec(), columns);
    }

    final Handle handle = new DBI(
        connectorConfig.getConnectURI(),
        connectorConfig.getUser(),
        connectorConfig.getPassword()
    ).open();

    final String query = makeQuery(columns);

    final ResultIterator<InputRow> rowIterator = handle
        .createQuery(query)
        .map(
            new ResultSetMapper<InputRow>()
            {
              List<String> queryColumns = (columns == null) ? Lists.<String>newArrayList(): columns;

              @Override
              public InputRow map(
                  final int index,
                  final ResultSet r,
                  final StatementContext ctx
              ) throws SQLException
              {
                try {
                  if (queryColumns.size() == 0)
                  {
                    ResultSetMetaData metadata = r.getMetaData();
                    for (int idx = 1; idx <= metadata.getColumnCount(); idx++)
                    {
                      queryColumns.add(metadata.getColumnName(idx));
                    }
                    Preconditions.checkArgument(queryColumns.size() > 0,
                        String.format("No column in table [%s]", table));
                    verifyParserSpec(parser.getParseSpec(), queryColumns);
                  }
                  ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder();
                  for (String column: queryColumns) {
                    builder.put(column, r.getObject(column));
                  }
                  return parser.parse(builder.build());

                } catch(IllegalArgumentException e) {
                  throw new SQLException(e);
                }
              }
            }
        ).iterator();
于 2016-06-10T13:54:13.483 回答