1

我在我的 flume.config 上运行了一个我自己的自定义源,它负责每小时从 Facebook 页面中提取数据。

我想知道是否有办法把提取的开始时间设置为我的协调器的开始时间?

例如,我将协调器设置为从 2015 年 1 月 1 日上午 12 点开始,那么我的 Flume 也在同一时间开始提取。

这是我的自定义源:

/**
 * Flume pollable source that fetches the {@code page_fans_city} insight of a
 * Facebook page and emits one JSON event per (end_time, city) pair.
 *
 * <p>A poll is attempted only when more than {@code refreshInterval} seconds
 * have elapsed since the previous attempt; otherwise {@link #doProcess()}
 * returns {@code Status.BACKOFF} without contacting Facebook.
 *
 * <p>Each event body is a UTF-8 encoded JSON object with the keys
 * {@code reference_date}, {@code city}, {@code count} and {@code poll_time}.
 * Dates are rendered as {@code dd/MM/yyyy HH:mm:ss} in UTC. Every event carries
 * a {@code timestamp} header holding the poll time in epoch milliseconds.
 */
public class FacebookPageFansCitySource extends AbstractPollableSource
{
    private static final Logger LOGGER = Logger.getLogger(FacebookPageFansCitySource.class.getName());

    /**
     * Charset used for event bodies. Spelled out explicitly because city names
     * are frequently non-ASCII and the platform default charset varies by host.
     */
    private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.Charset.forName("UTF-8");

    private static final String METRIC = "page_fans_city";
    private static final String DATE_PATTERN = "dd/MM/yyyy HH:mm:ss";

    private String accessToken;
    private String pageId;
    /** Minimum number of seconds between two poll attempts. */
    private int refreshInterval;

    private FacebookClient facebookClient;

    /** Epoch milliseconds of the last poll attempt (successful or not); 0 before the first one. */
    private volatile long lastPoll = 0;

    /**
     * Reads the access token, page id and refresh interval from the Flume
     * context and creates the Facebook client.
     *
     * @param context the source configuration
     * @throws FlumeException if the refresh interval property is not set
     */
    @Override
    protected void doConfigure(Context context) throws FlumeException
    {
        this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
        this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);

        // Check explicitly instead of auto-unboxing: a missing property would
        // otherwise surface as a bare NullPointerException with no hint of the key.
        Integer configuredInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);

        if (configuredInterval == null)
        {
            throw new FlumeException("Missing required property: " + FacebookSourceConstants.REFRESH_INTERVAL_KEY);
        }

        this.refreshInterval = configuredInterval;

        facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);
    }

    /** No start-up work is required. */
    @Override
    protected void doStart() throws FlumeException
    {
    }

    /** No shutdown work is required. */
    @Override
    protected void doStop() throws FlumeException
    {
    }

    /**
     * Polls Facebook when the refresh interval has elapsed and forwards the
     * resulting events to the channel processor as a single batch.
     *
     * @return {@code Status.READY} when a poll was attempted and completed
     *         without an exception (even if no insight was returned);
     *         {@code Status.BACKOFF} when the interval has not elapsed yet or
     *         the poll failed
     * @throws EventDeliveryException declared by the overridden method; failures
     *         inside the poll are logged and reported as {@code BACKOFF} instead
     */
    @Override
    protected Status doProcess() throws EventDeliveryException
    {
        long now = System.currentTimeMillis();

        if (TimeUnit.MILLISECONDS.toSeconds(now - lastPoll) <= refreshInterval)
        {
            return Status.BACKOFF;
        }

        // Recorded before the attempt, so a failed poll also waits a full
        // interval before the next try.
        lastPoll = now;

        try
        {
            // SimpleDateFormat is not thread-safe, so a fresh instance is used per poll.
            SimpleDateFormat utcFormat = new SimpleDateFormat(DATE_PATTERN);
            utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            Insight insight = getInsight(pageId, METRIC);

            if (insight != null)
            {
                List<Event> events = buildEvents(insight, utcFormat, new Date(now));

                getChannelProcessor().processEventBatch(events);
            }

            return Status.READY;
        }
        catch (Exception e)
        {
            // Boundary catch: a failing poll must not kill the source runner.
            LOGGER.log(Level.SEVERE, "Failed to poll " + METRIC + " insight for page " + pageId, e);

            return Status.BACKOFF;
        }
    }

    /**
     * Converts an insight into one event per city entry of each insight value.
     *
     * @param insight   the insight whose values are flattened
     * @param utcFormat formatter used for both the reference date and the poll time
     * @param pollDate  the instant of the current poll
     * @return the events to deliver, possibly empty
     */
    private List<Event> buildEvents(Insight insight, SimpleDateFormat utcFormat, Date pollDate)
    {
        String pollTime = utcFormat.format(pollDate);

        // The same header map is shared by all events of this poll.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(pollDate.getTime()));

        List<Event> events = new ArrayList<Event>();

        for (JsonObject value : insight.getValues())
        {
            String referenceDate = utcFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));

            JsonObject countsByCity = value.getJsonObject("value");

            for (Iterator<?> cities = countsByCity.keys(); cities.hasNext(); )
            {
                String city = (String) cities.next();
                Long count = countsByCity.getLong(city);

                JsonObject body = new JsonObject();

                body.put("reference_date", referenceDate);
                body.put("city", city);
                body.put("count", count);
                body.put("poll_time", pollTime);

                events.add(EventBuilder.withBody(body.toString().getBytes(BODY_CHARSET), headers));
            }
        }

        return events;
    }

    /**
     * Fetches the first insight entry of a metric for the window that starts
     * four days before now and ends three days before now (UTC calendar).
     *
     * @param objectId the id of the Facebook object to query
     * @param metric   the insight metric name
     * @return the first insight returned, or {@code null} when there is none
     */
    private Insight getInsight(String objectId, String metric)
    {
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));

        calendar.add(Calendar.DAY_OF_MONTH, -4);
        Parameter parameterSince = Parameter.with("since", calendar.getTime());

        calendar.add(Calendar.DAY_OF_MONTH, 1);
        Parameter parameterUntil = Parameter.with("until", calendar.getTime());

        Connection<Insight> connection = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);

        if (connection != null && !connection.getData().isEmpty())
        {
            return connection.getData().get(0);
        }

        return null;
    }
}

谢谢您的帮助。

4

1 回答 1

1

可以考虑创建一个 Java 操作,并设置一个使用协调器当前时间的 Workflow 属性。

<!--
  Declares a property named "myStart" whose value is the EL expression
  ${coord:current(0)}.
  NOTE(review): presumably resolved by the Oozie coordinator when it launches
  the workflow; confirm that coord:current is valid in this context for the
  Oozie version in use.
-->
<property>
    <name>myStart</name>
    <value>${coord:current(0)}</value>
</property>

然后在您的操作中使用此属性作为参数。

于 2015-05-06T16:57:34.977 回答