Skip to content

Commit a7119db

Browse files
Merge pull request #1498 from appwrite/chore/js-concurrent-chunked-uploads
Add concurrent chunked uploads to SDKs
2 parents 41f3fd8 + 07d37ef commit a7119db

21 files changed

Lines changed: 2062 additions & 524 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
.envrc
1010
.hatch
1111

12+
tmp-upload-tests/
13+
1214
# exclude everything
1315
examples/*
1416
tests/tmp

mock-server/app/http.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,15 @@
389389
'chunksTotal' => (int) ceil($size / ($end + 1 - $start)),
390390
'chunksUploaded' => ceil($start / $chunkSize) + 1
391391
]);
392+
} else {
393+
$chunksTotal = (int) ceil($size / $chunkSize);
394+
395+
$response->json([
396+
'$id' => ID::custom('newfileid'),
397+
'chunksTotal' => $chunksTotal,
398+
'chunksUploaded' => $chunksTotal,
399+
'result' => 'POST:/v1/mock/tests/general/upload:passed',
400+
]);
392401
}
393402
} else {
394403
$file['tmp_name'] = (\is_array($file['tmp_name'])) ? $file['tmp_name'][0] : $file['tmp_name'];

package-lock.json

Lines changed: 0 additions & 6 deletions
This file was deleted.

templates/android/library/src/main/java/io/package/Client.kt.twig

Lines changed: 104 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import {{ sdk.namespace | caseDot }}.models.UploadProgress
1313
import kotlinx.coroutines.CoroutineScope
1414
import kotlinx.coroutines.Dispatchers
1515
import kotlinx.coroutines.Job
16+
import kotlinx.coroutines.async
17+
import kotlinx.coroutines.awaitAll
18+
import kotlinx.coroutines.coroutineScope
1619
import kotlinx.coroutines.suspendCancellableCoroutine
1720
import okhttp3.*
1821
import okhttp3.Headers.Companion.toHeaders
@@ -30,6 +33,8 @@ import java.net.CookieManager
3033
import java.net.CookiePolicy
3134
import java.security.SecureRandom
3235
import java.security.cert.X509Certificate
36+
import java.util.concurrent.atomic.AtomicInteger
37+
import java.util.concurrent.atomic.AtomicLong
3338
import javax.net.ssl.SSLContext
3439
import javax.net.ssl.SSLSocketFactory
3540
import javax.net.ssl.TrustManager
@@ -49,6 +54,7 @@ class Client @JvmOverloads constructor(
4954
* The size for chunked uploads in bytes.
5055
*/
5156
internal const val CHUNK_SIZE = 5*1024*1024; // 5MB
57+
internal const val MAX_CONCURRENT_UPLOADS = 8
5258
internal const val GLOBAL_PREFS = "{{ sdk.namespace | caseDot }}"
5359
internal const val COOKIE_PREFS = "myCookie"
5460
}
@@ -374,12 +380,10 @@ class Client @JvmOverloads constructor(
374380
idParamName: String? = null,
375381
onProgress: ((UploadProgress) -> Unit)? = null,
376382
): T {
377-
var file: RandomAccessFile? = null
378383
val input = params[paramName] as InputFile
379384
val size: Long = when(input.sourceType) {
380385
"path", "file" -> {
381-
file = RandomAccessFile(input.path, "r")
382-
file.length()
386+
File(input.path).length()
383387
}
384388
"bytes" -> {
385389
(input.data as ByteArray).size.toLong()
@@ -408,9 +412,9 @@ class Client @JvmOverloads constructor(
408412
)
409413
}
410414

411-
val buffer = ByteArray(CHUNK_SIZE)
412415
var offset = 0L
413416
var result: Map<*, *>? = null
417+
var uploadId: String? = null
414418

415419
if (idParamName?.isNotEmpty() == true) {
416420
// Make a request to check if a file already exists
@@ -423,59 +427,128 @@ class Client @JvmOverloads constructor(
423427
)
424428
val chunksUploaded = current["chunksUploaded"] as Long
425429
offset = chunksUploaded * CHUNK_SIZE
430+
uploadId = params[idParamName]?.toString()
431+
result = current
426432
}
427433

428-
while (offset < size) {
429-
when(input.sourceType) {
434+
fun readChunk(start: Long, end: Long): ByteArray {
435+
val length = (end - start).toInt()
436+
return when(input.sourceType) {
430437
"file", "path" -> {
431-
file!!.seek(offset)
432-
file!!.read(buffer)
438+
RandomAccessFile(input.path, "r").use { chunkFile ->
439+
val chunk = ByteArray(length)
440+
chunkFile.seek(start)
441+
chunkFile.readFully(chunk)
442+
chunk
443+
}
433444
}
434445
"bytes" -> {
435-
val end = if (offset + CHUNK_SIZE < size) {
436-
offset + CHUNK_SIZE - 1
437-
} else {
438-
size - 1
439-
}
440-
(input.data as ByteArray).copyInto(
441-
buffer,
442-
startIndex = offset.toInt(),
443-
endIndex = end.toInt()
444-
)
446+
(input.data as ByteArray).copyOfRange(start.toInt(), end.toInt())
445447
}
446448
else -> throw UnsupportedOperationException()
447449
}
450+
}
448451

449-
params[paramName] = MultipartBody.Part.createFormData(
452+
val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE
453+
454+
fun isUploadComplete(chunkResult: Map<*, *>): Boolean {
455+
val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false
456+
val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks
457+
return chunksUploaded >= chunksTotal
458+
}
459+
460+
suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> {
461+
val chunkParams = params.toMutableMap()
462+
val chunkHeaders = headers.toMutableMap()
463+
464+
if (includeUploadId && uploadId != null) {
465+
chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId!!
466+
}
467+
468+
chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size"
469+
chunkParams[paramName] = MultipartBody.Part.createFormData(
450470
paramName,
451471
input.filename,
452-
buffer.toRequestBody()
472+
readChunk(start, end).toRequestBody()
453473
)
454474

455-
headers["Content-Range"] =
456-
"bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size"
457-
458-
result = call(
475+
val chunkResult = call(
459476
method = "POST",
460477
path,
461-
headers,
462-
params,
478+
chunkHeaders,
479+
chunkParams,
463480
responseType = Map::class.java
464481
)
465482

466-
offset += CHUNK_SIZE
467-
headers["x-{{ spec.title | caseLower }}-id"] = result["\$id"].toString()
483+
if (index == 0 || uploadId == null) {
484+
uploadId = chunkResult["\$id"].toString()
485+
}
486+
487+
return chunkResult
488+
}
489+
490+
if (offset == 0L) {
491+
val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size)
492+
result = uploadChunk(0, 0, firstChunkEnd, false)
493+
offset = firstChunkEnd
468494
onProgress?.invoke(
469495
UploadProgress(
470-
id = result["\$id"].toString(),
496+
id = uploadId ?: result!!["\$id"].toString(),
471497
progress = offset.coerceAtMost(size).toDouble() / size * 100,
472498
sizeUploaded = offset.coerceAtMost(size),
473-
chunksTotal = result["chunksTotal"].toString().toInt(),
474-
chunksUploaded = result["chunksUploaded"].toString().toInt(),
499+
chunksTotal = result!!["chunksTotal"].toString().toInt(),
500+
chunksUploaded = result!!["chunksUploaded"].toString().toInt(),
475501
)
476502
)
477503
}
478504

505+
val chunks = mutableListOf<Triple<Int, Long, Long>>()
506+
var chunkOffset = offset
507+
while (chunkOffset < size) {
508+
val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size)
509+
chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end))
510+
chunkOffset = end
511+
}
512+
513+
if (chunks.isNotEmpty()) {
514+
val nextChunk = AtomicInteger(0)
515+
val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt())
516+
val uploadedBytes = AtomicLong(offset.coerceAtMost(size))
517+
518+
coroutineScope {
519+
List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) {
520+
async {
521+
while (true) {
522+
val chunkIndex = nextChunk.getAndIncrement()
523+
if (chunkIndex >= chunks.size) {
524+
break
525+
}
526+
527+
val (index, start, end) = chunks[chunkIndex]
528+
val chunkResult = uploadChunk(index, start, end, true)
529+
530+
val chunksUploaded = completedChunks.incrementAndGet()
531+
val sizeUploaded = uploadedBytes.addAndGet(end - start)
532+
533+
if (isUploadComplete(chunkResult)) {
534+
result = chunkResult
535+
}
536+
537+
onProgress?.invoke(
538+
UploadProgress(
539+
id = uploadId ?: chunkResult["\$id"].toString(),
540+
progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100,
541+
sizeUploaded = sizeUploaded.coerceAtMost(size),
542+
chunksTotal = chunkResult["chunksTotal"].toString().toInt(),
543+
chunksUploaded = chunksUploaded,
544+
)
545+
)
546+
}
547+
}
548+
}.awaitAll()
549+
}
550+
}
551+
479552
return converter(result as Map<String, Any>)
480553
}
481554

