
I am trying to work with Flink and Cassandra. Both are massively parallel environments, but I am having trouble making them work together.

Right now I need to read from Cassandra in parallel over different token ranges, with the possibility of terminating the query after N objects have been read.

Batch mode suits me better, but DataStreams would also work. I tried a LongCounter (see below), but it does not behave as I expected: I could not get a global sum from it, only the local per-subtask values.
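The local-vs-global behavior can be reproduced without Flink. Below is a minimal sketch (plain Kotlin, a hypothetical simulation rather than real Flink API) of why a per-subtask counter, like an accumulator's local value, never observes the global total during execution, while a JVM-wide `AtomicLong` does, though only within a single TaskManager process:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

fun main() {
    val parallelism = 4
    val rowsPerSubtask = 100

    // One local counter per "subtask", analogous to LongCounter.localValue:
    // each thread only ever sees its own slot.
    val localCounters = LongArray(parallelism)

    // A counter shared across all subtasks in the same JVM,
    // analogous to a static AtomicLong in a companion object.
    val globalCounter = AtomicLong()

    val pool = Executors.newFixedThreadPool(parallelism)
    for (subtask in 0 until parallelism) {
        pool.submit {
            repeat(rowsPerSubtask) {
                localCounters[subtask]++          // what counter.add(1) sees locally
                globalCounter.incrementAndGet()   // what a shared atomic would see
            }
        }
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)

    println("local values:  ${localCounters.toList()}")  // each subtask saw only its own 100
    println("global value:  ${globalCounter.get()}")     // 400, visible to every subtask
}
```

Note the caveat: the shared `AtomicLong` works per JVM, so with several TaskManagers each process would still maintain its own total.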

Async mode is not required, because this CassandraRequester operation already runs in a parallel context with a parallelism of roughly 64 or 128.

Here is my attempt:

class CassandraRequester<T> (val klass: Class<T>, private val context: FlinkCassandraContext):
        RichFlatMapFunction<CassandraTokenRange, T>() {

    companion object {
        private val session = ApplicationContext.session!!
        private var preparedStatement: PreparedStatement? = null
        private val manager = MappingManager(session)
        private var mapper: Mapper<*>? = null
        private val log = LoggerFactory.getLogger(CassandraRequester::class.java)

        const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
    }

    private lateinit var counter: LongCounter

    override fun open(parameters: Configuration?) {
        super.open(parameters)

        // Prepare the statement and mapper lazily, once per JVM
        // (they live in the companion object and are shared by all subtasks)
        if (preparedStatement == null)
            preparedStatement = session.prepare(context.prepareQuery()).setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        if (mapper == null) {
            mapper = manager.mapper<T>(klass)
        }
        // Accumulator values are merged only in the JobExecutionResult after the job ends
        counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)
    }

    override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {
        // Query a single token range and map the rows to entities
        val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)

        val rs = session.execute(bs)
        val resultSelect = mapper!!.map(rs)
        val iter = resultSelect.iterator()
        while (iter.hasNext()) when {
            // localValue only reflects this subtask's count, not the global total
            this.context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
                counter.add(1)
                collector.collect(iter.next() as T)
            }
            else -> {
                collector.close()
                return
            }
        }
    }

}

Is it possible to terminate the query in this case?

