I am trying to read a file with 34 fields and print it to the console, using NetBeans. However, all I can print is the schema, because a print option for use with the CSV reader is missing in this particular Flink version. Please look at the code and help me see where I should correct it. I would have used the built-in CSVReader API, but it turns out it does not support more than 22 fields, so I resorted to the Table API. I also tried CsvTableSource with the Flink 1.5.1 syntax, but had no luck: .field("%CPU", Types.FLOAT()) kept failing with a "cannot find symbol" error on Types.FLOAT. My main goal is to read the CSV file and send it on to a Kafka topic, but before that I want to check that the file is actually being read; no luck so far.
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
public class CsvReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
"OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
"io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
"mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
"mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
"cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
"container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
new TypeInformation<?>[] {
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT()
});
tEnv.registerTableSource("container", csvTableSource);
Table result = tEnv.scan("container");
// println only prints the Table object's string form, and printSchema prints
// the schema -- neither of these emits the actual rows
System.out.println(result);
result.printSchema();
/* tEnv.toAppendStream(result, Row.class).print();
result.writeToSink(null);
env.execute(); */
}
}
This is the output:
root
|-- %CPU: Float
|-- MEM: Float
|-- VSZ: Float
|-- RSS: Float
|-- timestamp: Float
|-- OOM_Score: Float
|-- io_read_count: Float
|-- io_write_count: Float
|-- io_read_bytes: Float
|-- io_write_bytes: Float
|-- io_read_chars: Float
|-- io_write_chars: Float
|-- num_fds: Float
|-- num_ctx_switches_voluntary: Float
|-- num_ctx_switches_involuntary: Float
|-- mem_rss: Float
|-- mem_vms: Float
|-- mem_shared: Float
|-- mem_text: Float
|-- mem_lib: Float
|-- mem_data: Float
|-- mem_dirty: Float
|-- mem_uss: Float
|-- mem_pss: Float
|-- mem_swap: Float
|-- num_threads: Float
|-- cpu_time_user: Float
|-- cpu_time_system: Float
|-- cpu_time_children_user: Float
|-- cpu_time_children_system: Float
|-- container_nr_sleeping: Float
|-- container_nr_running: Float
|-- container_nr_stopped: Float
|-- container_nr_uninterruptible: Float
|-- container_nr_iowait: Float
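For context on why only the schema appears: printSchema() never touches the data, and nothing executes until the job is started. To see rows, the table has to be converted to a DataStream via a StreamTableEnvironment and the job must be launched with env.execute(). Below is a minimal, untested sketch of that shape, assuming Flink 1.5.x and the same "container" source registration as in the code above (the class name PrintCsvRows is made up for illustration):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PrintCsvRows {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Java StreamTableEnvironment (not the base TableEnvironment)
        // is what provides toAppendStream
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // ... register the CsvTableSource as "container" exactly as above ...

        Table result = tEnv.scan("container");
        // Convert the table to a stream of rows and print each row to the console
        tEnv.toAppendStream(result, Row.class).print();
        // Nothing runs until execute() is called -- without it no rows are printed
        env.execute("print csv rows");
    }
}
```

Note that declaring tEnv with the base TableEnvironment type, as in the first code version, hides toAppendStream entirely, which matches the symptom described.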
Here is another version of the code, which also does not work:
package wikiedits;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
public class Csv {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("/home/merlin/Experiments/input_container/container_data1.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as a plain print to the console here
stream.print();
// execute() is required, otherwise the pipeline never runs and nothing prints
env.execute();
}
}
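On the "cannot find symbol" error around Types.FLOAT(): in Flink 1.5 there are two unrelated Types classes, and they expose the float type differently, so the right spelling depends entirely on which import is in scope. A sketch of the distinction (assuming Flink 1.5.x):

```java
// Table API variant: FLOAT is a factory method, parentheses required.
//   import org.apache.flink.table.api.Types;
//   .field("%CPU", Types.FLOAT())
//
// Core type-info variant: FLOAT is a static field, no parentheses.
//   import org.apache.flink.api.common.typeinfo.Types;
//   .field("%CPU", Types.FLOAT)
```

The second code version imports org.apache.flink.api.common.typeinfo.Types and correctly uses Types.FLOAT; mixing that import with the Types.FLOAT() call style from the first version would produce exactly the reported symbol error.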
New edit: what if I then have to pass it to a Kafka topic and on to a function call? This is what I tried:
DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
SendToRestAPI sendrest= new SendToRestAPI(value);
String String1= sendrest.converttoJson();
return "Stream Value: " + String1;
}
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/
env.execute();
}
}
The stream.map line throws the error:
Cannot find symbol:method Map
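A likely cause of that error: stream is declared as a DataStreamSink, and sinks are terminal, so they have no map method; the mapping has to happen on a DataStream before addSink is called. Below is an untested sketch of the intended ordering, assuming the same main method, tEnv, table, and env as above, and that SendToRestAPI, converttoJson, and properties (the question's own names) exist:

```java
// Requires: org.apache.flink.api.common.functions.MapFunction,
// org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09,
// org.apache.flink.api.common.serialization.SimpleStringSchema

// 1. Convert the table to a DataStream<Row> first (this is a stream, not a sink)
DataStream<Row> rows = tEnv.toAppendStream(table, Row.class);

rows
    // 2. map runs on the DataStream; note the input type is Row, not String
    .map(new MapFunction<Row, String>() {
        private static final long serialVersionUID = -6867736771747690202L;
        @Override
        public String map(Row value) throws Exception {
            SendToRestAPI sendrest = new SendToRestAPI(value.toString());
            return "Stream Value: " + sendrest.converttoJson();
        }
    })
    // 3. addSink terminates the chain; nothing can be chained after it
    .addSink(new FlinkKafkaProducer09<>("my-second-topic",
            new SimpleStringSchema(), properties));

env.execute();
```

The original snippet also called addSink bare (with no receiver) and assigned its DataStreamSink result to stream before mapping, which is why the compiler could not find a map method.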