templates/apple/Sources/Client.swift.twig

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ open class Client {
441441

442442
var offset = 0
443443
var result = [String:Any]()
444+
var uploadId = idParamName != nil ? params[idParamName!] as? String : nil
444445

445446
if idParamName != nil {
446447
// Make a request to check if a file already exists
@@ -454,37 +455,106 @@ open class Client {
454455
)
455456
let chunksUploaded = map["chunksUploaded"] as! Int
456457
offset = chunksUploaded * Client.chunkSize
458+
result = map
457459
} catch {
458460
// File does not exist yet, swallow exception
459461
}
460462
}
461463

462-
while offset < size {
463-
let slice = (input.data as! ByteBuffer).getSlice(at: offset, length: Client.chunkSize)
464-
?? (input.data as! ByteBuffer).getSlice(at: offset, length: Int(size - offset))
464+
let totalChunks = Int(ceil(Double(size) / Double(Client.chunkSize)))
465+
var nextChunk = offset / Client.chunkSize
466+
var completedChunks = nextChunk
467+
var uploadedBytes = min(offset, size)
468+
let baseParams = params
469+
let baseHeaders = headers
465470

466-
params[paramName] = InputFile.fromBuffer(slice!, filename: input.filename, mimeType: input.mimeType)
467-
headers["content-range"] = "bytes \(offset)-\(min((offset + Client.chunkSize) - 1, size - 1))/\(size)"
471+
func isUploadComplete(_ response: [String: Any]) -> Bool {
472+
guard let chunksUploaded = response["chunksUploaded"] as? Int else {
473+
return false
474+
}
475+
476+
let chunksTotal = response["chunksTotal"] as? Int ?? totalChunks
477+
return chunksUploaded >= chunksTotal
478+
}
479+
480+
func uploadChunk(index: Int, uploadId: String?) async throws -> (Int, Int, [String: Any]) {
481+
let chunkOffset = index * Client.chunkSize
482+
let chunkLength = min(Client.chunkSize, size - chunkOffset)
483+
let slice = (input.data as! ByteBuffer).getSlice(at: chunkOffset, length: chunkLength)!
484+
var chunkParams = baseParams
485+
var chunkHeaders = baseHeaders
486+
chunkParams[paramName] = InputFile.fromBuffer(slice, filename: input.filename, mimeType: input.mimeType)
487+
chunkHeaders["content-range"] = "bytes \(chunkOffset)-\(chunkOffset + chunkLength - 1)/\(size)"
488+
if let uploadId = uploadId {
489+
chunkHeaders["x-{{ spec.title | caseLower }}-id"] = uploadId
490+
}
468491

469-
result = try await call(
492+
let chunkResult = try await call(
470493
method: "POST",
471494
path: path,
472-
headers: headers,
473-
params: params,
495+
headers: chunkHeaders,
496+
params: chunkParams,
474497
converter: { return $0 as! [String: Any] }
475498
)
476499

477-
offset += Client.chunkSize
478-
headers["x-{{ spec.title | caseLower }}-id"] = result["$id"] as? String
500+
return (index, chunkLength, chunkResult)
501+
}
502+
503+
if nextChunk == 0 {
504+
let first = try await uploadChunk(index: 0, uploadId: uploadId)
505+
result = first.2
506+
uploadId = result["$id"] as? String
507+
nextChunk = 1
508+
completedChunks = 1
509+
uploadedBytes = first.1
479510
onProgress?(UploadProgress(
480-
id: result["$id"] as? String ?? "",
481-
progress: Double(min(offset, size))/Double(size) * 100.0,
482-
sizeUploaded: min(offset, size),
483-
chunksTotal: result["chunksTotal"] as? Int ?? -1,
484-
chunksUploaded: result["chunksUploaded"] as? Int ?? -1
511+
id: uploadId ?? "",
512+
progress: Double(uploadedBytes)/Double(size) * 100.0,
513+
sizeUploaded: uploadedBytes,
514+
chunksTotal: result["chunksTotal"] as? Int ?? totalChunks,
515+
chunksUploaded: result["chunksUploaded"] as? Int ?? completedChunks
485516
))
486517
}
487518

519+
let maxConcurrency = 8
520+
521+
try await withThrowingTaskGroup(of: (Int, Int, [String: Any]).self) { group in
522+
var inFlight = 0
523+
524+
while inFlight < maxConcurrency && nextChunk < totalChunks {
525+
let index = nextChunk
526+
let currentUploadId = uploadId
527+
group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) }
528+
nextChunk += 1
529+
inFlight += 1
530+
}
531+
532+
while let chunk = try await group.next() {
533+
inFlight -= 1
534+
completedChunks += 1
535+
uploadedBytes += chunk.1
536+
if isUploadComplete(chunk.2) {
537+
result = chunk.2
538+
}
539+
540+
onProgress?(UploadProgress(
541+
id: uploadId ?? "",
542+
progress: Double(min(uploadedBytes, size))/Double(size) * 100.0,
543+
sizeUploaded: min(uploadedBytes, size),
544+
chunksTotal: chunk.2["chunksTotal"] as? Int ?? totalChunks,
545+
chunksUploaded: chunk.2["chunksUploaded"] as? Int ?? completedChunks
546+
))
547+
548+
while inFlight < maxConcurrency && nextChunk < totalChunks {
549+
let index = nextChunk
550+
let currentUploadId = uploadId
551+
group.addTask { try await uploadChunk(index: index, uploadId: currentUploadId) }
552+
nextChunk += 1
553+
inFlight += 1
554+
}
555+
}
556+
}
557+
488558
return try converter!(result)
489559
}
490560

0 commit comments

Comments
 (0)