As far as I know, there is no linear interpolation aggregator in KairosDB. I was able to write one with the help of the GAPS aggregator. The following code illustrates the sampling and linear interpolation based on your values. It was written against KairosDB 1.1.3 and the Java client 2.2.0.
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.kairosdb.client.HttpClient;
import org.kairosdb.client.builder.*;
import org.kairosdb.client.builder.aggregator.SamplingAggregator;
import org.kairosdb.client.response.QueryResponse;
import org.kairosdb.client.response.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.IsCloseTo.closeTo;
public class KairosBulkLinearInterpolationTest {

    // constants
    private static final ZoneId UTC = ZoneId.of("UTC");
    private static final String dbUrl = "localhost";
    private static final String port = "8083";
    private static final Logger logger = LoggerFactory.getLogger(KairosBulkLinearInterpolationTest.class);

    private HttpClient client = null;
    private String metricName;

    @Before
    public void setUp() throws Exception {
        client = new HttpClient("http://" + dbUrl + ":" + port);
        metricName = "bulkInterpolationTest" + new Random().nextInt(10000);
        // push four sample points: 100 @ 1000s, 200 @ 2000s, 300 @ 3000s, 500 @ 4000s
        MetricBuilder metricBuilder = MetricBuilder.getInstance();
        Metric metric = metricBuilder.addMetric(metricName);
        metric.addTag("test", "test");
        metric.addDataPoint(1000000, 100);
        metric.addDataPoint(2000000, 200);
        metric.addDataPoint(3000000, 300);
        metric.addDataPoint(4000000, 500);
        client.pushMetrics(metricBuilder);
    }
    @Test
    public void bulkInterpolation() throws IOException, URISyntaxException {
        ZonedDateTime start = Instant.ofEpochMilli(1000000).atZone(UTC);
        Map<ZonedDateTime, Double> valuesMap = interpolationBulk(metricName, new HashMap<>(),
                Range.closed(start, start.plusSeconds(3500)), Duration.ofSeconds(500));
        Set<Map.Entry<ZonedDateTime, Double>> entries = valuesMap.entrySet();
        ArrayList<Map.Entry<ZonedDateTime, Double>> list = new ArrayList<>(entries);
        // every 500s sample is either a stored value or the linear interpolation of its neighbours
        assertValueOf(list.get(0), 1000L, 100d);
        assertValueOf(list.get(1), 1500L, 150d);
        assertValueOf(list.get(2), 2000L, 200d);
        assertValueOf(list.get(3), 2500L, 250d);
        assertValueOf(list.get(4), 3000L, 300d);
        assertValueOf(list.get(5), 3500L, 400d);
        assertValueOf(list.get(6), 4000L, 500d);
    }

    protected void assertValueOf(Map.Entry<ZonedDateTime, Double> entry, long seconds, double value) {
        assertThat(entry.getKey().toEpochSecond(), is(seconds));
        assertThat(entry.getValue(), closeTo(value, 0d));
    }
    @After
    public void tearDown() throws Exception {
        client.deleteMetric(metricName);
        client.shutdown();
    }
    @VisibleForTesting
    protected Map<ZonedDateTime, Double> interpolationBulk(@Nonnull String metricName,
                                                           @Nonnull Map<String, String> tags,
                                                           @Nonnull Range<ZonedDateTime> inRange,
                                                           @Nonnull Duration samplingPeriod) throws MalformedURLException {
        Preconditions.checkNotNull(metricName, "Missing required parameter 'metricName'!");
        Preconditions.checkNotNull(tags, "Missing required parameter 'tags'!");
        Preconditions.checkNotNull(inRange, "Missing required parameter 'inRange'!");
        Preconditions.checkNotNull(samplingPeriod, "Missing required parameter 'samplingPeriod'!");

        // load the values in the given range
        QueryBuilder queryBuilder = QueryBuilder.getInstance();
        queryBuilder.addMetric(metricName).addTags(tags);
        QueryMetric queryMetric = queryBuilder.getMetrics().get(0);
        // first average all values within each sampling interval - we are sampling, remember
        queryMetric.addAggregator(new SamplingAggregator("avg", (int) samplingPeriod.getSeconds(), TimeUnit.SECONDS));
        // and then let the "gaps" aggregator mark the empty intervals with null
        queryMetric.addAggregator(new SamplingAggregator("gaps", (int) samplingPeriod.getSeconds(), TimeUnit.SECONDS));
        if (inRange.hasLowerBound()) {
            queryBuilder.setStart(new Date(inRange.lowerEndpoint().toInstant().toEpochMilli()));
        }
        if (inRange.hasUpperBound()) {
            queryBuilder.setEnd(new Date(inRange.upperEndpoint().toInstant().toEpochMilli()));
        }
        try {
            QueryResponse queryResponse = client.query(queryBuilder);
            List<String> errors = queryResponse.getErrors();
            if (!errors.isEmpty()) {
                logger.error("Error while querying KairosDB! " + StringUtils.join(errors));
                return new HashMap<>();
            }
            List<Result> results = queryResponse.getQueryResponse(metricName).getResults();
            TreeMap<ZonedDateTime, Double> valuesMap = new TreeMap<>();
            results.forEach(result -> {
                if (CollectionUtils.isNotEmpty(result.getDataPoints())) {
                    result.getDataPoints().forEach(dataPoint -> {
                        try {
                            valuesMap.put(Instant.ofEpochMilli(dataPoint.getTimestamp()).atZone(UTC),
                                    dataPoint.getValue() == null ? null : dataPoint.doubleValue());
                        } catch (DataFormatException dfe) {
                            logger.error("Data format exception while reading the Kairos database response!", dfe);
                        }
                    });
                }
            });

            TreeMap<ZonedDateTime, Double> overrideValues = new TreeMap<>();
            // fill the gaps: each null entry is replaced by the midpoint of its nearest neighbours.
            // We rely on those neighbours being non-null, because the "gaps" aggregator marks missing
            // samples with null and this code assumes a gap is a single missing sample with a real
            // value on both its left and its right side.
            valuesMap.entrySet().stream().filter(entry -> entry.getValue() == null).map(Map.Entry::getKey).forEach(gapTime -> {
                Map.Entry<ZonedDateTime, Double> previous = valuesMap.floorEntry(gapTime.minus(1L, ChronoUnit.MILLIS));
                Map.Entry<ZonedDateTime, Double> next = valuesMap.ceilingEntry(gapTime.plus(1L, ChronoUnit.MILLIS));
                // linear interpolation at the midpoint of two equidistant neighbours
                overrideValues.put(gapTime, (previous.getValue() + next.getValue()) / 2);
            });
            overrideValues.forEach(valuesMap::put);
            return valuesMap;
        } catch (IOException ioex) {
            logger.error("I/O-Exception while querying the Kairos database!", ioex);
            return new TreeMap<>();
        } catch (URISyntaxException e) {
            logger.error("URISyntaxException while querying the Kairos database!", e);
            return new TreeMap<>();
        }
    }
}
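
For completeness, here is a minimal sketch of how the helper could be invoked from another method of the same class (it reuses the client field initialized in setUp(); the method name printInterpolatedSeries is a placeholder and not part of the original test):

    // Hypothetical caller inside the same class: re-samples the test metric to 500-second
    // buckets and prints the gap-filled series.
    public void printInterpolatedSeries() throws MalformedURLException {
        ZonedDateTime start = Instant.ofEpochMilli(1000000).atZone(UTC);
        Map<ZonedDateTime, Double> series = interpolationBulk(
                metricName,                                // metric written in setUp()
                Collections.singletonMap("test", "test"),  // optional tag filter, may be empty
                Range.closed(start, start.plusSeconds(3500)),
                Duration.ofSeconds(500));                  // one sample every 500 seconds
        series.forEach((time, value) -> logger.info("{} -> {}", time, value));
    }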