Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/140237.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 140237
summary: Overall Decision for Deciders prioritizes THROTTLE
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class NonDesiredBalanceIT extends ESIntegTestCase {

private static final Set<String> NOT_PREFERRED_AND_THROTTLED_NODES = Collections.newSetFromMap(new ConcurrentHashMap<>());

/**
 * Returns the plugin classes used by this test: only {@link NotPreferredAndThrottledPlugin}, which contributes
 * the custom allocation deciders controlled through {@code NOT_PREFERRED_AND_THROTTLED_NODES}.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> testPlugins = Arrays.asList(NotPreferredAndThrottledPlugin.class);
    return testPlugins;
}

/**
 * Builds the node settings: the inherited settings plus the shards allocator type set to
 * {@link ClusterModule#BALANCED_ALLOCATOR}.
 */
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    final Settings.Builder nodeSettingsBuilder = Settings.builder();
    // Start from whatever the parent class configures for this node.
    nodeSettingsBuilder.put(super.nodeSettings(nodeOrdinal, otherSettings));
    // Select the balanced allocator for this test class.
    nodeSettingsBuilder.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR);
    return nodeSettingsBuilder.build();
}
Comment on lines +51 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Since we are starting nodes manually for each test, I think we can merge the two test classes and start the nodes in each test method with their own node settings. That seems less cluttered to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah definitely some de-duplication due here. I'll add it to the follow-up


/**
 * Empties {@code NOT_PREFERRED_AND_THROTTLED_NODES}, so that the test deciders answer YES for every node again.
 * Runs as a JUnit {@code @After} hook and is also invoked directly by the test to lift the throttle mid-test.
 */
@After
public void clearThrottleAndNotPreferredNodes() {
NOT_PREFERRED_AND_THROTTLED_NODES.clear();
}

/**
 * Tests that the non-desired balance allocator obeys the THROTTLE decision of a node.
 * <p>
 * The target node is registered in {@code NOT_PREFERRED_AND_THROTTLED_NODES}, so one test decider answers
 * NOT_PREFERRED and the other answers THROTTLE for it. The test then:
 * <ol>
 *   <li>excludes the source node by name so that its only shard can no longer remain there,</li>
 *   <li>waits for the {@link BalancedShardsAllocator} TRACE message reporting that the move is throttled,</li>
 *   <li>clears the registered nodes, issues a reroute, and waits until every shard copy of the index is
 *       started on the target node.</li>
 * </ol>
 */
@TestLogging(
    reason = "watch for can't move message",
    value = "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE"
)
public void testThrottleVsNotPreferredPriorityInDecisions() {
    final Settings settings = Settings.builder().build();
    final var sourceNode = internalCluster().startNode(settings);
    final var indexName = randomIdentifier();
    // The index is created while only the source node exists, so its single primary must start there.
    createIndex(indexName, 1, 0);
    ensureGreen(indexName);
    final var targetNode = internalCluster().startNode(settings);
    final var sourceNodeID = getNodeId(sourceNode);
    final var targetNodeID = getNodeId(targetNode);

    // From now on the test deciders return NOT_PREFERRED and THROTTLE for the target node.
    NOT_PREFERRED_AND_THROTTLED_NODES.add(targetNodeID);

    var mapNodeIdsToNames = nodeIdsToNames();
    final var sourceNodeName = mapNodeIdsToNames.get(sourceNodeID);
    final var targetNodeName = mapNodeIdsToNames.get(targetNodeID);
    assertNotNull(sourceNodeName);
    assertNotNull(targetNodeName);

    logger.info("--> Verifying the shard did not move because allocation to the target node (ID: " + targetNodeID + ") is throttled.");
    MockLog.awaitLogger(() -> {
        logger.info(
            "--> Excluding shard assignment to node "
                + sourceNodeName
                + "(ID: "
                + sourceNodeID
                + ") to force its shard to move to node "
                + targetNodeName
                + "(ID: "
                + targetNodeID
                + ")"
        );
        updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", sourceNodeName));
    },
        BalancedShardsAllocator.class,
        new MockLog.SeenEventExpectation(
            "unable to relocate shard due to throttling",
            BalancedShardsAllocator.class.getCanonicalName(),
            Level.TRACE,
            "[[*]][0] can't move: [MoveDecision{canMoveDecision=throttled, canRemainDecision=NO(), *}]"
        )
    );

    // This class runs with the balanced allocator (see nodeSettings), so the message names the allocator rather
    // than the desired-balance Reconciler.
    logger.info("--> Clearing THROTTLE decider response and prodding the allocator (with a reroute request) to try again");
    clearThrottleAndNotPreferredNodes();

    safeGet(
        client().execute(TransportClusterRerouteAction.TYPE, new ClusterRerouteRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT))
    );

    safeAwait(
        ClusterServiceUtils.addMasterTemporaryStateListener(
            state -> state.routingTable(ProjectId.DEFAULT)
                .index(indexName)
                .allShards()
                .flatMap(IndexShardRoutingTable::allShards)
                // Compare from the known non-null side so a shard without a current node cannot throw here.
                .allMatch(shardRouting -> targetNodeID.equals(shardRouting.currentNodeId()) && shardRouting.started())
        )
    );
}

