1

我需要从 mysql 数据库和索引文档(大约 1000 个文档)中导入大量数据。在索引过程中,我需要通过向外部 Apache Stanbol 服务器发送增强请求来对字段进行特殊处理。我已经在 solrconfig.xml 中配置了我的 dataimport-handler 以在更新链中使用 StanbolContentProcessor,如下所示;

<updateRequestProcessorChain name="stanbolInterceptor">
    <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
    <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">   
    <lst name="defaults">  
        <str name="config">data-config.xml</str>
        <str name="update.chain">stanbolInterceptor</str>
    </lst>  
</requestHandler>

我的示例 data-config.xml 如下;

<dataConfig>
    <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" 
                url="jdbc:mysql://localhost:3306/solrTest" 
                user="test" password="test123" batchSize="1" />
    <document name="stanboldata">
        <entity name="stanbolrequest" query="SELECT * FROM documents">
            <field column="id" name="id" />
            <field column="content" name="content" />
            <field column="title" name="title" />
        </entity>
    </document>
</dataConfig>

当运行包含大约 1000 个文档的大型导入时,我的 stanbol 服务器出现故障,我怀疑是由于上述 Solr Stanbolnterceptor 的负载过重。我想批量限制数据导入,以便 Stanbol 可以同时处理可管理数量的请求。

这是否可以使用 data-config 的 dataSource 元素中的 batchSize 参数来实现?

有人可以提供一些想法来限制 Solr 中的数据导入负载吗?

这是我在 /dataimport 期间处理 Stanbol 请求的自定义 UpdateProcessor 类

public class StanbolContentProcessorFactory extends
        UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {

        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }

        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }

            }
            try {
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result
                        .getTextAnnotations();
                // extracting text annotations
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();

                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);

                        } else if (type
                                .equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);

                        }
                    }
                }
                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }
                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                    + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model

        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);

    }


    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }

}
4

2 回答 2

4

batchSize选项用于批量检索数据库表的行以减少内存使用(通常用于防止运行数据导入处理程序时内存不足)。虽然较小的批量大小可能会更慢,但该选项并不打算影响导入过程的速度。

我的建议是以其他方式限制请求,例如使用防火墙规则。如果您使用的是 Linux 并且可以访问 Netfilter,则可以运行类似以下命令:

iptables -A INPUT -p tcp --dport 12345 -m limit --limit 10/s -j ACCEPT

其中“12345”是 Stanbol 端口,“10/s”是每秒接受的数据包数。

于 2013-11-27T08:50:08.387 回答
3

Mowgli 是对的,这batchsize不会帮助你。由于大多数人都以相反的方式解决了问题(例如My dataimport is too slow, please help),因此在 Solr 中没有这样的事情。至少我什么都不知道。


就我个人而言,我不会选择配置您的 Linux 系统来为您处理节流。如果您从一个阶段移动到另一个阶段,或者您在某些时候迁移到不同的服务器,那么您需要记住这一点。如果人们在您的系统的生命周期内发生变化,他们将不会知道这一点。

所以,我不知道你的代码,但正如你在其他问题中StanbolContentProcessorFactory已经提到的那样,它似乎是自定义代码。由于它是您的自定义代码,您可以在其中添加一个节流机制。为了详细说明这一点,我需要一些代码来查看。


更新

Solr 确实有 Google 的番石榴,所以我会使用这里建议的RateLimiter 。如果您使用 Maven 构建,这意味着您可以使用 scope 。如果您不使用 Maven,则无需制作 fatjar 或将 guava 放入 Solr 的 lib 文件夹。provided

import com.google.common.util.concurrent.RateLimiter;

public class StanbolContentProcessorFactory extends
    UpdateRequestProcessorFactory {

    // ...

    // add a rate limiter to throttle your requests
    // this setting would allow 10 requests per second
    private RateLimiter throttle = RateLimiter.create(0.1);

    // ...

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();

        // this will throttle your requests
        throttle.acquire();

        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
            .accept(new MediaType("application", "rdf+xml"))
            .entity(request, MediaType.TEXT_PLAIN)
            .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
}
于 2013-11-27T11:54:26.803 回答