Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136773.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136773
summary: Return `ConstNullBlock` in `FromAggMetricDouble`
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -216,55 +216,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
};
bulkIndex(dataStreamName, sourceSupplier, 100);

// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String secondIndex = backingIndices.get(1);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
assertAcked(
indicesAdmin().prepareUpdateSettings(sourceIndex)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
);

DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
assertAcked(
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
)
);

// Wait for downsampling to complete
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
if (indexMetadata == null) {
return false;
}
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
});
safeAwait(listener);

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);

// remove old backing index and replace with downsampled index and delete old so old is not queried
assertAcked(
client().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
List.of(
DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
)
)
).actionGet()
);
assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());
String secondBackingIndex = rolloverAndDownsample(dataStreamName, 0, "5m");

// index to the next backing index; random time between 31 and 59m in the future to because default look_ahead_time is 30m and we
// don't want to conflict with the previous backing index
Expand Down Expand Up @@ -311,7 +263,99 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
)
);
}
testEsqlMetrics(dataStreamName, secondBackingIndex);
}

public void testPartialNullMetricsAfterDownsampling() throws Exception {
    String dataStreamName = "metrics-foo";
    Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
    putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
        {
          "properties": {
            "host": {
              "type": "keyword",
              "time_series_dimension": true
            },
            "cluster" : {
              "type": "keyword",
              "time_series_dimension": true
            },
            "cpu": {
              "type": "double",
              "time_series_metric": "gauge"
            },
            "request": {
              "type": "double",
              "time_series_metric": "counter"
            }
          }
        }
        """, null, null);

    final Instant now = Instant.now();

    // Seed the data stream with documents that carry only dimensions — no numeric metric values.
    bulkIndex(
        dataStreamName,
        timeSeriesDoc(now.minusSeconds(60 * 60).toEpochMilli(), now.minusSeconds(60 * 15).toEpochMilli(), false),
        100
    );
    // Then add documents that do carry metric values, in a later time window.
    bulkIndex(
        dataStreamName,
        timeSeriesDoc(now.minusSeconds(60 * 14).toEpochMilli(), now.plusSeconds(60 * 30).toEpochMilli(), true),
        100
    );
    String secondBackingIndex = rolloverAndDownsample(dataStreamName, 0, "5m");

    // Index into the new write index; random time between 31 and 59 minutes in the future so we stay
    // clear of the previous backing index's window (default look_ahead_time is 30m).
    bulkIndex(
        dataStreamName,
        timeSeriesDoc(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli(), true),
        100
    );

    // Skip the test unless the cluster advertises aggregate_metric_double support for ES|QL queries.
    var capabilitiesResponse = clusterAdmin().nodesCapabilities(
        new NodesCapabilitiesRequest().method(RestRequest.Method.POST)
            .path("/_query")
            .capabilities(AGGREGATE_METRIC_DOUBLE_V0.capabilityName())
    ).actionGet();
    assumeTrue("Require aggregate_metric_double casting", capabilitiesResponse.isSupported().orElse(Boolean.FALSE));

    testEsqlMetrics(dataStreamName, secondBackingIndex);
}

/**
 * Builds a document supplier producing time-series docs with a random timestamp in
 * {@code [fromMillis, toMillis]} and random {@code host}/{@code cluster} dimensions.
 * When {@code withMetrics} is true the docs also carry random {@code cpu} and {@code request} values.
 */
private Supplier<XContentBuilder> timeSeriesDoc(long fromMillis, long toMillis, boolean withMetrics) {
    return () -> {
        String ts = randomDateForRange(fromMillis, toMillis);
        try {
            XContentBuilder doc = XContentFactory.jsonBuilder()
                .startObject()
                .field("@timestamp", ts)
                .field("host", randomFrom("host1", "host2", "host3"))
                .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"));
            if (withMetrics) {
                doc.field("cpu", randomDouble()).field("request", randomDoubleBetween(0, 100, true));
            }
            return doc.endObject();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    };
}

private void testEsqlMetrics(String dataStreamName, String nonDownsampledIndex) throws Exception {
// test _over_time commands with implicit casting of aggregate_metric_double
for (String outerCommand : List.of("min", "max", "sum", "count")) {
String expectedType = outerCommand.equals("count") ? "long" : "double";
Expand All @@ -338,7 +382,9 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
// TODO: add to counter tests below when support for counters is added
for (String innerCommand : List.of("first_over_time", "last_over_time")) {
String command = outerCommand + " (" + innerCommand + "(cpu))";
try (var resp = esqlCommand("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
try (
var resp = esqlCommand("TS " + nonDownsampledIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")
) {
var columns = resp.columns();
assertThat(columns, hasSize(3));
assertThat(
Expand Down Expand Up @@ -381,6 +427,60 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
}
}

/**
 * Rolls the data stream over, downsamples the backing index at position
 * {@code timesDownsampledAlready} (which the rollover guarantees is no longer the write index),
 * swaps the downsampled index into the data stream in place of the source index, and finally
 * deletes the source index so only the downsampled copy can be queried.
 *
 * @param dataStreamName          the data stream to roll over and downsample
 * @param timesDownsampledAlready number of backing indices already replaced by downsampled
 *                                indices; used to locate the next source index to downsample
 * @param interval                downsample interval string, e.g. {@code "5m"}
 * @return the name of the new backing index created by the rollover (the next write index)
 */
private String rolloverAndDownsample(String dataStreamName, int timesDownsampledAlready, String interval) throws Exception {
    // Rollover to ensure the index we will downsample is not the write index
    assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
    List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, timesDownsampledAlready + 2);
    String sourceIndex = backingIndices.get(timesDownsampledAlready);
    String secondIndex = backingIndices.get(timesDownsampledAlready + 1);
    String targetIndex = "downsample-" + interval + "-" + sourceIndex;
    // Set the source index to read-only state; downsampling requires the source to be write-blocked
    assertAcked(
        indicesAdmin().prepareUpdateSettings(sourceIndex)
            .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
    );

    // Kick off the downsample of sourceIndex into targetIndex
    DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
    assertAcked(
        client().execute(
            DownsampleAction.INSTANCE,
            new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
        )
    );

    // Wait for downsampling to complete: watch cluster state until the target index exists and its
    // downsample status setting reports SUCCESS
    SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
        final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
        if (indexMetadata == null) {
            return false;
        }
        var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
        return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
    });
    safeAwait(listener);

    assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);

    // remove old backing index and replace with downsampled index and delete old so old is not queried
    assertAcked(
        client().execute(
            ModifyDataStreamsAction.INSTANCE,
            new ModifyDataStreamsAction.Request(
                TEST_REQUEST_TIMEOUT,
                TEST_REQUEST_TIMEOUT,
                List.of(
                    DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
                    DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
                )
            )
        ).actionGet()
    );
    assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());

    return secondIndex;
}

/**
 * Executes the given ES|QL query against the cluster and returns the response,
 * waiting at most 30 seconds for completion.
 */
private EsqlQueryResponse esqlCommand(String command) throws IOException {
    EsqlQueryRequest request = new EsqlQueryRequest().query(command);
    return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ private record Evaluator(BlockFactory blockFactory, EvalOperator.ExpressionEvalu
@Override
public Block eval(Page page) {
Block block = eval.eval(page);
if (block.areAllValuesNull()) {
return block;
}
try {
if (block.areAllValuesNull()) {
return blockFactory.newConstantNullBlock(block.getPositionCount());
}
Block resultBlock = ((AggregateMetricDoubleBlock) block).getMetricBlock(subFieldIndex);
resultBlock.incRef();
return resultBlock;
Expand Down