Skip to content

Commit 8934bc1

Browse files
authored
Merge pull request #300 from databendlabs/feat/enhance-stream-upload
feat: enhance stream upload
2 parents 2128bd3 + 458224a commit 8934bc1

File tree

1 file changed

+142
-44
lines changed

1 file changed

+142
-44
lines changed

databend-jdbc/src/main/java/com/databend/jdbc/cloud/DatabendPresignClientV1.java

Lines changed: 142 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import okhttp3.MediaType;
66
import okhttp3.MultipartBody;
77
import okhttp3.OkHttpClient;
8+
import okhttp3.Protocol;
89
import okhttp3.Request;
910
import okhttp3.RequestBody;
1011
import okhttp3.Response;
@@ -15,12 +16,14 @@
1516
import org.checkerframework.checker.nullness.qual.NonNull;
1617

1718
import javax.annotation.Nullable;
19+
1820
import java.io.File;
1921
import java.io.IOException;
2022
import java.io.InputStream;
2123
import java.net.SocketTimeoutException;
2224
import java.nio.file.Files;
2325
import java.time.Duration;
26+
import java.util.Arrays;
2427
import java.util.concurrent.TimeUnit;
2528
import java.util.logging.Level;
2629
import java.util.logging.Logger;
@@ -31,7 +34,9 @@
3134
import static java.util.Objects.requireNonNull;
3235
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3336

