Skip to content

Commit c648d38

Browse files
committed
chore: start plumbing the zero copy~ness closer to the public api
1 parent b8dda33 commit c648d38

8 files changed

Lines changed: 153 additions & 89 deletions

File tree

‎google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptor.java‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,23 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.protobuf.ByteString;
21+
import java.io.Closeable;
22+
import java.io.IOException;
2023

2124
/** Blob Descriptor is to blob, what File Descriptor is to a file */
2225
public interface BlobDescriptor {
2326

2427
BlobInfo getBlobInfo();
2528

2629
ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range);
30+
31+
interface ZeroCopySupport {
32+
interface DisposableByteString extends AutoCloseable, Closeable {
33+
ByteString byteString();
34+
35+
@Override
36+
void close() throws IOException;
37+
}
38+
}
2739
}

‎google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java‎

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
import com.google.storage.v2.Object;
3535
import com.google.storage.v2.ObjectRangeData;
3636
import com.google.storage.v2.ReadRange;
37-
import java.io.ByteArrayOutputStream;
3837
import java.io.IOException;
38+
import java.util.ArrayList;
39+
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.concurrent.ConcurrentHashMap;
@@ -244,7 +245,7 @@ protected void onStartImpl(StreamController controller) {
244245
@Override
245246
protected void onResponseImpl(BidiReadObjectResponse response) {
246247
controller.request(1);
247-
try (ResponseContentLifecycleHandle handle =
248+
try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle =
248249
bidiResponseContentLifecycleManager.get(response)) {
249250
if (response.hasMetadata()) {
250251
state.metadata.set(response.getMetadata());
@@ -256,21 +257,32 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
256257
if (rangeData.isEmpty()) {
257258
return;
258259
}
259-
for (ObjectRangeData d : rangeData) {
260+
for (int i = 0; i < rangeData.size(); i++) {
261+
ObjectRangeData d = rangeData.get(i);
260262
long id = d.getReadRange().getReadId();
261263
OutstandingReadToArray read = state.outstandingReads.get(id);
262264
if (read == null) {
263265
continue;
264266
}
265-
ByteString content = d.getChecksummedData().getContent();
266-
ChildRef childRef = handle.borrow();
267-
read.accept(childRef, content);
267+
final int idx = i;
268+
//noinspection rawtypes
269+
ChildRef childRef =
270+
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
271+
read.accept(childRef);
268272
if (d.getRangeEnd()) {
269273
// invoke eof on exec, the resolving future could have a downstream callback
270274
// that we don't want to block this grpc thread
271-
exec.execute(read::eof);
272-
// don't remove the outstanding read until the future has been resolved
273-
state.outstandingReads.remove(id);
275+
exec.execute(
276+
() -> {
277+
try {
278+
read.eof();
279+
// don't remove the outstanding read until the future has been resolved
280+
state.outstandingReads.remove(id);
281+
} catch (IOException e) {
282+
// TODO: sync this up with stream restarts when the time comes
283+
throw StorageException.coalesce(e);
284+
}
285+
});
274286
}
275287
}
276288
} catch (IOException e) {
@@ -290,28 +302,34 @@ protected void onCompleteImpl() {}
290302
static final class OutstandingReadToArray {
291303
private final long readId;
292304
private final ReadCursor readCursor;
293-
private final ByteArrayOutputStream bytes;
305+
private final List<ChildRef> childRefs;
294306
private final SettableApiFuture<byte[]> complete;
295307

296308
@VisibleForTesting
297309
OutstandingReadToArray(
298310
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
299311
this.readId = readId;
300312
this.readCursor = new ReadCursor(readOffset, readOffset + readLimit);
301-
this.bytes = new ByteArrayOutputStream();
313+
this.childRefs = Collections.synchronizedList(new ArrayList<>());
302314
this.complete = complete;
303315
}
304316

305-
public void accept(ChildRef childRef, ByteString bytes) throws IOException {
306-
try (ChildRef autoclose = childRef) {
307-
int size = bytes.size();
308-
bytes.writeTo(this.bytes);
309-
readCursor.advance(size);
310-
}
317+
public void accept(ChildRef childRef) throws IOException {
318+
int size = childRef.byteString().size();
319+
childRefs.add(childRef);
320+
readCursor.advance(size);
311321
}
312322

313-
public void eof() {
314-
complete.set(bytes.toByteArray());
323+
public void eof() throws IOException {
324+
try {
325+
ByteString base = ByteString.empty();
326+
for (ChildRef ref : childRefs) {
327+
base = base.concat(ref.byteString());
328+
}
329+
complete.set(base.toByteArray());
330+
} finally {
331+
GrpcUtils.closeAll(childRefs);
332+
}
315333
}
316334

317335
public ReadRange makeReadRange() {

‎google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java‎

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import java.util.Arrays;
101101
import java.util.Collections;
102102
import java.util.IdentityHashMap;
103-
import java.util.Iterator;
104103
import java.util.List;
105104
import java.util.Locale;
106105
import java.util.Map;
@@ -1133,40 +1132,14 @@ private Response parseFrom(CodedInputStream stream) throws InvalidProtocolBuffer
11331132
}
11341133

11351134
@Override
1136-
public ResponseContentLifecycleHandle get(Response response) {
1135+
public ResponseContentLifecycleHandle<Response> get(Response response) {
11371136
InputStream stream = unclosedStreams.remove(response);
11381137
return ResponseContentLifecycleHandle.create(response, toByteBuffersFunction, stream);
11391138
}
11401139

11411140
@Override
11421141
public void close() throws IOException {
1143-
closeAllStreams(unclosedStreams.values());
1144-
}
1145-
1146-
/**
1147-
* In the event closing the streams results in multiple streams throwing IOExceptions, collect
1148-
* them all as suppressed exceptions on the first occurrence.
1149-
*/
1150-
@VisibleForTesting
1151-
static void closeAllStreams(Iterable<InputStream> inputStreams) throws IOException {
1152-
Iterator<InputStream> iterator = inputStreams.iterator();
1153-
IOException ioException = null;
1154-
while (iterator.hasNext()) {
1155-
InputStream next = iterator.next();
1156-
try {
1157-
next.close();
1158-
} catch (IOException e) {
1159-
if (ioException == null) {
1160-
ioException = e;
1161-
} else if (ioException != e) {
1162-
ioException.addSuppressed(e);
1163-
}
1164-
}
1165-
}
1166-
1167-
if (ioException != null) {
1168-
throw ioException;
1169-
}
1142+
GrpcUtils.closeAll(unclosedStreams.values());
11701143
}
11711144
}
11721145

‎google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java‎

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919
import com.google.api.gax.grpc.GrpcCallContext;
2020
import com.google.common.collect.ImmutableList;
2121
import com.google.common.collect.ImmutableMap;
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.util.Collection;
2225
import java.util.Locale;
26+
import java.util.Objects;
2327

2428
final class GrpcUtils {
2529

@@ -34,4 +38,38 @@ static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext
3438
}
3539
return baseContext;
3640
}
41+
42+
/**
43+
* In the event closing the streams results in multiple streams throwing IOExceptions, collect
44+
* them all as suppressed exceptions on the first occurrence.
45+
*/
46+
static <C extends Closeable> void closeAll(Collection<C> closeables) throws IOException {
47+
IOException ioException =
48+
closeables.stream()
49+
.map(
50+
stream -> {
51+
try {
52+
stream.close();
53+
return null;
54+
} catch (IOException e) {
55+
return e;
56+
}
57+
})
58+
.filter(Objects::nonNull)
59+
.reduce(
60+
null,
61+
(l, r) -> {
62+
if (l != null) {
63+
l.addSuppressed(r);
64+
return l;
65+
} else {
66+
return r;
67+
}
68+
},
69+
(l, r) -> l);
70+
71+
if (ioException != null) {
72+
throw ioException;
73+
}
74+
}
3775
}

‎google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java‎

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package com.google.cloud.storage;
1717

18+
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
1819
import com.google.common.base.Preconditions;
1920
import com.google.common.base.Suppliers;
21+
import com.google.protobuf.ByteString;
2022
import java.io.Closeable;
2123
import java.io.IOException;
2224
import java.nio.ByteBuffer;
@@ -27,33 +29,37 @@
2729
import java.util.function.Supplier;
2830
import org.checkerframework.checker.nullness.qual.Nullable;
2931

30-
final class ResponseContentLifecycleHandle implements Closeable {
32+
final class ResponseContentLifecycleHandle<Response> implements Closeable {
33+
34+
private final Response response;
3135
@Nullable private final Closeable dispose;
3236

3337
private final Supplier<List<ByteBuffer>> lazyBuffers;
3438
private final AtomicBoolean open;
3539
private final AtomicInteger refs;
3640

3741
private ResponseContentLifecycleHandle(
38-
Supplier<List<ByteBuffer>> lazyBuffers, @Nullable Closeable dispose) {
42+
Response response,
43+
Function<Response, List<ByteBuffer>> toBuffersFunction,
44+
@Nullable Closeable dispose) {
45+
this.response = response;
3946
this.dispose = dispose;
40-
this.lazyBuffers = lazyBuffers;
47+
this.lazyBuffers = Suppliers.memoize(() -> toBuffersFunction.apply(response));
4148
this.open = new AtomicBoolean(true);
4249
this.refs = new AtomicInteger(1);
4350
}
4451

45-
static <Response> ResponseContentLifecycleHandle create(
52+
static <Response> ResponseContentLifecycleHandle<Response> create(
4653
Response response,
4754
Function<Response, List<ByteBuffer>> toBuffersFunction,
4855
@Nullable Closeable dispose) {
49-
Supplier<List<ByteBuffer>> lazyBuffers =
50-
Suppliers.memoize(() -> toBuffersFunction.apply(response));
51-
return new ResponseContentLifecycleHandle(lazyBuffers, dispose);
56+
return new ResponseContentLifecycleHandle<>(response, toBuffersFunction, dispose);
5257
}
5358

54-
ChildRef borrow() {
59+
ChildRef borrow(Function<Response, ByteString> toByteStringFunction) {
5560
Preconditions.checkState(open.get(), "only able to borrow when open");
56-
ChildRef childRef = new ChildRef();
61+
Preconditions.checkNotNull(toByteStringFunction);
62+
ChildRef childRef = new ChildRef(toByteStringFunction);
5763
refs.incrementAndGet();
5864
return childRef;
5965
}
@@ -91,7 +97,18 @@ private void dispose() throws IOException {
9197
}
9298
}
9399

94-
final class ChildRef implements Closeable {
100+
final class ChildRef implements Closeable, DisposableByteString {
101+
102+
private final Function<Response, ByteString> toByteStringFunction;
103+
104+
private ChildRef(Function<Response, ByteString> toByteStringFunction) {
105+
this.toByteStringFunction = toByteStringFunction;
106+
}
107+
108+
@Override
109+
public ByteString byteString() {
110+
return toByteStringFunction.apply(response);
111+
}
95112

96113
@Override
97114
public void close() throws IOException {

‎google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.IOException;
2222

2323
interface ResponseContentLifecycleManager<Response> extends Closeable {
24-
ResponseContentLifecycleHandle get(Response response);
24+
ResponseContentLifecycleHandle<Response> get(Response response);
2525

2626
@Override
2727
default void close() throws IOException {}

0 commit comments

Comments
 (0)