我想写一个 DataSource,它是来自 Tarantool-java https://github.com/tarantool/tarantool-java的 DataStream 。
谁能给我一个关于如何通过用户定义编写数据源的指南。
这是我的代码:
package tarantooljava.streaming.flink_connecter;
import org.apache.flink.configuration.Configuration;
import org.tarantool.TarantoolConnection16;
import org.tarantool.TarantoolConnection16Impl;
import splunk.test.TestSchema;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static java.util.Objects.requireNonNull;
/**
* Created by jaryzhen on 16/4/19.
*/
public class FlinkTarantoolJavaSpace<T> extends FlinkTarantoolJavaSpaceBase<T>{
private ConsumerThread<T> consumerThread;
public FlinkTarantoolJavaSpace(String ip, int port, String user, String pwd) throws IOException {
FlinkTarantoolJavaSpace(ip,port,user,pwd,11);
}
public List<T> FlinkTarantoolJavaSpace(String ip, int port, String user, String pwd, int a) throws IOException {
requireNonNull(ip, "topics");
TarantoolConnection16 con = new TarantoolConnection16Impl(ip, port);
con.auth(user, pwd);
final TestSchema schema = con.schema(new TestSchema());
List select0 = null;
for (int i=0 ; i <100 ; i=i+2) {
select0 = con.select(schema.tester.id, schema.tester.primary, Arrays.asList(i), 0, 30, 0);
//System.out.println("select0:" +i+ select0);
}
// System.out.println(a.size());
// System.out.println(a.get(0));
con.close();
return select0;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
consumerThread = new ConsumerThread<>(this, sourceContext);
}
@Override
public void cancel() {
// set ourselves as not running
boolean running = false;
if(true) {
} else {
// the consumer thread is not running, so we have to interrupt our own thread
}
}
@Override
public void close() throws Exception {
cancel();
super.close();
}
// ------------------------------------------------------------------------
// Checkpoint and restore
// ----------------------------------------------------------------
private static class ConsumerThread<T> extends Thread {
private FlinkTarantoolJavaSpace<T> flinConsumer;
private SourceContext<T> sourceContext;
private boolean running = true;
public ConsumerThread(FlinkTarantoolJavaSpace<T> flinkConsumer, SourceContext<T> sourceContext) {
this.sourceContext = sourceContext;
this.flinkConsumer=flinkConsumer;
}
@Override
public void run() {
}
/**
* Try to shutdown the thread
*/
public void shutdown() {
this.running = false;
}
}
}