Add reshardSplitShardCountSummary field to ShardSearchRequest and friends #135804
bcully merged 14 commits into elastic:main
Conversation
```diff
  searchShardsGroup.preFiltered(),
- searchShardsGroup.skipped()
+ searchShardsGroup.skipped(),
+ 0 // TODO
```
This is CCS code path, will remain stubbed with a better comment.
```diff
  false,
- false
+ false,
+ 0 // TODO
```
This is CCS code path, will remain stubbed with a better comment.
```diff
  false,
- false
+ false,
+ 0 // TODO
```
This is for PIT, we need a custom calculation here but it's doable.
Force-pushed from 73d23a4 to 2016bfd
The test failure is a known problem, fixed by #135873.
Pinging @elastic/es-search-foundations (Team:Search Foundations)
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
Force-pushed from 29e8862 to 20a7df7
Hey @lkts, I asked @andreidan to take a look. This will require some catch-up. I don't have enough context to review this change so far: I lack context on the proposed flow, why the shard search request needs to be changed, and how this is going to be used in the future.
Thanks folks, I think I'm caught up on the general plan (and some existing code) and have started reviewing this. Will post an initial set of thoughts tomorrow (thanks for bearing with me)
andreidan left a comment
Thanks for working on this Sasha.
I think this looks pretty good. It is mostly glue code that gives us the ability to pass the resharding shard count summary on to the engine when acquiring a new searcher.
For posterity, this is needed because we'll want a custom LeafReader (similar to DocumentSubsetReader).
I've left some minor questions (the one that I'd like some more information on is around PITs).
```diff
   * Used for ccs_minimize_roundtrips=false.
   */
- static List<SearchShardIterator> getRemoteShardsIterator(
+ static List<org.elasticsearch.action.search.SearchShardIterator> getRemoteShardsIterator(
```
nit: is the fully qualified name needed here? The SearchShardIterator already exists in org.elasticsearch.action.search and the imports list hasn't changed w.r.t. SearchShardIterator
I assume this was just an IDE fixup. I'll fix this.
Done in 8a221d4
```java
 * Shard search request sent to source shard 1 has the "reshardSplitShardCountSummary" of 8
 * since the corresponding target shard 5 is in SPLIT state.
 * When a shard search request is executed on the source shard 1, "reshardSplitShardCountSummary" value
 * is checked and documents that will be returned by target shard 5 are excluded
```
Should we also mention that shard 5 will serve search requests with a filter to only include the documents associated with shard 5?
I'll add a comment here. Since shard 5 is newly created, it doesn't need a summary field in the request to tell it to filter - it will always filter out unowned docs (docs that don't route to 5), until the point where the unowned documents are actually deleted and the filter can be removed.
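To make the asymmetry described above concrete, here is a minimal, dependency-free sketch. All names and signatures are illustrative, not the actual Elasticsearch API; the real logic lives in the engine/searcher plumbing:

```java
// Illustrative sketch only: how a source shard and a newly created target
// shard decide whether to filter documents during a split resharding.
class ReshardFilterSketch {

    // Source shard: filter out documents now owned by a target shard only
    // when a target shard is active AND the summary in the request matches
    // the shard's locally computed summary, i.e. the coordinator is also
    // querying the new target shard(s). On a mismatch the coordinator is
    // not querying the new shards, so the source must return their docs too.
    static boolean sourceShardShouldFilter(boolean targetShardActive, int requestSummary, int localSummary) {
        return targetShardActive && requestSummary == localSummary;
    }

    // Newly created target shard: no request field needed. It always filters
    // out unowned docs (docs that don't route to it) until the unowned
    // documents have actually been deleted.
    static boolean targetShardShouldFilter(boolean unownedDocsDeleted) {
        return !unownedDocsDeleted;
    }
}
```

In the 4-to-8 split example above, a request carrying summary 8 matches source shard 1's own view, so shard 1 excludes the documents owned by target shard 5; a request still carrying summary 4 would not.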
```java
// This parameter is specific to the resharding feature.
// It is used when creating a searcher to apply filtering needed to have correct search results
// while resharding is in progress.
// In context of PIT the searcher is reused or can be recreated only in read-only scenarios.
```
note that PITs will soon be transferred from one node to another for search resiliency purposes. Is that something that'll impact resharding? Sounds like it might, but it's not clear to me yet how resharding is meant to work with PIT.
I don't know PIT, but my understanding was that PITs would hold onto the readers that they opened when they were created, and subsequent queries would find and reuse the existing readers. If the readers are opened with the correct filtering in place because we've passed down the resharding shard count summary, then hopefully everything just works.
But in addition to not really knowing how PIT works, I also don't really know how PIT migration is going to work. I think the new node would need to have a reference to the same reader as the old one but I don't understand how that works. If another reader is getting opened we probably need to thread the shard count summary to where that's happening.
Thanks Brendan.
Upon relocation the PIT will attempt to create a new searcher so I think we'll have to transfer (or recompute on the new node?) the resharding information? https://github.com/elastic/elasticsearch-serverless/pull/4315/files#diff-c0b5acc53dde6eee7ead1f3f35fe05608c01c1ce66d663ae9bfccba20f1f6049R1143
(i.e. the searcher will be recreated in non-read-only scenarios)
But say we transfer the resharding information to the new node when we hand off the PIT; it'll then need to be passed here for the correct shard routing to work?
I'm a bit confused as to how this would work: a PIT can have an extended life (say 24 hours) while resharding is transient. So even if we have the resharding summary to pass here, we'll have to update it once resharding completes? Or is it that all open PITs will block the resharding operation? (I think it's the latter.)
From what you're saying, I think that when a PIT is transferred to a new node, it must open a new reader based on a specific potentially older commit, rather than the current latest in the shard (and that we have machinery to preserve a reference to that commit)? Otherwise I don't think I understand how the results would remain consistent. I didn't realize we had a way to open an older commit though.
If that's the case, then we would have to do something to open the new reader with the same filtering configuration as the original reader. I'm not sure the shard summary field we're using now quite works for that, because filtering is determined by comparing the summary field against the current routing table when the reader is opened, and it sounds like the current routing table could change if we're reopening the reader at a later date. We might instead want to directly encode the filtering decision we made into the PIT.
I don't think the lifecycle of the resharding operation interacts with PIT though - neither should block the other. The summary field is calculated when the PIT is created to tell an original shard whether the PIT opener is including a newly created shard in its queries. Walking through the simplest case, splitting from 1 to 2 shards, the cases are:
1: the new shard isn't ready when the PIT is opened. The opener sends a summary of 1 and the search shard also computes 1, so it does no filtering and will return all docs present in the commit at the time the reader is opened. Later the split will complete and there will be 2 shards responsible for half the docs each, but this doesn't matter for the PIT: queries will always continue to go only to the original shard and the original shard should not filter. Since it's using a reader based on a commit from before the split completed, all the documents should be present.
2: the new shard isn't ready at the coordinator when it starts creating the PIT, but by the time the search shard opens the reader, the new shard is ready. In that case, the summary provided in the request to open the reader won't match what the search shard itself computes, so it will again not filter out documents belonging to the new shard. The new shard's documents will still be present because we don't delete them until we can be sure that all coordinators know about the new shards (through an acked cluster state update IIRC). Assuming the PIT holds onto the commit used by this reader throughout its lifetime, the PIT can keep sending requests to the original shard and getting all the docs even though split has completed since then.
3: the new shard is ready, and seen by both the coordinator creating the PIT and the original shard. In that case the summary will show that the coordinator is using 2 shards, and so the original shard will install search filters to filter out documents belonging to the new shard, and the new shard will be filtering out documents belonging to the old shard. I think we do have some missing machinery on the new shard's side to figure out whether it should filter or not - right now it filters until the resharding process deletes the old shard's docs on the new shard, producing a new commit. If we always only open readers against the latest commit this is fine, but it seems like for PIT we must be able to open against an older commit, in which case we'd need to install filtering if the older commit still contained the original shard's documents. But assuming we can plumb that information through (again, probably by recording the actual filtering decision in the PIT rather than the shard summary, so symmetric with the source shard?) then it is still fine for resharding to complete and won't affect results produced by the PIT.
I think I'd like to better understand how during PIT relocation we manage opening a new reader that matches the commit from the original shard though.
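The three cases in the walkthrough above can be captured in a small decision table. This is a hedged, self-contained sketch with invented names; the real calculation involves SplitShardCountSummary and the routing table:

```java
// Decision made by the original (source) shard when opening a reader during
// a 1 -> 2 split, per the three cases described in the discussion.
class PitSplitCases {

    // Filter only if the coordinator's summary shows it is also querying
    // the new shard, and the shard itself sees the new shard as active.
    static boolean sourceShouldFilter(int pitSummary, int activeShardsSeenByShard, int preSplitShards) {
        return activeShardsSeenByShard > preSplitShards && pitSummary == activeShardsSeenByShard;
    }

    public static void main(String[] args) {
        // Case 1: new shard not ready when the PIT is opened.
        // Summary 1, shard sees 1 -> no filtering.
        System.out.println(PitSplitCases.sourceShouldFilter(1, 1, 1));
        // Case 2: new shard becomes ready between PIT creation and reader open.
        // Summary 1, shard sees 2 -> mismatch, still no filtering.
        System.out.println(PitSplitCases.sourceShouldFilter(1, 2, 1));
        // Case 3: new shard ready and seen by both sides.
        // Summary 2, shard sees 2 -> filter out the new shard's documents.
        System.out.println(PitSplitCases.sourceShouldFilter(2, 2, 1));
    }
}
```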
I looked at that draft PR for relocation and I do see that there's a new acquireSearcherForCommit method. I think we will probably have to plumb the filtering decision across to that function similarly to how we're plumbing the commit name. Right now the design is that the coordinator supplies a shard count summary and then the reader compares that to the routing table to make a filtering decision, then installs (or doesn't) a filter on the reader accordingly. Seems like we'd have to add some machinery to read the filtering decision from the current reader and send it across.
Thanks for the details here, really useful to progress this.
> I think we will probably have to plumb the filtering decision across to that function similarly to how we're plumbing the commit name
When you say "filtering decision" is this the same as routing?
I notice we're extending the ShardIterator (which holds the routing information) in this PR with SearchShardRouting to only add the resharding count summary (so my assumption was that the resharding count summary is all we need to figure out how to route the requests to the correct shards; perhaps, later, to be used here).
What new information do we need? Can you please expand that and perhaps capture it in code in a comment?
It's a bit unclear to me at the moment how the routing can be transferred (and why can't we compute it on the new node - every node would know about the shard allocations and can build the local routing table).
> it sounds like the current routing table could change if we're reopening the reader at a later date
Related to the above, routing information can potentially change even if we pass a "snapshot" of it at handoff time, I think? Maybe we can first think about what this information is before zooming in more here, but I do wonder if it becomes "easier" (oh boy :) ) if any new shard, upon relocation/recovery, is able to figure out the state of the routing table given the resharding summary?
> I don't think the lifecycle of the resharding operation interacts with PIT though - neither should block the other.
You're right! Sorry for the confusion here. I thought for a moment an open reader (PIT) would block the delete-by-query step but I don't think that's the case.
If a PIT holds a reader open on commit N:
- Delete-by-query on the source creates commit N+1 with deletion markers
- Commit N's segments (without deletions) remain accessible to the PIT
- Commit N+1's segments (same files, but with .liv files showing deletions) are used for new queries
- The same underlying segment files are shared between both commits
The only downside for long lived PITs is that we'll have duplicated data kept in the source shard for longer (i.e. more used disk)
I should be more careful about terminology here, sorry. When I say routing, I really should just be saying shard count. The actual node that's hosting a shard shouldn't affect resharding.
Ok, so the fundamental reason for passing around the shard count summary is because there's a time window between when the coordinator chooses the set of shards to query (or include in a PIT) and when the query actually executes on the relevant shards. If resharding is happening, then it's possible that within that time window a new search shard can become active that wasn't included in the coordinator's request. The search shard summary allows the search shard, when it is actually opening a reader, to figure out whether the coordinator is also querying the new shard or not. If everything were completely synchronized, and the coordinator were guaranteed to include new search shards in its request the instant they became ready, then the original search shards would always filter out documents that belonged to those new shards as soon as the new shards became ready. But that's not possible (at least, not without some very expensive cluster-wide synchronization) so the original search shards can expect that for some time after new shards become ready, they will receive some requests that do not yet include the new shards, and for those they should not filter.
So far I'm just restating things. But you asked if the filtering decision is the same as routing. It's not. The shard routing summary calculated by the coordinator when it selects search shards is input to the filtering decision, but the decision is made based on comparing the shard's view of active shards with what the coordinator provided. And right now that comparison is based on the shard's view at the moment the reader is opened.
For PIT relocation this comparison changes if the shard's view of the routing table at the moment the reader is re-opened after relocation does not match the view it had when it opened the first reader. It may make a different decision about whether or not to filter out documents that used to belong to it but now belong to a new shard.
For example, if a coordinator opens a PIT before new search shards become active, and the original search shards also open readers before the new search shards become active, then the readers will not install search filters, because the original search shards still "own" all the documents. Now let's say that after those readers are opened, new search shards become active. Now if a PIT is relocated from one of the original shards, the relocated shard will reopen a reader and when it does it will see that the new shards are active. This time, it will see a mismatch between the shard count summary provided when the PIT was created and the actual number of active search shards.
So I was thinking that to avoid making a different filtering decision when reopening the reader, we'd have to carry along additional information, either the actual decision made or the view of search active shards seen by the search shard when the reader was first opened. But I'm realizing now that although the comparison can be different when the reader is reopened, I don't think it results in a different filtering decision. If there's a mismatch between the shard's view when it opens the reader and the PIT's view when the set of shards are chosen, that causes the shard to not filter. In the example above that's the correct action. I think that's the only way that the calculation can change, so we would get away with just providing the PIT's shard count summary when reopening the reader. That's good news!
This might not hold up if we introduce shrink in the future, but that's still far away and we expect that the shard count summary would have to be changed into something richer everywhere to handle that anyway.
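The "good news" observation above — that re-evaluating the PIT's original summary against a later routing table cannot flip the filtering decision — can be checked mechanically for the 1-to-2 split. This is a hypothetical sketch that assumes, as in the discussion, that the active shard count only grows during a split and that the shard's view at reader-open time is at least as recent as the coordinator's:

```java
// Exhaustively checks that, for a 1 -> 2 split, reopening a PIT's reader
// later (with an equal-or-newer routing view) yields the same filtering
// decision as the original open. Names are illustrative, not real ES APIs.
class ReopenStability {

    // Same comparison as at original reader open.
    static boolean shouldFilter(int pitSummary, int activeShards, int preSplitShards) {
        return activeShards > preSplitShards && pitSummary == activeShards;
    }

    // For every PIT summary (1 or 2) and every open/reopen ordering where
    // the view only moves forward, the decision never changes: a mismatch
    // arising later can only mean "don't filter", which an unchanged view
    // had already decided.
    static boolean decisionStable() {
        int preSplit = 1;
        for (int summary : new int[] { 1, 2 }) {
            for (int activeAtOpen = summary; activeAtOpen <= 2; activeAtOpen++) {
                for (int activeAtReopen = activeAtOpen; activeAtReopen <= 2; activeAtReopen++) {
                    if (shouldFilter(summary, activeAtOpen, preSplit)
                            != shouldFilter(summary, activeAtReopen, preSplit)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```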
Thanks Brendan, I think we're on the same page here. Also, kudos for the documentation on SplitShardCountSummary - revisiting that really helped. Thanks for bearing with me.
I've opened ES-13264 to keep track of this. Is there any code comment we need to add to make sure this is not left behind?
Thanks Andrei, it looks like you've attached that ticket to the right milestone, so we should be able to track it and make sure that we don't close this out without addressing it. I think it remains parked until relocatable PITs land, but we won't lose it.
I've also taken another pass at the javadoc for SplitShardCountSummary to incorporate what's come up in the discussion here. Hopefully it's a little clearer now.
Thanks for updating the javadoc, it looks great !
Sasha is out for the rest of the month so I'm going to take over this PR while he's away.
```java
package org.elasticsearch.cluster.routing;

public record SearchShardRouting(ShardIterator iterator, SplitShardCountSummary reshardSplitShardCountSummary)
```
Can you add a comment describing the purpose of this class?
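One possible shape for such a purpose comment, as a hedged sketch. ShardIterator and SplitShardCountSummary are stubbed here so the example compiles standalone; the real types live in org.elasticsearch.cluster.routing and differ:

```java
// Stand-ins so the sketch is self-contained; the real Elasticsearch types differ.
interface ShardIterator {}

record SplitShardCountSummary(int asInt) {}

/**
 * Couples the routing information for one search shard (the iterator) with
 * the resharding split shard-count summary computed by the coordinator for
 * that shard, so the summary can travel with the shard-level search request
 * down to where a searcher is acquired.
 */
record SearchShardRouting(ShardIterator iterator, SplitShardCountSummary reshardSplitShardCountSummary) {}
```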
```diff
  }

- private static void collectTargetShardsWithRouting(
+ private static Set<SearchTargetShard> collectTargetShardsWithRouting(
```
andreidan left a comment
LGTM, thanks for iterating on this Brendan
Thanks for the review!
…ends (elastic#135804) Co-authored-by: Brendan Cully <brendan.cully@elastic.co>
A reshardSplitShardCountSummary is a field specific to the resharding functionality. The final goal is for reshardSplitShardCountSummary to be used inside Engine#wrapDirectoryReader via IndexShard#acquireSearcher(Supplier), since it is needed for the resharding reader wrapper to work. This PR makes this value available to the immediate callers of acquireSearcher(Supplier), e.g. SearchService#createOrGetReaderContext().

Note that this PR does not change all constructors of ShardSearchRequest in order to keep the size down. Some usages (like ESQL) also need special logic to calculate the value before passing it to ShardSearchRequest; this will be done in a follow-up. Currently a default value of 0 is used.
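A rough sketch of the plumbing the description talks about. Everything here is stubbed and hypothetical (the real signatures on IndexShard and SearchService differ); it only illustrates the value travelling from the request down to the searcher-acquisition call:

```java
import java.util.function.Supplier;

class SummaryPlumbingSketch {

    record SplitShardCountSummary(int asInt) {
        // Default of 0 for callers that don't compute a real value yet, per the PR.
        static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);
    }

    record Searcher(SplitShardCountSummary summary) {}

    // Stand-in for a ShardSearchRequest carrying the new field.
    record ShardSearchRequest(SplitShardCountSummary reshardSplitShardCountSummary) {}

    // Stand-in for IndexShard#acquireSearcher(Supplier): the summary reaches
    // the engine, which can then use it when wrapping the directory reader.
    static Searcher acquireSearcher(Supplier<SplitShardCountSummary> summarySupplier) {
        return new Searcher(summarySupplier.get());
    }

    // Stand-in for SearchService#createOrGetReaderContext(): an immediate
    // caller of acquireSearcher that now has the value available.
    static Searcher createOrGetReaderContext(ShardSearchRequest request) {
        return acquireSearcher(request::reshardSplitShardCountSummary);
    }
}
```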