0

所以我有两个类,分别负责播种(注入 URL)和爬取。

ESSeedInjector 类:

public class ESSeedInjector extends ConfigurableTopology {

public static void main(String[] args) {
    // Hard-wired launch arguments: seed directory "." and seed file
    // "seeds.txt", running in local mode with the ES configuration and
    // keeping the local cluster alive for 5000 ms after submission.
    String[] injectorArgs = {".", "seeds.txt", "-local", "-conf",
            "es-conf.yaml", "--sleep", "5000"};
    ConfigurableTopology.start(new ESSeedInjector(), injectorArgs);
}

@Override
public int run(String[] args) {

    // Both the seed directory and the file filter are required. The
    // original guard (args.length == 0) still crashed with an
    // ArrayIndexOutOfBoundsException when exactly one argument was given,
    // because args[0] AND args[1] are dereferenced below.
    if (args.length < 2) {
        System.err.println("ESSeedInjector seed_dir file_filter");
        return -1;
    }

    conf.setDebug(true);

    TopologyBuilder builder = new TopologyBuilder();

    // Seed lines are tab-separated (URL [+ metadata]); every tuple enters
    // the topology with status DISCOVERED.
    Scheme scheme = new StringTabScheme(Status.DISCOVERED);

    builder.setSpout("spout", new FileSpout(args[0], args[1], scheme));

    Fields key = new Fields("url");

    // Field-group on "url" so identical URLs always reach the same filter
    // task, allowing it to deduplicate/filter consistently.
    builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("spout",
            key);

    // Persist the surviving URLs into the ES status index.
    builder.setBolt("enqueue", new StatusUpdaterBolt(), 10)
            .customGrouping("filter", new URLStreamGrouping());

    return submit("ESSeedInjector", conf, builder);
}

爬虫拓扑类:

public class ESCrawlTopology extends ConfigurableTopology {

    public static void main(String[] args) {
        // Launch locally against the ES configuration file.
        ConfigurableTopology.start(new ESCrawlTopology(),
                new String[]{"-conf", "es-conf.yaml", "-local"});
    }

    @Override
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        int parallelism = ConfUtils.getInt(getConf(), "topology.workers", 1);

        // One spout task per shard of the ES status index.
        int numShards = 1;

        builder.setSpout("spout", new CollapsingSpout(), numShards);

        // Publishes per-status counts from the status index as metrics.
        builder.setBolt("status_metrics", new StatusMetricsBolt())
                .shuffleGrouping("spout");

        // Assigns each URL a partition key under the "key" field —
        // presumably per host/domain; confirm in the partitioner config.
        builder.setBolt("partitioner", new URLPartitionerBolt(), parallelism)
                .shuffleGrouping("spout");

        // Group by that key so all URLs sharing a partition are fetched by
        // the same task.
        builder.setBolt("fetch", new FetcherBolt(), parallelism)
                .fieldsGrouping("partitioner", new Fields("key"));

        builder.setBolt("sitemap", new SiteMapParserBolt(), parallelism)
                .localOrShuffleGrouping("fetch");

        builder.setBolt("parse", new JSoupParserBolt(), parallelism)
                .localOrShuffleGrouping("sitemap");

        builder.setBolt("indexer", new IndexerBolt(), parallelism)
                .localOrShuffleGrouping("parse");

        // Status updates emitted by every stage are grouped by URL so a
        // given URL is always handled by the same updater task.
        Fields byUrl = new Fields("url");

        builder.setBolt("status", new StatusUpdaterBolt(), parallelism)
                .fieldsGrouping("fetch", Constants.StatusStreamName, byUrl)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, byUrl)
                .fieldsGrouping("parse", Constants.StatusStreamName, byUrl)
                .fieldsGrouping("indexer", Constants.StatusStreamName, byUrl);

        // Removals flow from the status updater on a dedicated stream.
        builder.setBolt("deleter", new DeletionBolt(), parallelism)
                .localOrShuffleGrouping("status",
                        Constants.DELETION_STREAM_NAME);

        conf.registerMetricsConsumer(MetricsConsumer.class);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

        return submit("crawl", conf, builder);
    }
}

流程如下——

运行 ESSeedInjector 类(这会成功注入 url)。

运行爬虫类。

现在它开始爬取,但会在不确定的时刻产生错误。

18892 [elasticsearch[_client_][listener][T#2]] ERROR c.d.s.e.p.CollapsingSpout -  Exception with ES query
org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/search]
Caused by: org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/msearch]
Caused by: java.lang.IllegalArgumentException: Validation Failed: 1: no requests added;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.MultiSearchRequest.validate(MultiSearchRequest.java:90) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:131) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:64) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:54) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:621) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.access$000(TransportService.java:73) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService$3.sendRequest(TransportService.java:133) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:569) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:502) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:529) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:520) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.SearchTransportService.sendExecuteMultiSearch(SearchTransportService.java:182) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:93) ~[?:?]
    at org.elasticsearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:144) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:138) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:207) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.lambda$innerRun$2(FetchSearchPhase.java:105) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.innerRun(FetchSearchPhase.java:117) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.access$000(FetchSearchPhase.java:45) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase$1.doRun(FetchSearchPhase.java:87) ~[?:?]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:638) [elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-5.3.0.jar:5.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

不知道是什么导致了错误,但我看到的模式是,如果通过运行 ESIndex.Init 擦除 ElasticSearch 中的数据,然后执行 ESSeedInjector 然后执行 ESCrawlTopology 类,它将在抓取过程的早期产生异常(解析后种子网址)。

但是,如果我再次运行 ESCrawlTopology(不做任何其他事情),它会产生异常,但要晚得多。

编辑:当我从 CollapsingSpout() 更改为 AggregationSpout() 我现在得到这个日志。

15409 [elasticsearch[_client_][listener][T#1]] INFO  c.d.s.e.p.AggregationSpout -  ES query returned 0 hits from 0 buckets in 2 msec with 0 already being processed

ES 中不再处理或索引任何内容。

4

0 回答 0