34-
public class DatabendPresignClientV1 implements DatabendPresignClient {
37+
public class DatabendPresignClientV1
38+
implements DatabendPresignClient
39+
{
3540

3641
private static final int MaxRetryAttempts = 20;
3742

@@ -40,15 +45,52 @@ public class DatabendPresignClientV1 implements DatabendPresignClient {
4045
private final String uri;
4146
private static final Logger logger = Logger.getLogger(DatabendPresignClientV1.class.getPackage().getName());
4247

43-
public DatabendPresignClientV1(OkHttpClient client, String uri) {
48+
public DatabendPresignClientV1(OkHttpClient client, String uri)
49+
{
4450
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINEST);
4551
OkHttpClient.Builder builder = client.newBuilder();
4652
this.client = builder.
47-
connectTimeout(120, TimeUnit.SECONDS).build();
53+
connectTimeout(600, TimeUnit.SECONDS)
54+
.writeTimeout(900, TimeUnit.SECONDS)
55+
.readTimeout(600, TimeUnit.SECONDS)
56+
.retryOnConnectionFailure(true)
57+
.protocols(Arrays.asList(Protocol.HTTP_1_1))
58+
.addInterceptor(chain -> {
59+
Request request = chain.request();
60+
int retryCount = 0;
61+
Response response = null;
62+
while (retryCount < 3) {
63+
try {
64+
response = chain.proceed(request);
65+
if (response.isSuccessful()) {
66+
return response;
67+
}
68+
response.close();
69+
}
70+
catch (IOException e) {
71+
if (retryCount == 2) {
72+
throw e;
73+
}
74+
}
75+
retryCount++;
76+
77+
long waitTimeMs = (long) (Math.pow(2, retryCount) * 1000);
78+
try {
79+
TimeUnit.MILLISECONDS.sleep(waitTimeMs);
80+
}
81+
catch (InterruptedException e) {
82+
Thread.currentThread().interrupt();
83+
throw new IOException("Upload interrupted", e);
84+
}
85+
}
86+
return response;
87+
}).build();
4888
this.uri = uri;
4989
}
5090

51-
private void uploadFromStream(InputStream inputStream, String stageName, String relativePath, String name, long fileSize) throws IOException {
91+
private void uploadFromStream(InputStream inputStream, String stageName, String relativePath, String name, long fileSize)
92+
throws IOException
93+
{
5294
// multipart upload input stream into /v1/upload_to_stage
5395
RequestBody requestBody = new MultipartBody.Builder()
5496
.setType(MultipartBody.FORM)
@@ -73,24 +115,31 @@ private void uploadFromStream(InputStream inputStream, String stageName, String
73115
.build();
74116
try {
75117
executeInternal(request, true);
76-
77-
} catch (IOException e) {
118+
}
119+
catch (IOException e) {
78120
throw new IOException("uploadFromStreamAPI failed", e);
79121
}
80-
81122
}
82123

83-
private void uploadFromStream(InputStream inputStream, Headers headers, String presignedUrl, long fileSize) throws IOException {
84-
requireNonNull(inputStream, "inputStream is null");
85-
Request r = putRequest(headers, presignedUrl, inputStream, fileSize);
124+
private void uploadFromStream(InputStream inputStream, Headers headers, String presignedUrl, long fileSize)
125+
throws IOException
126+
{
127+
logger.info("Starting upload: size=" + fileSize + " bytes, url=" + presignedUrl);
128+
long startTime = System.currentTimeMillis();
86129
try {
130+
Request r = putRequest(headers, presignedUrl, inputStream, fileSize);
87131
executeInternal(r, true);
88-
} catch (IOException e) {
89-
throw new IOException(format(" uploadFromStream failed, file size is %s kb, error is %s", fileSize / 1024.0, presignedUrl, e.toString()));
132+
logger.info("Upload completed in " + (System.currentTimeMillis() - startTime) + "ms");
133+
}
134+
catch (IOException e) {
135+
logger.severe("Upload failed after " + (System.currentTimeMillis() - startTime) + "ms: " + e.getMessage());
136+
throw e;
90137
}
91138
}
92139

93-
private ResponseBody executeInternal(Request request, boolean shouldClose) throws IOException {
140+
private ResponseBody executeInternal(Request request, boolean shouldClose)
141+
throws IOException
142+
{
94143
requireNonNull(request, "request is null");
95144
long start = System.nanoTime();
96145
long attempts = 0;
@@ -110,9 +159,11 @@ private ResponseBody executeInternal(Request request, boolean shouldClose) throw
110159

111160
try {
112161
MILLISECONDS.sleep(attempts * 100);
113-
} catch (InterruptedException e) {
162+
}
163+
catch (InterruptedException e) {
114164
try {
115-
} finally {
165+
}
166+
finally {
116167
Thread.currentThread().interrupt();
117168
}
118169
throw new RuntimeException("StatementClient thread was interrupted");
@@ -124,122 +175,169 @@ private ResponseBody executeInternal(Request request, boolean shouldClose) throw
124175
response = client.newCall(request).execute();
125176
if (response.isSuccessful()) {
126177
return response.body();
127-
} else if (response.code() == 401) {
178+
}
179+
else if (response.code() == 401) {
128180
throw new RuntimeException("Error exeucte presign, Unauthorized user: " + response.code() + " " + response.message());
129-
} else if (response.code() >= 503) {
181+
}
182+
else if (response.code() >= 503) {
130183
cause = new RuntimeException("Error execute presign, service unavailable: " + response.code() + " " + response.message());
131-
} else if (response.code() >= 400) {
184+
}
185+
else if (response.code() >= 400) {
132186
cause = new RuntimeException("Error execute presign, configuration error: " + response.code() + " " + response.message());
133187
}
134-
} catch (SocketTimeoutException e) {
188+
}
189+
catch (SocketTimeoutException e) {
135190
logger.warning("Error execute presign, socket timeout: " + e.getMessage());
136191
cause = new RuntimeException("Error execute presign, request is " + request.toString() + "socket timeout: " + e.getMessage());
137-
} catch (RuntimeException e) {
192+
}
193+
catch (RuntimeException e) {
138194
cause = e;
139-
} finally {
195+
}
196+
finally {
140197
if (shouldClose) {
141198
try {
142199
if (response != null) {
143200
response.close();
144201
}
145-
} catch (Exception e) {
202+
}
203+
catch (Exception e) {
146204
// ignore
147205
}
148206
}
149207
}
150-
151208
}
152209
}
153210

154211
@Override
155212
public void presignUpload(File srcFile, InputStream inputStream, Headers headers,
156-
String presignedUrl, long fileSize, boolean uploadFromStream) throws IOException {
213+
String presignedUrl, long fileSize, boolean uploadFromStream)
214+
throws IOException
215+
{
157216

158217
InputStream it = null;
159218
if (!uploadFromStream) {
160219
it = Files.newInputStream(srcFile.toPath());
161-
} else {
220+
}
221+
else {
162222
it = inputStream;
163223
}
164224
uploadFromStream(it, headers, presignedUrl, fileSize);
165225
}
166226

167227
@Override
168-
public void presignUpload(File srcFile, InputStream inputStream, String stageName, String relativePath, String name, long fileSize, boolean uploadFromStream) throws IOException {
228+
public void presignUpload(File srcFile, InputStream inputStream, String stageName, String relativePath, String name, long fileSize, boolean uploadFromStream)
229+
throws IOException
230+
{
169231
if (!uploadFromStream) {
170232
try (InputStream it = Files.newInputStream(srcFile.toPath())) {
171233
uploadFromStream(it, stageName, relativePath, name, fileSize);
172234
}
173-
} else {
235+
}
236+
else {
174237
uploadFromStream(inputStream, stageName, relativePath, name, fileSize);
175238
}
176239
}
177240

178241
@Override
179-
public void presignDownload(String destFileName, Headers headers, String presignedUrl) {
242+
public void presignDownload(String destFileName, Headers headers, String presignedUrl)
243+
{
180244
Request r = getRequest(headers, presignedUrl);
181245
try (ResponseBody body = executeInternal(r, false)) {
182246
BufferedSink sink = Okio.buffer(Okio.sink(new File(destFileName)));
183247
sink.writeAll(body.source());
184248
sink.close();
185-
} catch (IOException e) {
249+
}
250+
catch (IOException e) {
186251
throw new RuntimeException("presignDownload failed", e);
187252
}
188253
}
189254

190255
@Override
191-
public InputStream presignDownloadStream(Headers headers, String presignedUrl) {
256+
public InputStream presignDownloadStream(Headers headers, String presignedUrl)
257+
{
192258
Request r = getRequest(headers, presignedUrl);
193259
try {
194260
ResponseBody responseBody = executeInternal(r, false);
195261
return responseBody.byteStream();
196-
} catch (IOException e) {
262+
}
263+
catch (IOException e) {
197264
throw new RuntimeException("presignDownloadStream failed", e);
198265
}
199266
}
200267

201-
private Request getRequest(Headers headers, String url) {
268+
private Request getRequest(Headers headers, String url)
269+
{
202270
return new Request.Builder().headers(headers).url(url).get().build();
203271
}
204272

205-
private Request putRequest(Headers headers, String url, InputStream inputStream, long fileSize)
206-
throws IOException {
207-
RequestBody input = new InputStreamRequestBody(null, inputStream, fileSize);
208-
return new Request.Builder().headers(headers).url(url).put(input).build();
209-
}
273+
private Request putRequest(Headers headers, String presignedUrl, InputStream inputStream, long fileSize) {
274+
RequestBody requestBody = new RequestBody() {
275+
@Override
276+
public MediaType contentType() {
277+
return MediaType.parse("application/octet-stream");
278+
}
279+
280+
@Override
281+
public long contentLength() {
282+
return fileSize;
283+
}
210284

285+
@Override
286+
public void writeTo(BufferedSink sink) throws IOException {
287+
try (Source source = Okio.source(inputStream)) {
288+
sink.writeAll(source);
289+
}
290+
}
291+
};
292+
293+
return new Request.Builder()
294+
.url(presignedUrl)
295+
.put(requestBody)
296+
.headers(headers)
297+
.build();
298+
}
211299
}
212300

213-
class InputStreamRequestBody extends RequestBody {
301+
class InputStreamRequestBody
302+
extends RequestBody
303+
{
214304
private final InputStream inputStream;
215305
private final MediaType contentType;
216306
private final long fileSize;
217307
private static final Logger logger = Logger.getLogger(InputStreamRequestBody.class.getPackage().getName());
218308

219-
public InputStreamRequestBody(MediaType contentType, InputStream inputStream, long fileSize) {
220-
if (inputStream == null) throw new NullPointerException("inputStream == null");
309+
public InputStreamRequestBody(MediaType contentType, InputStream inputStream, long fileSize)
310+
{
311+
if (inputStream == null) {
312+
throw new NullPointerException("inputStream == null");
313+
}
221314
this.contentType = contentType;
222315
this.inputStream = inputStream;
223316
this.fileSize = fileSize;
224317
}
225318

226319
@Nullable
227320
@Override
228-
public MediaType contentType() {
321+
public MediaType contentType()
322+
{
229323
return contentType;
230324
}
231325

232326
@Override
233-
public long contentLength() {
327+
public long contentLength()
328+
{
234329
return fileSize; // return the actual file size
235330
// return inputStream.available() == 0 ? -1 : inputStream.available();
236331
}
237332

238333
@Override
239-
public void writeTo(@NonNull BufferedSink sink) throws IOException {
334+
public void writeTo(@NonNull BufferedSink sink)
335+
throws IOException
336+
{
240337
try (Source source = Okio.source(inputStream)) {
241338
sink.writeAll(source);
242-
} catch (IOException e) {
339+
}
340+
catch (IOException e) {
243341
logger.warning(format("writeTo failed, error is %s, cause is %s", e.getMessage(), e.getCause()));
244342
}
245343
}

0 commit comments

Comments
 (0)