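I am trying to write data to my local Elasticsearch Docker container (7.4.2). For simplicity I used the AbstractReactiveElasticsearchConfiguration provided by Spring and also overrode the entityMapper function. I built my repository by extending ReactiveElasticsearchRepository, and finally I use my autowired repository to saveAll() my collection of elements containing the data. However, Elasticsearch does not write any data. I also have a REST controller that starts the whole process and basically returns nothing, a DeferredResult<ResponseEntity<Void>>.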
The REST method from my ApiDelegateImpl
@Override
public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {
    final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();
    ForkJoinPool.commonPool().execute(() -> {
        try {
            openUsageExporterAdapter.startExport();
            deferredResult.setResult(ResponseEntity.accepted().build());
        } catch (Exception e) {
            deferredResult.setErrorResult(e);
        }
    });
    return deferredResult;
}
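To make the control flow of this endpoint easier to follow, here is a rough plain-Java analogue. It is only a sketch: `CompletableFuture` stands in for `DeferredResult`, the integer 202 stands in for `ResponseEntity.accepted()`, and `DeferredExportDemo` and `start` are hypothetical names that are not part of my application.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

// Hypothetical stand-in for the controller method above (not Spring code).
class DeferredExportDemo {

    // Runs the export on the common pool and completes the future once the
    // task has returned, or completes it exceptionally if the task threw.
    static CompletableFuture<Integer> start(Runnable export) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        ForkJoinPool.commonPool().execute(() -> {
            try {
                export.run();
                result.complete(202); // plays the role of 202 Accepted
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // join() waits until the background task has set the result.
        System.out.println(start(() -> { }).join());
    }
}
```

The point of the sketch is that the result is set as soon as the export call returns, so a 202 response only says that `startExport()` returned without throwing, not that any document was indexed.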
My Elasticsearch configuration
@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Bean
    @Override
    public EntityMapper entityMapper() {
        final ElasticsearchEntityMapper entityMapper =
                new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
        entityMapper.setConversions(elasticsearchCustomConversions());
        return entityMapper;
    }

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elasticSearchEndpoint)
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}
My repository
public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {
}
My DTO
@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {
    @Field(name = "id")
    @Id
    private Long id;
    ......
}
My adapter implementation
@Autowired
private final OpenUsageRepository openUsageRepository;
...transform entity into OpenUsage...
public void doSomething(final List<OpenUsage> openUsages){
    openUsageRepository.saveAll(openUsages);
}
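One detail of the adapter that may matter: `saveAll` on a reactive repository returns a `Flux`, and a Reactor publisher does no work until something subscribes to it. In `doSomething` the returned value is discarded, which could be why nothing reaches Elasticsearch. The sketch below illustrates that laziness with plain Java only; `ColdSave` and `LazySaveDemo` are hypothetical stand-ins, not Spring Data or Reactor types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold publisher; this is NOT Reactor's Flux.
final class ColdSave<T> {
    private final List<T> items;
    private final List<T> store;

    // Constructing the object only records what should be written.
    ColdSave(List<T> items, List<T> store) {
        this.items = items;
        this.store = store;
    }

    // The write happens here, and only when someone subscribes.
    void subscribe(Consumer<? super T> onNext) {
        for (T item : items) {
            store.add(item);
            onNext.accept(item);
        }
    }
}

class LazySaveDemo {
    public static void main(String[] args) {
        List<String> store = new ArrayList<>(); // plays the role of the index
        ColdSave<String> pending = new ColdSave<>(List.of("a", "b"), store);

        // Creating the pipeline alone writes nothing.
        System.out.println("before subscribe: " + store.size());

        // Subscribing triggers the writes.
        pending.subscribe(item -> { });
        System.out.println("after subscribe: " + store.size());
    }
}
```

In the real adapter the equivalent would presumably be to subscribe to, block on, or return the `Flux` from `saveAll`; which of these fits depends on the rest of the export pipeline.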
And finally my IT test
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {

    @LocalServerPort
    private int port;

    private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";

    @Container
    private static ElasticsearchContainer container =
            new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4")
                    .withExposedPorts(9200);

    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        @Override
        public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {
            final List<String> pairs = new ArrayList<>();
            pairs.add("spring.data.elasticsearch.client.reactive.endpoints="
                    + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
            pairs.add("spring.elasticsearch.rest.uris=http://"
                    + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
            TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
        }
    }

    @Test
    void testExportToES() throws IOException, InterruptedException {
        final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
        assertTrue(openUsageEntities.size() > 0);
        final String result = executeRestCall(STARTCALL);
        // Awaitility here tells me nothing is in ElasticSearch :(
    }

    private String executeRestCall(final String urlTemplate) throws IOException {
        final String url = String.format(urlTemplate, port);
        final HttpUriRequest request = new HttpPost(url);
        final HttpResponse response = HttpClientBuilder.create().build().execute(request);
        // Get the result.
        return EntityUtils.toString(response.getEntity());
    }
}
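The Awaitility check itself is omitted from the test above. For readers unfamiliar with that kind of check, the idea is roughly the following plain-Java sketch. `PollDemo` and `pollUntil` are hypothetical names and this is not Awaitility's API; in the real test the condition would be something like "the index contains at least one document".

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper, only illustrating what an "await" check does.
class PollDemo {

    // Re-checks the condition every `interval` until it holds or `timeout`
    // has elapsed. Returns true if the condition held, false otherwise.
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition becomes true roughly 50 ms after start.
        boolean ok = pollUntil(() -> System.currentTimeMillis() - start >= 50,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("condition met: " + ok);
    }
}
```

A check like this only tells whether the data eventually shows up within the timeout; it does not say why the data is missing when it times out.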