I need to import bulk data from a MySQL database and index the documents (about 1000 documents). During the indexing process I need to do special processing on certain fields by sending enhancement requests to an external Apache Stanbol server. I have configured my dataimport handler in solrconfig.xml to use the StanbolContentProcessor in the update chain, as below:
<updateRequestProcessorChain name="stanbolInterceptor">
  <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
  <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">
  <lst name="defaults">
    <str name="config">data-config.xml</str>
    <str name="update.chain">stanbolInterceptor</str>
  </lst>
</requestHandler>
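The import itself is triggered by hitting this handler with a full-import command, for example (the port and core name here are just from my local setup):

http://localhost:8983/solr/collection1/dataimport?command=full-import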
My sample data-config.xml is as follows:
<dataConfig>
  <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver"
              url="jdbc:mysql://localhost:3306/solrTest"
              user="test" password="test123" batchSize="1" />
  <document name="stanboldata">
    <entity name="stanbolrequest" query="SELECT * FROM documents">
      <field column="id" name="id" />
      <field column="content" name="content" />
      <field column="title" name="title" />
    </entity>
  </document>
</dataConfig>
When running a larger import of about 1000 documents, my Stanbol server goes down, and I suspect this is because of the heavy load coming from the Solr StanbolInterceptor above. I would like to throttle the data import in batches, so that Stanbol only has to process a manageable number of requests at a time.
Can this be achieved using the batchSize parameter in the dataSource element of data-config?
Can anyone give me ideas on how to throttle the data-import load in Solr?
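To show the kind of request limiting I have in mind, here is a rough sketch (this is not my actual code, just an illustration; the class name, the limit of 5 and the callStanbol() placeholder are made up):

import java.util.concurrent.Semaphore;

/**
 * Rough sketch only: cap how many enhancement calls can be in flight at the
 * same time. The class name, the limit of 5 and callStanbol() are placeholders.
 */
public class ThrottledStanbolCaller {

    // Allow at most 5 requests to the enhancer at any one time (made-up number).
    private static final Semaphore PERMITS = new Semaphore(5);

    public String enhance(String content) throws InterruptedException {
        PERMITS.acquire();                 // wait until a slot is free
        try {
            return callStanbol(content);   // stands in for the real HTTP POST
        } finally {
            PERMITS.release();             // always give the slot back
        }
    }

    // Placeholder for the actual call to the Stanbol enhancer.
    private String callStanbol(String content) {
        return content;
    }
}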
This is my custom UpdateProcessor class that handles the Stanbol requests during /dataimport:
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;

import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;

import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;

// EnhancementResult, TextAnnotation and StanbolConstants are my own helper
// classes; their imports are omitted here.

public class StanbolContentProcessorFactory extends UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {
        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }

        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();

            // Concatenate the configured fields into a single enhancement request.
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }
            }

            try {
                // One HTTP call to the Stanbol enhancer per document.
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result.getTextAnnotations();

                // Extract person and organisation annotations from the response.
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();
                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);
                        } else if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);
                        }
                    }
                }

                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }

                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                // On failure this only logs; the document never reaches RunUpdateProcessorFactory.
                ex.printStackTrace();
            }
        }
    }

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
        }

        String output = response.getEntity(String.class);

        // Parse the RDF/XML returned by the enhancer.
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
    }

    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }
}