/**
 * Test plugin contributing two allocation deciders whose answers are driven by
 * {@code NOT_PREFERRED_AND_THROTTLED_NODES}: for a registered node the first decider returns NOT_PREFERRED and
 * the second returns THROTTLE (outside of simulation); for every other node both return YES.
 */
public static class NotPreferredAndThrottledPlugin extends Plugin implements ClusterPlugin {
    @Override
    public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
        final AllocationDecider notPreferredDecider = new AllocationDecider() {
            @Override
            public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
                if (NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId())) {
                    return Decision.NOT_PREFERRED;
                }
                return Decision.YES;
            }
        };

        final AllocationDecider throttleDecider = new AllocationDecider() {
            @Override
            public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
                // Never answer THROTTLE while the allocation is only being simulated; throttle real moves only.
                final boolean throttleNode = NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId())
                    && allocation.isSimulating() == false;
                if (throttleNode) {
                    return Decision.THROTTLE;
                }
                return Decision.YES;
            }
        };

        return List.of(notPreferredDecider, throttleDecider);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Integration test checking how the {@link DesiredBalanceReconciler} treats a node for which one allocation
 * decider answers NOT_PREFERRED while another answers THROTTLE.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReconcilerIT extends ESIntegTestCase {

    /** IDs of the nodes for which the test deciders answer NOT_PREFERRED / THROTTLE instead of YES. */
    private static final Set<String> NOT_PREFERRED_AND_THROTTLED_NODES = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /**
     * Returns the plugin classes used by this test: only {@link NotPreferredAndThrottledPlugin}.
     */
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        final Collection<Class<? extends Plugin>> testPlugins = Arrays.asList(NotPreferredAndThrottledPlugin.class);
        return testPlugins;
    }

    /**
     * Empties the set of NOT_PREFERRED / THROTTLE nodes. Runs after each test and is also called by the test
     * itself to lift the throttle.
     */
    @After
    public void clearThrottleAndNotPreferredNodes() {
        NOT_PREFERRED_AND_THROTTLED_NODES.clear();
    }

    /**
     * Tests that the Reconciler obeys the THROTTLE decision of a node.
     * <p>
     * The source node is excluded by name while the target node is registered as NOT_PREFERRED and throttled.
     * The test waits for the Reconciler's TRACE message saying the shard can neither move nor remain, then
     * clears the registered nodes, issues a reroute, and waits until every shard copy of the index is started
     * on the target node.
     */
    @TestLogging(
        reason = "track when Reconciler decisions",
        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:TRACE"
    )
    public void testThrottleVsNotPreferredPriorityInDecisions() {
        final Settings emptySettings = Settings.builder().build();
        final var firstNode = internalCluster().startNode(emptySettings);
        final var index = randomIdentifier();
        // Only the first node exists at this point, so the single primary is allocated there.
        createIndex(index, 1, 0);
        ensureGreen(index);
        final var secondNode = internalCluster().startNode(emptySettings);
        final var sourceId = getNodeId(firstNode);
        final var targetId = getNodeId(secondNode);

        NOT_PREFERRED_AND_THROTTLED_NODES.add(targetId);

        final var namesById = nodeIdsToNames();
        final var sourceName = namesById.get(sourceId);
        final var targetName = namesById.get(targetId);
        assertNotNull(sourceName);
        assertNotNull(targetName);

        logger.info("--> Verifying the shard did not move because allocation to the target node (ID: " + targetId + ") is throttled.");

        final String exclusionMessage = "--> Excluding shard assignment to node "
            + sourceName
            + "(ID: "
            + sourceId
            + ") to force its shard to move to node "
            + targetName
            + "(ID: "
            + targetId
            + ")";
        final var cannotMoveExpectation = new MockLog.SeenEventExpectation(
            "unable to relocate shard that can no longer remain",
            DesiredBalanceReconciler.class.getCanonicalName(),
            Level.TRACE,
            "Cannot move shard * and cannot remain because of [NO()]"
        );
        MockLog.awaitLogger(() -> {
            logger.info(exclusionMessage);
            updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", sourceName));
        }, DesiredBalanceReconciler.class, cannotMoveExpectation);

        logger.info("--> Clearing THROTTLE decider response and prodding the Reconciler (with a reroute request) to try again");
        clearThrottleAndNotPreferredNodes();

        final var rerouteRequest = new ClusterRerouteRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT);
        safeGet(client().execute(TransportClusterRerouteAction.TYPE, rerouteRequest));

        // Completes once the master's cluster state shows every shard copy of the index started on the target.
        final var allShardsOnTarget = ClusterServiceUtils.addMasterTemporaryStateListener(
            state -> state.routingTable(ProjectId.DEFAULT)
                .index(index)
                .allShards()
                .flatMap(IndexShardRoutingTable::allShards)
                .allMatch(shard -> shard.currentNodeId().equals(targetId) && shard.started())
        );
        safeAwait(allShardsOnTarget);
    }

    /**
     * Test plugin contributing two allocation deciders driven by {@code NOT_PREFERRED_AND_THROTTLED_NODES}: for a
     * registered node the first returns NOT_PREFERRED and the second returns THROTTLE (outside of simulation);
     * for every other node both return YES.
     */
    public static class NotPreferredAndThrottledPlugin extends Plugin implements ClusterPlugin {
        @Override
        public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
            final AllocationDecider notPreferredDecider = new AllocationDecider() {
                @Override
                public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
                    if (NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId())) {
                        return Decision.NOT_PREFERRED;
                    }
                    return Decision.YES;
                }
            };

            final AllocationDecider throttleDecider = new AllocationDecider() {
                @Override
                public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
                    // THROTTLE is not returned in simulation, so only throttle real moves in the Reconciler.
                    final boolean throttleNode = NOT_PREFERRED_AND_THROTTLED_NODES.contains(node.nodeId())
                        && allocation.isSimulating() == false;
                    if (throttleNode) {
                        return Decision.THROTTLE;
                    }
                    return Decision.YES;
                }
            };

            return List.of(notPreferredDecider, throttleDecider);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,12 @@ private MoveDecision explainRebalanceDecision(final ProjectIndex index, final Sh
// if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the
// node in question, then allow the rebalance
if (rebalanceConditionsMet && canAllocate.type().higherThan(bestRebalanceCanAllocateDecisionType)) {
if (rebalanceConditionsMet && canAllocate.type().compareToBetweenNodes(bestRebalanceCanAllocateDecisionType) > 0) {
// Overwrite the best decision since it is better than the last. This means that YES/THROTTLE decisions will replace
// NOT_PREFERRED/NO decisions, and a YES decision will replace a THROTTLE decision. NOT_PREFERRED will also replace
// NO, even if neither are acted upon for rebalancing, for allocation explain purposes.
bestRebalanceCanAllocateDecisionType = canAllocate.type();
if (canAllocate.type().higherThan(Type.NOT_PREFERRED)) {
if (canAllocate.type().compareToBetweenNodes(Type.NOT_PREFERRED) > 0) {
// Movement is only allowed to THROTTLE/YES nodes. NOT_PREFERRED is the same as no for rebalancing, since
// rebalancing aims to distribute resource usage and NOT_PREFERRED means the move could cause hot-spots.
targetNode = node;
Expand Down Expand Up @@ -1053,7 +1053,7 @@ private MoveDecision decideMove(
// Relocating a shard from one NOT_PREFERRED node to another NOT_PREFERRED node would not improve the situation.
continue;
}
if (allocationDecision.type().higherThan(bestDecision)) {
if (allocationDecision.type().compareToBetweenNodes(bestDecision) > 0) {
bestDecision = allocationDecision.type();
if (bestDecision == Type.YES) {
targetNode = target;
Expand Down Expand Up @@ -1570,12 +1570,16 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn
continue;
}

final Decision.Type canAllocateOrRebalance = Decision.Type.min(allocationDecision.type(), rebalanceDecision.type());
assert rebalanceDecision.type() == Type.YES || rebalanceDecision.type() == Type.THROTTLE
: "We should only see YES/THROTTLE decisions here";
assert allocationDecision.type() == Type.YES || allocationDecision.type() == Type.THROTTLE
: "We should only see YES/THROTTLE decisions here";
final Decision.Type canAllocateOrRebalance = allocationDecision.type() == Type.THROTTLE
|| rebalanceDecision.type() == Type.THROTTLE ? Type.THROTTLE : Type.YES;
Comment on lines +1577 to +1578
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we capture this as a method on Type? Or should this use compareToBetweenDecisions since it's technically still decision aggregation for a single node?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to use compareToBetweenDecisions because it sort-of implies that the order used by that method is significant, here we are dealing only with THROTTLE and YES so I think it's good to make it clear that it only cares about that.

I think we could add it to Type but perhaps as a subsequent PR to prevent holding things up any further.


maxNode.removeShard(projectIndex(shard), shard);
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

assert canAllocateOrRebalance == Type.YES || canAllocateOrRebalance == Type.THROTTLE : canAllocateOrRebalance;
logger.debug(
"decision [{}]: relocate [{}] from [{}] to [{}]",
canAllocateOrRebalance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,14 @@ private void moveShards() {
iterator.dePrioritizeNode(shardRouting.currentNodeId());
moveOrdering.recordAllocation(shardRouting.currentNodeId());
movedUndesiredShard = true;
} else {
logger.trace(
"Cannot move shard [{}][{}] away from {}, and cannot remain because of [{}]",
shardRouting.index(),
shardRouting.shardId(),
shardRouting.currentNodeId(),
canRemainDecision
);
}
} finally {
if (movedUndesiredShard) {
Expand Down
Loading