Use common retry logic for GCS #138553
Conversation
| .setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier()) | ||
| .setMaxRetryDelay(Duration.ofSeconds(1L)) | ||
| .setMaxAttempts(0) | ||
| .setJittered(false) |
This test originally configured retries to be time-based (i.e. no limit on the number of attempts, just keep retrying for some amount of time). I changed it to make the retry intervals small and depend on the configured retry limits, because we don't support time-based retries anymore.
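For illustration, the attempt-based scheme can be sketched roughly like this (a minimal, hypothetical sketch; the class name, method names, and values below are my own and not from the PR): an exponential backoff capped at a maximum delay, terminated by an attempt count rather than by elapsed time.

```java
// Hypothetical sketch of attempt-based (not time-based) retry backoff:
// delays grow by a multiplier, are capped at a maximum, and the loop is
// bounded only by an attempt count. Names and values are illustrative.
final class AttemptBasedBackoff {
    private final long initialDelayMillis;
    private final double multiplier;
    private final long maxDelayMillis;
    private final int maxAttempts;

    AttemptBasedBackoff(long initialDelayMillis, double multiplier, long maxDelayMillis, int maxAttempts) {
        this.initialDelayMillis = initialDelayMillis;
        this.multiplier = multiplier;
        this.maxDelayMillis = maxDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    /** Delay to wait after the given failed attempt (1-based), or throw if retries are exhausted. */
    long delayAfterAttempt(int attempt) {
        if (attempt >= maxAttempts) {
            throw new IllegalStateException("no retries left after attempt " + attempt);
        }
        long delay = (long) (initialDelayMillis * Math.pow(multiplier, attempt - 1));
        return Math.min(delay, maxDelayMillis);
    }
}
```

The test above then only needs small delays and a concrete attempt limit, with no wall-clock dependency.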
| container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true); | ||
| try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) { | ||
| try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) { |
We have to be careful where we use randomPurpose() now, because some purposes no longer retry (e.g. REPOSITORY_ANALYSIS).
| @Override | ||
| public long getMeaningfulProgressSize() { | ||
| return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L); | ||
| } |
The choice of this value is somewhat arbitrary; I'm open to suggestions on whether we should make this consistent across CSPs or use some other value here. The SDK default chunk size is 16 MB, so this is roughly 160 KB.
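As a sanity check on the arithmetic, here is a tiny sketch of the quoted override (the class name is hypothetical; the 16 MiB chunk size is assumed from the comment above):

```java
// Sketch of the progress threshold quoted above: 1% of the (assumed 16 MiB)
// SDK default chunk size, floored at 1 byte so it can never be zero.
final class MeaningfulProgress {
    static final long SDK_DEFAULT_CHUNK_SIZE = 16L * 1024 * 1024; // 16 MiB, assumed

    static long meaningfulProgressSize() {
        return Math.max(1L, SDK_DEFAULT_CHUNK_SIZE / 100L);
    }
}
```

That works out to 167,772 bytes, i.e. the "roughly 160 KB" mentioned above.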
server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java
| PREFIX, | ||
| "max_retries", | ||
| (key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope) | ||
| ); |
We never configured the number of retries for GCS previously. The default settings allowed 6 attempts (i.e. 5 retries).
| private long currentOffset; | ||
| private boolean closed; | ||
| private Long lastGeneration; | ||
| private static final StorageRetryStrategy STORAGE_RETRY_STRATEGY = GoogleCloudStorageService.createStorageRetryStrategy(); |
The one we use is stateless; we might need to rethink this lifecycle if we switch to one that is not. As far as I could see, you can't get it out of the client or StorageOptions.
I don't know how important this is. If necessary, I think we can store the original strategy object on the MeteredStorage object so that we can get at it in this class?
I went back to using BaseService.EXCEPTION_HANDLER for the outer retries during open. This is consistent with the existing behaviour (more permissive retries in the inner loop, slightly less permissive in the outer).
| } | ||
| return n; | ||
| } catch (IOException e) { | ||
| throw StorageException.translate(e); |
We translate these for consistency with the existing implementation. When reading we retry anything, again consistent with the existing implementation, but when something goes wrong the translation may add some more context to the stack-trace.
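The translation pattern can be sketched roughly like this (StorageLikeException and readWithTranslation are hypothetical stand-ins for illustration, not the real StorageException.translate API):

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of the translation pattern: wrap a failed read's
// IOException in a runtime exception so the caller sees a consistent type,
// while the original failure is preserved in the cause chain for context.
final class TranslationSketch {
    // Stand-in for a StorageException-like runtime type
    static class StorageLikeException extends RuntimeException {
        StorageLikeException(IOException cause) {
            super("storage read failed: " + cause.getMessage(), cause);
        }
    }

    static long readWithTranslation(Callable<Long> read) {
        try {
            return read.call();
        } catch (IOException e) {
            throw new StorageLikeException(e); // translate, keeping the stack-trace as the cause
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```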
ywangd
left a comment
I have only minor comments. I don't think I'll find anything else, but I would like some time to read it through again. Thanks!
| @Override | ||
| public boolean equals(Object o) { |
I think we should update both equals and hashCode?
| } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) { | ||
| throw e; |
We probably missed it last time? Both S3 and GCS add earlier failures as suppressed for these two exceptions as well. If so, I think we should preserve the behaviour.
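The behaviour being discussed can be sketched like this (withSuppressed is a hypothetical helper, not code from the PR):

```java
import java.util.List;

// Hypothetical sketch: before rethrowing a terminal (non-retryable) exception,
// attach the failures from earlier attempts as suppressed exceptions so they
// are not lost from the final stack-trace.
final class SuppressedFailuresSketch {
    static <T extends Exception> T withSuppressed(T terminal, List<? extends Exception> earlierFailures) {
        for (Exception failure : earlierFailures) {
            terminal.addSuppressed(failure);
        }
        return terminal;
    }
}
```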
| private <T extends Exception> long maybeLogAndComputeRetryDelay(String action, T e) throws T { | ||
| if (shouldRetry(attempt) == false) { | ||
| private <T extends Exception> long maybeLogAndComputeRetryDelay(StreamAction action, T e) throws T { | ||
| if (blobStoreServices.isRetryableException(action, e) == false || shouldRetry(attempt) == false) { |
Nit: can we move the blobStoreServices.isRetryableException check into shouldRetry as well?
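The suggested shape, sketched with hypothetical names (this is my reading of the nit, not the PR's actual code):

```java
// Hypothetical sketch of the suggestion: fold the retryable-exception check
// into shouldRetry so the caller makes a single decision per failure.
final class RetryDecisionSketch {
    @FunctionalInterface
    interface RetryPolicy {
        boolean isRetryableException(Exception e);
    }

    static boolean shouldRetry(RetryPolicy policy, Exception e, int attempt, int maxAttempts) {
        return policy.isRetryableException(e) && attempt < maxAttempts;
    }
}
```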
| public static boolean willRetry(OperationPurpose purpose) { | ||
| return purpose != OperationPurpose.REPOSITORY_ANALYSIS; | ||
| } | ||
| public static boolean willRetryForever(OperationPurpose purpose) { | ||
| return purpose == OperationPurpose.INDICES; |
These methods are only used in tests, so I think we should either move them to test code or somehow use them in shouldRetry to justify them being here?
Actually, they're only used in one place, and it's a natural home for this logic; I moved them there in d444f89.
| if (position < 0L) { | ||
| throw new IllegalArgumentException("position must be non-negative"); | ||
| } | ||
| if (length < 0) { | ||
| throw new IllegalArgumentException("length must be non-negative"); | ||
| } |
I think these are already handled in production code?
Yeah, I just duplicated them here because I didn't want to lose that validation (we don't call super here; I just overrode it so I could override getRetryDelayInMillis). I changed them to assertions in 2f7cd06.
If I've understood correctly?
I meant that the same checks are done in RetryingInputStream's constructor, which is invoked in the else branch. That said, it's not called when length == 0. Assertions are fine.
| protected OperationPurpose randomRetryingPurpose() { | ||
| return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose); | ||
| return BlobStoreTestUtil.randomRetryingPurpose(); | ||
| } | ||
| @Override | ||
| protected OperationPurpose randomFiniteRetryingPurpose() { | ||
| return randomValueOtherThanMany( | ||
| purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES, | ||
| BlobStoreTestUtil::randomPurpose | ||
| ); | ||
| return BlobStoreTestUtil.randomFiniteRetryingPurpose(); | ||
| } |
Can't we have these methods defined in AbstractBlobContainerRetriesTestCase? Same for GoogleCloudStorageBlobContainerRetriesTests?
They're now also used in classes that aren't descendants of AbstractBlobContainerRetriesTestCase, so reuse is easier if they're available as static utilities.
The old pattern was to define them in AbstractBlobContainerRetriesTestCase and override them in the subclasses, because the operation-purpose-specific logic was unique to S3. Once this and the Azure implementation are done, the behaviour will be consistent and we can drop the inheritance and overrides and just use the static methods everywhere.
| if (getAttempts() > 1) { | ||
| assertEquals(eTag, version); |
Is this branch not tested, since maxRetries is 0?
It was, because I had used OperationPurpose.INDICES, which retries infinitely regardless of the configured max retries. I changed it to use a random retrying purpose and configured max retries accordingly. It confused even me when I looked at it again :)
See 52f8659.
| if (offset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { | ||
| assert start + offset <= end : "requesting beyond end, start = " + start + " offset=" + offset + " end=" + end; | ||
| } | ||
| // noinspection TryWithIdenticalCatches |
| } catch (RuntimeException e) { | ||
| if (attempt == 1) { | ||
| blobStoreServices.onRetryStarted("open"); | ||
| } | ||
| final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); | ||
| delayBeforeRetry(delayInMillis); | ||
| retryOrAbortOnOpen(e); | ||
| } catch (IOException e) { | ||
| retryOrAbortOnOpen(e); | ||
| } |
It seems impossible for GoogleCloudStorageRetryingInputStream to throw an IOException on opening other than NoSuchFileException and RequestedRangeNotSatisfiedException, which are handled separately. S3RetryingInputStream currently does not retry on IOException. If so, do we need this separate handling of IOException? Or is it for future Azure changes?
Yeah, the Azure implementation does throw IOException if we migrate it as-is, but looking at it, it's really just used as a catch-all. I'd be keen to minimise the differences until the dust has settled on these changes.
Also, as long as BlobStoreServices#getInputStream throws IOException, static analysis will require us to keep it.
Perhaps after Azure we can change those exceptions to be more specific?
.../src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java
…ories/gcs/GoogleCloudStorageRetryingInputStream.java
Co-authored-by: Yang Wang <ywangd@gmail.com>
This is the second step in breaking up and merging #136663.
I chose to do GCS next because it introduces safe-resume (where we remember the version of the blob we were downloading so we can request specifically that version when we resume). This will mean less refactoring than if we had done Azure first.
I didn't implement that logic for S3, although it's trivial. I will do that in a subsequent change.
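The safe-resume idea described above can be sketched like this (SafeResumeSketch and its methods are hypothetical; in GCS terms the remembered "version" is the blob's generation):

```java
// Hypothetical sketch of safe-resume: record the blob generation seen on the
// first successful open, pin every resumed request to that generation, and
// fail loudly if the blob has changed underneath us mid-download.
final class SafeResumeSketch {
    private Long pinnedGeneration; // null until the first successful open

    /** Generation to request on (re)open; null means "latest" on the first open. */
    Long generationToRequest() {
        return pinnedGeneration;
    }

    void onOpened(long observedGeneration) {
        if (pinnedGeneration == null) {
            pinnedGeneration = observedGeneration; // first open: pin the version
        } else if (pinnedGeneration != observedGeneration) {
            throw new IllegalStateException(
                "blob changed from generation " + pinnedGeneration + " to " + observedGeneration + " mid-download"
            );
        }
    }
}
```

Pinning the generation means a resumed range request can never silently splice bytes from a newer version of the blob into an in-progress download.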