Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138465.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138465
summary: "Allocation: add duration and count metrics for write load hotspot"
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -35,25 +42,51 @@
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
*/
public class WriteLoadConstraintMonitor {
public static final String HOTSPOT_NODES_COUNT_METRIC_NAME = "es.allocator.allocations.node.write_load_hotspot.current";
public static final String HOTSPOT_DURATION_METRIC_NAME = "es.allocator.allocations.node.write_load_hotspot.duration.histogram";

private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
private static final int MAX_NODE_IDS_IN_MESSAGE = 3;
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private volatile long lastRerouteTimeMillis = 0;
private volatile Set<String> lastSetOfHotSpottedNodes = Set.of();
private final Map<String, Long> hotspotNodeStartTimes = new HashMap<>();
private long hotspotNodeStartTimesLastTerm = -1L;

public WriteLoadConstraintMonitor(
private final AtomicLong hotspotNodesCount = new AtomicLong(-1L); // metrics source of hotspotting node count
private final DoubleHistogram hotspotDurationHistogram;

/**
 * Convenience constructor that publishes no metrics, by wiring in {@link MeterRegistry#NOOP}.
 * Kept {@code protected} so production code goes through the metrics-enabled constructor;
 * intended for tests that do not care about telemetry.
 */
protected WriteLoadConstraintMonitor(
    ClusterSettings clusterSettings,
    LongSupplier currentTimeMillisSupplier,
    Supplier<ClusterState> clusterStateSupplier,
    RerouteService rerouteService
) {
    // default of NOOP for tests
    this(clusterSettings, currentTimeMillisSupplier, clusterStateSupplier, rerouteService, MeterRegistry.NOOP);
}

/**
 * Creates the monitor and registers its telemetry instruments.
 *
 * @param clusterSettings           source of the write-load constraint settings
 * @param currentTimeMillisSupplier clock used for reroute throttling and hotspot duration measurement
 * @param clusterStateSupplier      supplies the latest cluster state on each {@code onNewInfo} call
 * @param rerouteService            invoked when a hotspot requires rebalancing
 * @param meterRegistry             registry the gauge and histogram are registered with
 */
public WriteLoadConstraintMonitor(
    ClusterSettings clusterSettings,
    LongSupplier currentTimeMillisSupplier,
    Supplier<ClusterState> clusterStateSupplier,
    RerouteService rerouteService,
    MeterRegistry meterRegistry
) {
    this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
    this.clusterStateSupplier = clusterStateSupplier;
    this.currentTimeMillisSupplier = currentTimeMillisSupplier;
    this.rerouteService = rerouteService;

    // Gauge polls getHotspotNodesCount, which only reports a value computed since the
    // previous poll (so only the elected master, which receives ClusterInfo, reports).
    // NOTE(review): the unit argument is the literal string "unit" — looks like a
    // placeholder (cf. "s" on the histogram below); confirm the intended unit.
    meterRegistry.registerLongsGauge(
        HOTSPOT_NODES_COUNT_METRIC_NAME,
        "Total number of nodes hotspotting with write loads",
        "unit",
        this::getHotspotNodesCount
    );
    // Records, in seconds, how long each node's hotspot episode lasted once it ends.
    hotspotDurationHistogram = meterRegistry.registerDoubleHistogram(HOTSPOT_DURATION_METRIC_NAME, "hotspot duration", "s");
}

/**
Expand Down Expand Up @@ -99,6 +132,9 @@ public void onNewInfo(ClusterInfo clusterInfo) {
}
}

final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
Set<String> lastHotspotNodes = recordHotspotDurations(state, writeNodeIdsExceedingQueueLatencyThreshold, currentTimeMillis);

if (writeNodeIdsExceedingQueueLatencyThreshold.isEmpty()) {
logger.trace("No hot-spotting write nodes detected");
return;
Expand All @@ -110,15 +146,14 @@ public void onNewInfo(ClusterInfo clusterInfo) {
return;
}

final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis;
final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval()
.millis();

// We know that there is at least one hot-spotting node if we've reached this code. Now check whether there are any new hot-spots
// or hot-spots that are persisting and need further balancing work.
if (haveCalledRerouteRecently == false
|| Sets.difference(writeNodeIdsExceedingQueueLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
Set<String> newHotspotNodes = Sets.difference(writeNodeIdsExceedingQueueLatencyThreshold, lastHotspotNodes);
if (haveCalledRerouteRecently == false || newHotspotNodes.isEmpty() == false) {
if (logger.isDebugEnabled()) {
logger.debug(
"""
Expand All @@ -130,7 +165,7 @@ public void onNewInfo(ClusterInfo clusterInfo) {
lastRerouteTimeMillis == 0
? "has never previously been called"
: "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago",
nodeSummary(lastSetOfHotSpottedNodes),
nodeSummary(lastHotspotNodes),
writeLoadConstraintSettings.getQueueLatencyThreshold()
);
}
Expand All @@ -144,7 +179,8 @@ public void onNewInfo(ClusterInfo clusterInfo) {
)
);
lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
lastSetOfHotSpottedNodes = writeNodeIdsExceedingQueueLatencyThreshold;

recordHotspotStartTimes(newHotspotNodes, currentTimeMillis);
} else {
logger.debug(
"Not calling reroute because we called reroute [{}] ago and there are no new hot spots",
Expand All @@ -153,6 +189,44 @@ public void onNewInfo(ClusterInfo clusterInfo) {
}
}

/**
 * Marks each of the given nodes as hot-spotting as of {@code startTimestamp} and
 * refreshes the hotspot-node-count gauge source with the new tracked-node total.
 */
private void recordHotspotStartTimes(Set<String> nodeIds, long startTimestamp) {
    nodeIds.forEach(nodeId -> hotspotNodeStartTimes.put(nodeId, startTimestamp));
    hotspotNodesCount.set(hotspotNodeStartTimes.size());
}

/**
 * Closes out hotspot episodes for nodes that are no longer hot-spotting, recording each
 * episode's duration (in seconds) in {@link #hotspotDurationHistogram}, and refreshes the
 * hotspot-node-count gauge source.
 *
 * @param state               current cluster state, used to detect master/term changes
 * @param currentHotspotNodes node IDs currently exceeding the queue latency threshold
 * @param hotspotEndTime      timestamp (millis) used as the end time for finished episodes
 * @return an immutable snapshot of the node IDs that were tracked as hot-spotting
 *         before this call (i.e. the previous hotspot set)
 */
private Set<String> recordHotspotDurations(ClusterState state, Set<String> currentHotspotNodes, long hotspotEndTime) {
    // Reset hotspotNodeStartTimes if the term has changed or we are no longer the elected
    // master: start times collected under another term/master would yield bogus durations.
    if (state.term() != hotspotNodeStartTimesLastTerm || state.nodes().isLocalNodeElectedMaster() == false) {
        hotspotNodeStartTimesLastTerm = state.term();
        hotspotNodeStartTimes.clear();
    }

    // Snapshot before mutating the map below: keySet() is a live view of the map, so
    // returning it directly would hand the caller a set that silently changes as stale
    // entries are removed here and new ones are added later by recordHotspotStartTimes.
    final Set<String> lastHotspotNodes = Set.copyOf(hotspotNodeStartTimes.keySet());
    final Set<String> staleHotspotNodes = Sets.difference(lastHotspotNodes, currentHotspotNodes);

    // Nodes that were hot-spotting but no longer are: their episode ends now.
    for (String nodeId : staleHotspotNodes) {
        assert hotspotNodeStartTimes.containsKey(nodeId) : "Map should contain key from its own subset";
        long hotspotStartTime = hotspotNodeStartTimes.remove(nodeId);
        long hotspotDuration = hotspotEndTime - hotspotStartTime;
        assert hotspotDuration >= 0 : "hotspot duration should always be non-negative";
        hotspotDurationHistogram.record(hotspotDuration / 1000.0); // histogram unit is seconds
    }
    hotspotNodesCount.set(hotspotNodeStartTimes.size());

    return lastHotspotNodes;
}

/**
 * Gauge callback for {@link #HOTSPOT_NODES_COUNT_METRIC_NAME}. Atomically consumes the
 * latest computed count (resetting the holder to -1) so a value is observed at most once
 * per computation; when nothing new has been computed since the last poll — e.g. on a
 * node that is not receiving ClusterInfo — no observation is reported.
 */
private List<LongWithAttributes> getHotspotNodesCount() {
    final long snapshot = hotspotNodesCount.getAndSet(-1L);
    return snapshot < 0 ? List.of() : List.of(new LongWithAttributes(snapshot));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to return an empty list here if we're not the master, otherwise we'll get a bunch of bogus values each metric interval from the non-master nodes.

Perhaps getHotspotNodesCount could switch it to -1 and then we could only report if it's been switched back to an actual value (the InternalClusterInfoService stops refreshing the cluster info when the node is no longer master, so this approach would naturally only report metrics from the master).

Or any other approach you think is best.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to make this a cluster state listener, we will stop receiving new ClusterInfo when we are no longer master.

If we make this listen to the cluster state, it adds more dependencies and will add a tiny additional cost to the cluster state applier thread (negligible but we should try and do that as little as possible).

I think I prefer my suggestion to only return a hot-spot node count if we've calculated one since last time we were polled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would also be presumably simpler to test (just confirm we only return the count if onNewInfo has been called since last time we asked)


private static String nodeSummary(Set<String> nodeIds) {
if (nodeIds.isEmpty() == false && nodeIds.size() <= MAX_NODE_IDS_IN_MESSAGE) {
return "[" + String.join(", ", nodeIds) + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ private void construct(
clusterService.getClusterSettings(),
threadPool.relativeTimeInMillisSupplier(),
clusterService::state,
rerouteService
rerouteService,
telemetryProvider.getMeterRegistry()
)::onNewInfo
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
Expand Down Expand Up @@ -113,7 +114,8 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
new RerouteService() {
@Override
public void reroute(String reason, Priority priority, ActionListener<Void> listener) {}
}
},
MeterRegistry.NOOP
)
);
clusterInfoService.addListener(usageMonitor::onNewInfo);
Expand Down
Loading