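I'm using Flink (1.9.2) with MongoDB. I want to write a custom sink that outputs some messages to MongoDB for testing. But after I finished it and ran it, my job cannot take a savepoint.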
My sink:
    public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {

        private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);

        private MongoCollection sinkCollection;
        private Random random = new Random();

        // Initialize the MongoDB connection.
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
            MongoClient mongoClient = mongoCenter.getMongoClient();
            MongoDatabase database = mongoClient.getDatabase("xxxx");
            sinkCollection = database.getCollection(sinkCollectionName);
        }

        /** Writes the given value to the sink. This function is called for every record. */
        @Override
        public void invoke(GeneralizedMessage value, Context context) throws Exception {
            // some logic
            sinkCollection.insertOne(doc);
            // Wait for a random interval (200 to 1199 ms).
            Thread.sleep(200 + random.nextInt(1000));
        }

        /** Checkpoint methods. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            logger.info("notifyCheckpointComplete");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            logger.info("snapshotState");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            logger.info("initializeState");
        }
    }
Logs: I can't find any error logs in the JobManager or the TaskManager. Everything looks fine, but the savepoint fails.