0

我正在尝试设置一个新流以将 Tika 螺栓连接到 Warc 螺栓。

import com.digitalpebble.stormcrawler.tika.ParserBolt;
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt;

builder.setBolt("tika", new ParserBolt(), numWorkers)
  .localOrShuffleGrouping("shunt","tika");

WARCHdfsBolt warcbolt = getWarcBolt("XX");

builder.setBolt("warc", warcbolt, numWorkers)
  .localOrShuffleGrouping("tika",  "warc");

在 Tika 定义中,我修改了 outputDeclarerFields 函数,如下所示定义我的新“warc”流:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("url", "content", "metadata", "text"));
  declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status"));
  declarer.declareStream("warc",   new Fields("url", "content", "metadata", "text"));
}

但是,当我以本地模式启动拓扑时,我得到:

14308 [main] WARN oasdsSlot - SLOT debian8:1027 以 EMPTY 状态开始 - 赋值 null 14308 [main] WARN oasdsSlot - SLOT debian8:1028 以 EMPTY 状态开始 - 赋值 null 14308 [main] WARN oasdsSlot - SLOT debian8:1029 以状态开始EMPTY - assignment null 14309 [main] INFO oaslAsyncLocalizer - 清理 /tmp/a1e3b7f5-e251-40ae-a032-b0839ca103c8/supervisor/stormdist 中未使用的拓扑 14318 [main] INFO oasdsSupervisor - 使用 id f42c64cd-7c36-40ab-9f85 启动 supervisor -4b7751ed2d6a 在主机 debian8 上。15030 [main] WARN oasdnimbus - 拓扑提交异常。(拓扑名称='xxCrawler') #error { :cause nil :via [{:type org.apache.storm.generated.InvalidTopologyException :message nil
:at [org.apache.storm.daemon.common$validate_structure_BANG_invoke common.clj 185]}] :trace [[org.apache.storm.daemon.common$validate_structure_BANG_invoke common.clj 185]
[org.apache.storm. daemon.common$system_topology_BANG_ 调用 common.clj 378]
[org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782 submitTopology nimbus.clj 1726 ]
[sun.reflect.NativeMethodAccessorImpl invoke0 NativeMethodAccessorImpl.java -2]
[sun.reflect.NativeMethodAccessorImpl 调用 NativeMethodAccessorImpl.java 62]
[sun.reflect.DelegatingMethodAccessorImpl 调用 DelegatingMethodAccessorImpl.java 43] [java.lang.reflect.Method 调用 Method.java 498] [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93] [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28] [org.apache.storm.testing$submit_local_topology 调用 testing.clj 310]
[org.apache.storm.LocalCluster$_submitTopology 调用 LocalCluster.clj 49] [org.apache.storm.LocalCluster submitTopology nil -1]
[com.digitalpebble. stormcrawler.ConfigurableTopology 提交 ConfigurableTopology.java 76]
[com.digitalpebble.stormcrawler.ConfigurableTopology 提交 ConfigurableTopology.java 65] [xx.xx.xx.xx.xxTopology 运行 xxTopology.java 111]
[com.digitalpebble.stormcrawler.ConfigurableTopology start ConfigurableTopology.java 50] [xx.xx.xx.xx.xxTopology main xxTopology.java 53]]} 15035 [main] 错误 oassoazsNIOServerCnxnFactory - Thread Thread[main,5,main] dead org .apache.storm.generated.InvalidTopologyException: null at org.apache.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:185) ~[storm-core-1.1.0.jar:1.1.0] at org. apache.storm.daemon.common$system_topology_BANG_.invoke(common.clj:378) ~[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopologyWithOpts (nimbus.clj:1694) ~[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopology(nimbus.clj:1726) ~[storm- core-1.1.0.jar:1.1.0] 在 sun.reflect。NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java :43) ~[?:1.8.0_131] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131] at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93 ) ~[clojure-1.7.0.jar:?] at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.7.0.jar:?] at org.apache.storm.testing$submit_local_topology .invoke(testing.clj:310) ~[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core- 1.1.0.jar:1.1.0] 在 org.apache.storm.LocalCluster。submitTopology(未知来源)~[storm-core-1.1.0.jar:1.1.0] at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:76)~[xx-crawler-1.1.jar:?]在 com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:65) ~[xx-1.1.jar:?] 在 xx.xx.xx.xx.xxTopology.run(xxTopology.java:111) ~[xx- crawler-1.1.jar:?] at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) ~[xx-crawler-1.1.jar:?] at xx.xx.xx.xx.xxTopology.main( xxTopology.java:53) ~[xx-crawler-1.1.jar:?]提交(ConfigurableTopology.java:65)~[xx-1.1.jar:?] at xx.xx.xx.xx.xxTopology.run(xxTopology.java:111)~[xx-crawler-1.1.jar:?] at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) ~[xx-crawler-1.1.jar:?] at xx.xx.xx.xx.xxTopology.main(xxTopology.java:53) ~[xx -crawler-1.1.jar:?]提交(ConfigurableTopology.java:65)~[xx-1.1.jar:?] at xx.xx.xx.xx.xxTopology.run(xxTopology.java:111)~[xx-crawler-1.1.jar:?] at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) ~[xx-crawler-1.1.jar:?] at xx.xx.xx.xx.xxTopology.main(xxTopology.java:53) ~[xx -crawler-1.1.jar:?]

任何帮助将不胜感激!

请注意,如果我使用 StatusStreamName ("status") 流来连接 tika 和 warc 螺栓,它工作正常。

谢谢,

艾蒂安

4

1 回答 1

0

WARC 是从原始的、未解析的内容生成的。您应该将 WARC 连接到 Fetcher 的输出而不是 Parser bolt。

你不需要为warc声明一个新的流,你可以简单地将warc bolt连接到来自Tika bolt的默认流。

我在你的代码中看到

导入 com.digitalpebble.stormcrawler.tika.ParserBolt;

这表明您依赖于默认实现(不会生成“warc”流)。您是否忘记用修改后的实现替换它?

于 2017-06-16T07:53:46.160 回答