
Add reshardSplitShardCountSummary field to ShardSearchRequest and friends#135804

Merged
bcully merged 14 commits into elastic:main from lkts:resharding_checksum_search
Oct 21, 2025
Conversation

@lkts
Contributor

@lkts lkts commented Oct 1, 2025

reshardSplitShardCountSummary is a field specific to the resharding functionality. The end goal is for reshardSplitShardCountSummary to be used inside Engine#wrapDirectoryReader via IndexShard#acquireSearcher(Supplier), since it is needed for the resharding reader wrapper to work. This PR makes this value available to the immediate callers of acquireSearcher(Supplier), e.g. SearchService#createOrGetReaderContext().

Note that this PR does not change all constructors of ShardSearchRequest, in order to keep the diff size down. Some usages (like ESQL) also need special logic to calculate the value before passing it to ShardSearchRequest. This will be done in a follow-up. For now a default value of 0 is used.
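For illustration, the plumbing can be sketched with a simplified stand-in. The real ShardSearchRequest has many more fields and constructors; the class below is illustrative only, with the 0 default the description mentions for call sites that cannot compute the summary yet:

```java
// Illustrative stand-in for the new field on ShardSearchRequest.
// The 0 default mirrors the PR: call sites (e.g. ESQL) that cannot yet
// compute the summary pass 0 until the follow-up work lands.
final class ShardSearchRequestSketch {
    private final int reshardSplitShardCountSummary;

    ShardSearchRequestSketch(int reshardSplitShardCountSummary) {
        this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
    }

    // Default used where the summary is not computed yet.
    static ShardSearchRequestSketch withDefaultSummary() {
        return new ShardSearchRequestSketch(0);
    }

    int reshardSplitShardCountSummary() {
        return reshardSplitShardCountSummary;
    }
}
```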

```diff
 searchShardsGroup.preFiltered(),
-searchShardsGroup.skipped()
+searchShardsGroup.skipped(),
+0 // TODO
```
Contributor Author

This is CCS code path, will remain stubbed with a better comment.

```diff
 false,
-false
+false,
+0 // TODO
```
Contributor Author

This is CCS code path, will remain stubbed with a better comment.

```diff
 false,
-false
+false,
+0 // TODO
```
Contributor Author

This is for PIT, we need a custom calculation here but it's doable.

@lkts lkts force-pushed the resharding_checksum_search branch from 73d23a4 to 2016bfd Compare October 3, 2025 17:00
@lkts
Contributor Author

lkts commented Oct 3, 2025

Test failure is a problem fixed by #135873.

@lkts lkts added >non-issue :Distributed/Distributed A catch all label for anything in the Distributed Area. Please avoid if you can. :Search Foundations/Search Catch all for Search Foundations labels Oct 3, 2025
@lkts lkts marked this pull request as ready for review October 3, 2025 18:19
@elasticsearchmachine elasticsearchmachine added Team:Search Foundations Meta label for the Search Foundations team in Elasticsearch Team:Distributed Indexing (obsolete) Meta label for Distributed Indexing team. Obsolete. Please do not use. labels Oct 3, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-search-foundations (Team:Search Foundations)

@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@lkts lkts requested a review from javanna October 3, 2025 18:19
@lkts lkts force-pushed the resharding_checksum_search branch from 29e8862 to 20a7df7 Compare October 7, 2025 16:54
@javanna javanna requested a review from andreidan October 13, 2025 07:46
@javanna
Contributor

javanna commented Oct 13, 2025

Hey @lkts ,

Test failure is a problem fixed by #135873.

That problem was specific to how NodeQueryResponse is serialized back, which caused problems with proxy connections and versioning. It took me a bit to connect the dots with your change. I think you ran into issues even though you were not modifying the response directly, because ShardSearchRequest is serialized back with the SearchPhaseResult in certain scenarios?

I asked @andreidan to take a look. This will require some catch-up. I don't have enough context to review this change so far. I lack context on the proposed flow, why shard search request needs to be changed and how this is going to be used in the future.

@andreidan
Contributor

Thanks folks, I think I'm caught up on the general plan (and some existing code) and have started reviewing this. Will post an initial set of thoughts tomorrow (thanks for bearing with me)

@andreidan andreidan left a comment

Thanks for working on this Sasha

I think this looks pretty good as being mostly glue code so we have the ability to pass on the resharding summary count to the engine when acquiring a new searcher.

For posterity, this is needed as we'll want a custom LeafReader (similar to DocumentSubsetReader).

I've left some minor questions (the one that I'd like some more information on is around PITs)

```diff
 * Used for ccs_minimize_roundtrips=false.
 */
-static List<SearchShardIterator> getRemoteShardsIterator(
+static List<org.elasticsearch.action.search.SearchShardIterator> getRemoteShardsIterator(
```
Contributor

nit: is the fully qualified name needed here? The SearchShardIterator already exists in org.elasticsearch.action.search and the imports list hasn't changed w.r.t. SearchShardIterator

@bcully bcully Oct 16, 2025

I assume this was just an IDE fixup. I'll fix this.

Done in 8a221d4

```java
* Shard search request sent to source shard 1 has the "reshardSplitShardCountSummary" of 8
* since the corresponding target shard 5 is in SPLIT state.
* When a shard search request is executed on the source shard 1, "reshardSplitShardCountSummary" value
* is checked and documents that will be returned by target shard 5 are excluded
```
Contributor

Should we also mention that shard 5 will serve search requests with a filter to only include the documents associated with shard 5?

Contributor

I'll add a comment here. Since shard 5 is newly created, it doesn't need a summary field in the request to tell it to filter - it will always filter out unowned docs (docs that don't route to 5), until the point where the unowned documents are actually deleted and the filter can be removed.
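The ownership rule discussed in this thread can be sketched as follows. This is a toy stand-in, not the actual Elasticsearch routing hash: in the javadoc's 4-to-8 split, source shard 1's documents re-route to either shard 1 or shard 5 under the new count, and when the request's summary says the coordinator already queries 8 shards, the source shard drops documents the target shard now owns:

```java
// Toy sketch of the filtering rule from the javadoc example above.
// toyShardFor is an illustrative stand-in for the real routing function.
final class SplitFilterSketch {
    // Simplified routing: hash the routing key onto the shard count.
    static int toyShardFor(String routingKey, int shardCount) {
        return Math.floorMod(routingKey.hashCode(), shardCount);
    }

    // True when source shard `sourceId` should keep the document in a
    // search whose summary already includes the post-split shards.
    static boolean keepOnSource(String routingKey, int sourceId, int newShardCount) {
        return toyShardFor(routingKey, newShardCount) == sourceId;
    }
}
```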

```java
// This parameter is specific to the resharding feature.
// It is used when creating a searcher to apply filtering needed to have correct search results
// while resharding is in progress.
// In context of PIT the searcher is reused or can be recreated only in read-only scenarios.
```
Contributor

note that PITs will soon be transferred from one node to another for search resiliency purposes. Is that something that'll impact resharding? Sounds like it might, but it's not clear to me yet how resharding is meant to work with PIT.

Contributor

I don't know PIT, but my understanding was that PITs would hold onto the readers that they opened when they were created, and subsequent queries would find and reuse the existing readers. If the readers are opened with the correct filtering in place because we've passed down the resharding shard count summary, then hopefully everything just works.

But in addition to not really knowing how PIT works, I also don't really know how PIT migration is going to work. I think the new node would need to have a reference to the same reader as the old one but I don't understand how that works. If another reader is getting opened we probably need to thread the shard count summary to where that's happening.

Contributor

Thanks Brendan.

Upon relocation the PIT will attempt to create a new searcher so I think we'll have to transfer (or recompute on the new node?) the resharding information? https://github.com/elastic/elasticsearch-serverless/pull/4315/files#diff-c0b5acc53dde6eee7ead1f3f35fe05608c01c1ce66d663ae9bfccba20f1f6049R1143
(i.e. the searcher will be recreated in non-read-only scenarios)

But say we transfer the resharding information to the new node when we hand off the PIT; it would then need to be passed here for the correct shard routing to work?
I'm a bit confused as to how this would work, as a PIT can have an extended life (say 24 hours) while resharding is transient. So even if we have the resharding summary to pass here, will we have to update it once resharding completes? Or is it that all open PITs will block the resharding operation? (I think it's the latter.)

Contributor

From what you're saying, I think that when a PIT is transferred to a new node, it must open a new reader based on a specific potentially older commit, rather than the current latest in the shard (and that we have machinery to preserve a reference to that commit)? Otherwise I don't think I understand how the results would remain consistent. I didn't realize we had a way to open an older commit though.

If that's the case, then we would have to do something to open the new reader with the same filtering configuration as the original reader. I'm not sure the shard summary field we're using now quite works for that, because filtering is determined by comparing the summary field against the current routing table when the reader is opened, and it sounds like the current routing table could change if we're reopening the reader at a later date. We might instead want to directly encode the filtering decision we made into the PIT.

I don't think the lifecycle of the resharding operation interacts with PIT though - neither should block the other. The summary field is calculated when the PIT is created to tell an original shard whether the PIT opener is including a newly created shard in its queries. Walking through the simplest case, splitting from 1 to 2 shards, the cases are:

1: the new shard isn't ready when the PIT is opened. The opener sends a summary of 1 and the search shard also computes 1, so it does no filtering and will return all docs present in the commit at the time the reader is opened. Later the split will complete and there will be 2 shards responsible for half the docs each, but this doesn't matter for the PIT: queries will always continue to go only to the original shard and the original shard should not filter. Since it's using a reader based on a commit from before the split completed, all the documents should be present.

2: the new shard isn't ready at the coordinator when it starts creating the PIT, but by the time the search shard opens the reader, the new shard is ready. In that case, the summary provided in the request to open the reader won't match what the search shard itself computes, so it will again not filter out documents belonging to the new shard. The new shard's documents will still be present because we don't delete them until we can be sure that all coordinators know about the new shards (through an acked cluster state update IIRC). Assuming the PIT holds onto the commit used by this reader throughout its lifetime, the PIT can keep sending requests to the original shard and getting all the docs even though split has completed since then.

3: the new shard is ready, and seen by both the coordinator creating the PIT and the original shard. In that case the summary will show that the coordinator is using 2 shards, and so the original shard will install search filters to filter out documents belonging to the new shard, and the new shard will be filtering out documents belonging to the old shard. I think we do have some missing machinery on the new shard's side to figure out whether it should filter or not - right now it filters until the resharding process deletes the old shard's docs on the new shard, producing a new commit. If we always only open readers against the latest commit this is fine, but it seems like for PIT we must be able to open against an older commit, in which case we'd need to install filtering if the older commit still contained the original shard's documents. But assuming we can plumb that information through (again, probably by recording the actual filtering decision in the PIT rather than the shard summary, so symmetric with the source shard?) then it is still fine for resharding to complete and won't affect results produced by the PIT.

I think I'd like to better understand how during PIT relocation we manage opening a new reader that matches the commit from the original shard though.

Contributor

I looked at that draft PR for relocation and I do see that there's a new acquireSearcherForCommit method. I think we will probably have to plumb the filtering decision across to that function similarly to how we're plumbing the commit name. Right now the design is that the coordinator supplies a shard count summary and then the reader compares that to the routing table to make a filtering decision, then installs (or doesn't) a filter on the reader accordingly. Seems like we'd have to add some machinery to read the filtering decision from the current reader and send it across.

@andreidan andreidan Oct 17, 2025

Thanks for the details here, really useful to progress this.

I think we will probably have to plumb the filtering decision across to that function similarly to how we're plumbing the commit name

When you say "filtering decision" is this the same as routing?
I notice that in this PR we're extending the ShardIterator, which holds the routing information, with SearchShardRouting only to add the resharding count summary (so my assumption was that the resharding count summary is all we need to figure out how to route the requests to the correct shards - perhaps, later, to be used here).
What new information do we need? Can you please expand that and perhaps capture it in code in a comment?

It's a bit unclear to me at the moment how the routing can be transferred (and why can't we compute it on the new node - every node would know about the shard allocations and can build the local routing table).

it sounds like the current routing table could change if we're reopening the reader at a later date

Related to the above, routing information can potentially change even if we pass a "snapshot" of it at handoff time I think? Maybe we can first think of what this information is before zooming in more here but I do wonder if it becomes "easier" (oh boy :) ) if any new shard, upon relocation/recover, is able to figure out the state of the routing table given the resharding summary?

I don't think the lifecycle of the resharding operation interacts with PIT though - neither should block the other.

You're right ! Sorry for the confusion here. I thought for a moment an open reader (pit) would block the delete-by-query step but I don't think that's the case.
If a PIT holds a reader open on commit N:

- Delete-by-query on the source creates commit N+1 with deletion markers
- Commit N's segments (without deletions) remain accessible to the PIT
- Commit N+1's segments (same files, but with .liv files showing deletions) are used for new queries
- The same underlying segment files are shared between both commits

The only downside for long-lived PITs is that we'll keep duplicated data in the source shard for longer (i.e. more disk used).
Contributor

I should be more careful about terminology here, sorry. When I say routing, I really should just be saying shard count. The actual node that's hosting a shard shouldn't affect resharding.

Ok, so the fundamental reason for passing around the shard count summary is because there's a time window between when the coordinator chooses the set of shards to query (or include in a PIT) and when the query actually executes on the relevant shards. If resharding is happening, then it's possible that within that time window a new search shard can become active that wasn't included in the coordinator's request. The search shard summary allows the search shard, when it is actually opening a reader, to figure out whether the coordinator is also querying the new shard or not. If everything were completely synchronized, and the coordinator were guaranteed to include new search shards in its request the instant they became ready, then the original search shards would always filter out documents that belonged to those new shards as soon as the new shards became ready. But that's not possible (at least, not without some very expensive cluster-wide synchronization) so the original search shards can expect that for some time after new shards become ready, they will receive some requests that do not yet include the new shards, and for those they should not filter.

So far I'm just restating things. But you asked if the filtering decision is the same as routing. It's not. The shard routing summary calculated by the coordinator when it selects search shards is input to the filtering decision, but the decision is made based on comparing the shard's view of active shards with what the coordinator provided. And right now that comparison is based on the shard's view at the moment the reader is opened.

For PIT relocation this comparison changes if the shard's view of the routing table at the moment the reader is re-opened after relocation does not match the view it had when it opened the first reader. It may make a different decision about whether or not to filter out documents that used to belong to it but now belong to a new shard.

For example, if a coordinator opens a PIT before new search shards become active, and the original search shards also open readers before the new search shards become active, then the readers will not install search filters, because the original search shards still "own" all the documents. Now let's say that after those readers are opened, new search shards become active. Now if a PIT is relocated from one of the original shards, the relocated shard will reopen a reader and when it does it will see that the new shards are active. This time, it will see a mismatch between the shard count summary provided when the PIT was created and the actual number of active search shards.

So I was thinking that to avoid making a different filtering decision when reopening the reader, we'd have to carry along additional information, either the actual decision made or the view of search active shards seen by the search shard when the reader was first opened. But I'm realizing now that although the comparison can be different when the reader is reopened, I don't think it results in a different filtering decision. If there's a mismatch between the shard's view when it opens the reader and the PIT's view when the set of shards are chosen, that causes the shard to not filter. In the example above that's the correct action. I think that's the only way that the calculation can change, so we would get away with just providing the PIT's shard count summary when reopening the reader. That's good news!

This might not hold up if we introduce shrink in the future, but that's still far away and we expect that the shard count summary would have to be changed into something richer everywhere to handle that anyway.
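The comparison walked through above can be condensed into a small sketch. The names and the pre-split count parameter are illustrative, not the actual Elasticsearch API: the shard filters only when its own view agrees with the coordinator's summary and both reflect the post-split count; any mismatch means it must return everything it owned at the commit:

```java
// Hedged sketch of the filtering decision described in this thread.
final class FilterDecisionSketch {
    static boolean shouldFilter(int requestSummary, int localSummary, int preSplitCount) {
        if (requestSummary != localSummary) {
            // Coordinator and shard disagree on the active shard set:
            // do not filter, return all docs owned at the commit.
            return false;
        }
        // Agreement at the post-split count: the new shards are queried
        // separately, so exclude the documents they now own.
        return localSummary > preSplitCount;
    }
}
```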

Contributor

Thanks Brendan, I think we're on the same page here. Also, kudos for the documentation on SplitShardCountSummary - revisiting that really helped. Thanks for bearing with me.

I've opened ES-13264 to keep track of this. Is there any code comment we need to add to make sure this is not left behind?

Contributor

Thanks Andrei, it looks like you've attached that ticket to the right milestone, so we should be able to track it and make sure that we don't close this out without addressing it. I think it remains parked until relocatable PITs land, but we won't lose it.

I've also taken another pass at the javadoc for SplitShardCountSummary to incorporate what's come up in the discussion here. Hopefully it's a little clearer now.

Contributor

Thanks for updating the javadoc, it looks great!

@bcully
Contributor

bcully commented Oct 16, 2025

Sasha is out for the rest of the month so I'm going to take over this PR while he's away.


```java
package org.elasticsearch.cluster.routing;

public record SearchShardRouting(ShardIterator iterator, SplitShardCountSummary reshardSplitShardCountSummary)
```
Contributor

Can you add a comment describing the purpose of this class?
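A documented version of such a record might look like the following self-contained sketch; the ShardIterator and SplitShardCountSummary types are stubbed here for illustration, since the real ones live elsewhere in the codebase:

```java
// Stubs so the sketch compiles on its own; the real types live in the
// Elasticsearch codebase.
record ShardIteratorStub(int shardId) {}
record SplitShardCountSummaryStub(int asInt) {}

/**
 * Couples a shard's routing (which copies of which shard to query) with
 * the reshardSplitShardCountSummary the coordinator computed for that
 * shard, so the summary travels with the iterator into the
 * ShardSearchRequest built for that shard.
 */
record SearchShardRoutingSketch(ShardIteratorStub iterator, SplitShardCountSummaryStub reshardSplitShardCountSummary) {}
```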

```diff
 }

-private static void collectTargetShardsWithRouting(
+private static Set<SearchTargetShard> collectTargetShardsWithRouting(
```
Contributor

👍🏻




@bcully bcully self-assigned this Oct 16, 2025
@andreidan andreidan left a comment

LGTM, thanks for iterating on this Brendan


@bcully
Contributor

bcully commented Oct 21, 2025

LGTM, thanks for iterating on this Brendan

Thanks for the review!

@bcully bcully merged commit 70673eb into elastic:main Oct 21, 2025
34 checks passed
fzowl pushed a commit to voyage-ai/elasticsearch that referenced this pull request Nov 3, 2025
…ends (elastic#135804)

Co-authored-by: Brendan Cully <brendan.cully@elastic.co>

Labels

:Distributed/Distributed A catch all label for anything in the Distributed Area. Please avoid if you can. >non-issue :Search Foundations/Search Catch all for Search Foundations Team:Distributed Indexing (obsolete) Meta label for Distributed Indexing team. Obsolete. Please do not use. Team:Search Foundations Meta label for the Search Foundations team in Elasticsearch v9.3.0

5 participants