I'm trying to get Flink and Cassandra to work together. Both are massively parallel environments, but I'm having a hard time making them cooperate.
Right now I need to read from Cassandra in parallel, one query per token range, with the option of terminating the query after N objects have been read.
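A minimal sketch of how the token ranges for such parallel reads could be produced. It assumes the Murmur3Partitioner (tokens are signed 64-bit longs) and a query of the form `WHERE token(pk) > ? AND token(pk) <= ?`. `TokenRange` and `splitTokenRing` are hypothetical names standing in for `CassandraTokenRange` and whatever currently builds the ranges. The ring is cut into contiguous `(start, end]` ranges that a Flink source such as `env.fromCollection(...)` could hand out to the `CassandraRequester` subtasks.

```kotlin
import java.math.BigInteger

// Hypothetical stand-in for CassandraTokenRange: the half-open token range (start, end].
data class TokenRange(val start: Long, val end: Long)

// Splits the full signed 64-bit token ring (Murmur3Partitioner assumed) into `parts` contiguous ranges.
fun splitTokenRing(parts: Int): List<TokenRange> {
    require(parts > 0) { "parts must be positive" }
    val min = BigInteger.valueOf(Long.MIN_VALUE)
    val span = BigInteger.valueOf(Long.MAX_VALUE) - min // 2^64 - 1, too large for a Long
    // Boundary i = min + span * i / parts; boundary 0 is Long.MIN_VALUE, boundary `parts` is Long.MAX_VALUE.
    val bounds = (0..parts).map { i ->
        (min + span * BigInteger.valueOf(i.toLong()) / BigInteger.valueOf(parts.toLong())).toLong()
    }
    // Consecutive boundaries form the (start, end] ranges.
    return bounds.zipWithNext { a, b -> TokenRange(a, b) }
}
```

Creating several ranges per subtask (for example 4 × 128 instead of 128) tends to even out skewed partitions, because each subtask then processes multiple smaller ranges.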
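Batch mode suits me better, but the DataStream API would also be acceptable. I tried a LongCounter (see below), but it doesn't behave as I expected: I couldn't get a global sum out of it, only the local values.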
Async mode is not required, because this operation (CassandraRequester) already runs in a parallel context with a parallelism of about 64 or 128.
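Flink merges accumulators such as `LongCounter` only when the job finishes (the total is then read via `JobExecutionResult.getAccumulatorResult(...)`), so inside a running function `localValue` reflects only the current subtask. One coordination-free workaround is to split the global limit N into fixed per-subtask budgets, using `getRuntimeContext().getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()` (Flink 1.x `RuntimeContext` methods). `perSubtaskLimit` below is a hypothetical helper; it maps the question's "0 means no limit" convention to `Long.MAX_VALUE`, so that a budget of 0 can still mean "emit nothing" when N is smaller than the parallelism.

```kotlin
// Hypothetical helper: this subtask's share of a global row limit.
// globalLimit == 0 means "no limit" (as with maxRowsExtracted) and yields Long.MAX_VALUE.
fun perSubtaskLimit(globalLimit: Long, subtaskIndex: Int, parallelism: Int): Long {
    require(globalLimit >= 0) { "globalLimit must not be negative" }
    require(parallelism > 0 && subtaskIndex in 0 until parallelism) { "invalid subtask index or parallelism" }
    if (globalLimit == 0L) return Long.MAX_VALUE
    val base = globalLimit / parallelism
    // The first (globalLimit % parallelism) subtasks take one extra row, so the shares sum to globalLimit.
    val extra = if (subtaskIndex < globalLimit % parallelism) 1L else 0L
    return base + extra
}
```

The total then never exceeds N, but it can fall short when some subtasks' token ranges hold fewer rows than their share. An exact global limit would need something that sees all rows, such as a downstream step with parallelism 1 (for example `first(N)` in the DataSet API) or an external shared counter.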
Here is my attempt:
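```kotlin
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.mapping.Mapper
import com.datastax.driver.mapping.MappingManager
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

// ApplicationContext, FlinkCassandraContext and CassandraTokenRange are application classes (not shown).
class CassandraRequester<T>(val klass: Class<T>, private val context: FlinkCassandraContext) :
    RichFlatMapFunction<CassandraTokenRange, T>() {

    companion object {
        // JVM-wide singletons, shared by all subtasks running in the same TaskManager.
        private val session = ApplicationContext.session!!
        private val manager = MappingManager(session)
        private val log = LoggerFactory.getLogger(CassandraRequester::class.java)
        const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
    }

    // Per-subtask state, created in open() instead of racy, type-erased static fields.
    @Transient private lateinit var preparedStatement: PreparedStatement
    @Transient private lateinit var mapper: Mapper<T>
    @Transient private lateinit var counter: LongCounter

    override fun open(parameters: Configuration?) {
        super.open(parameters)
        preparedStatement = session.prepare(context.prepareQuery())
            .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        mapper = manager.mapper(klass)
        // Flink merges this accumulator across subtasks only when the job finishes.
        counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)
    }

    override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {
        val bs = preparedStatement.bind(tokenRange.start, tokenRange.end)
        val iter = mapper.map(session.execute(bs)).iterator()
        val limit = context.maxRowsExtracted // 0 means "no limit"
        // counter.localValue is this subtask's count only, so the limit is enforced per subtask.
        while ((limit == 0L || counter.localValue < limit) && iter.hasNext()) {
            counter.add(1)
            collector.collect(iter.next())
        }
        // Leaving the loop stops reading this token range; the collector belongs to Flink and is not closed here.
    }
}
```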
Is it possible to terminate the query in this case?
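On terminating the query itself: assuming the DataStax Java driver 3.x (which `MappingManager` and `Mapper` come from), `session.execute` returns only the first page of rows (the fetch size, 5000 by default), and further pages are requested only while the result is being iterated. Stopping the iteration in `flatMap` therefore already acts as a termination: no more pages are fetched for that token range. The dependency-free sketch below imitates this with a hypothetical `PagedSource` and a hypothetical `emitUpTo` helper, and shows the order of checks that matters: test the limit before calling `hasNext()`, because `hasNext()` is what triggers the next page fetch.

```kotlin
// Hypothetical stand-in for a driver result set: rows arrive one page at a time, on demand.
class PagedSource(private val totalRows: Int, private val pageSize: Int) : Iterator<Int> {
    var pagesFetched = 0
        private set
    private val buffer = ArrayDeque<Int>()
    private var nextRow = 0

    // Fetches the next page only when the current one is exhausted, like automatic paging.
    override fun hasNext(): Boolean {
        if (buffer.isEmpty() && nextRow < totalRows) fetchPage()
        return buffer.isNotEmpty()
    }

    override fun next(): Int {
        if (!hasNext()) throw NoSuchElementException()
        return buffer.removeFirst()
    }

    private fun fetchPage() {
        val end = minOf(nextRow + pageSize, totalRows)
        for (row in nextRow until end) buffer.addLast(row)
        nextRow = end
        pagesFetched++
    }
}

// Emits at most `limit` rows; the limit is checked before hasNext(), so reaching it never fetches another page.
fun <T> emitUpTo(rows: Iterator<T>, limit: Long, emit: (T) -> Unit): Long {
    var emitted = 0L
    while (emitted < limit && rows.hasNext()) {
        emit(rows.next())
        emitted++
    }
    return emitted
}
```

With 1000 rows in pages of 100 and a limit of 150, only two pages are ever fetched; the remaining eight are never requested.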