1

我使用 Flink(1.9.2) 和 MongoDB。我想自定义一个接收器以将一些消息输出到 mongoDB 中进行测试。但是在我完成并运行它之后,我的工作无法获取保存点

我的水槽:

public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);

    private MongoCollection sinkCollection;
    private Random random = new Random();

    // init mongo connection.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
        MongoClient mongoClient = mongoCenter.getMongoClient();
        MongoDatabase database = mongoClient.getDatabase("xxxx");
        sinkCollection = database.getCollection(sinkCollectionName);
    }

    /** Writes the given value to the sink. This function is called for every record.*/
    @Override
    public void invoke(GeneralizedMessage value, Context context) throws Exception {
        // some logic
        sinkCollection.insertOne(doc);
        // wait for some times.
        Thread.sleep(200 + random.nextInt(1000));
    }



    /** checkpoint methods */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        logger.info("notifyCheckpointComplete");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        logger.info("snapshotState");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        logger.info("initializeState");
    }
}

当我单击保存点时:无法确认接收器操作员。 错误信息

日志 我在 JobManager 和 TaskManager 中找不到错误日志,看起来不错,但保存点失败。

4

0 回答 0