@@ -18,7 +18,6 @@ import (
18
18
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
19
19
"github.com/buildbuddy-io/buildbuddy/server/metrics"
20
20
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
21
- "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
22
21
"github.com/buildbuddy-io/buildbuddy/server/util/compression"
23
22
"github.com/buildbuddy-io/buildbuddy/server/util/ioutil"
24
23
"github.com/buildbuddy-io/buildbuddy/server/util/log"
50
49
acRPCTimeout = flag .Duration ("cache.client.ac_rpc_timeout" , 15 * time .Second , "Maximum time a single Action Cache RPC can take." )
51
50
filecacheTreeSalt = flag .String ("cache.filecache_tree_salt" , "20250304" , "A salt to invalidate filecache tree hashes, if/when needed." )
52
51
requestCachedSubtreeDigests = flag .Bool ("cache.request_cached_subtree_digests" , true , "If true, GetTree requests will set send_cached_subtree_digests." )
53
-
54
- uploadBufPool = bytebufferpool .FixedSize (uploadBufSizeBytes )
55
52
)
56
53
57
54
func retryOptions (name string ) * retry.Options {
@@ -249,58 +246,85 @@ func ComputeFileDigest(fullFilePath, instanceName string, digestFunction repb.Di
249
246
return computeDigest (f , instanceName , digestFunction )
250
247
}
251
248
252
- func uploadFromReader (ctx context.Context , bsClient bspb.ByteStreamClient , rn * digest.CASResourceName , in io.Reader ) (* repb.Digest , int64 , error ) {
249
+ func uploadFromReader (ctx context.Context , bsClient bspb.ByteStreamClient , r * digest.CASResourceName , in io.Reader ) (* repb.Digest , int64 , error ) {
253
250
if bsClient == nil {
254
251
return nil , 0 , status .FailedPreconditionError ("ByteStreamClient not configured" )
255
252
}
256
- if rn .IsEmpty () {
257
- return rn .GetDigest (), 0 , nil
253
+ if r .IsEmpty () {
254
+ return r .GetDigest (), 0 , nil
258
255
}
259
-
260
- w , err := NewUploadWriter (ctx , bsClient , rn )
256
+ stream , err := bsClient .Write (ctx )
261
257
if err != nil {
262
258
return nil , 0 , err
263
259
}
264
- defer w .Close ()
265
260
266
- rc : = io .NopCloser (in )
267
- if rn .GetCompressor () == repb .Compressor_ZSTD {
261
+ var rc io. ReadCloser = io .NopCloser (in )
262
+ if r .GetCompressor () == repb .Compressor_ZSTD {
268
263
rbuf := make ([]byte , 0 , uploadBufSizeBytes )
269
264
cbuf := make ([]byte , 0 , uploadBufSizeBytes )
270
- reader , err := compression .NewZstdCompressingReader (rc , rbuf [:uploadBufSizeBytes ], cbuf [:uploadBufSizeBytes ])
265
+ reader , err := compression .NewZstdCompressingReader (io . NopCloser ( in ) , rbuf [:uploadBufSizeBytes ], cbuf [:uploadBufSizeBytes ])
271
266
if err != nil {
272
267
return nil , 0 , status .InternalErrorf ("Failed to compress blob: %s" , err )
273
268
}
274
269
rc = reader
275
270
}
276
271
defer rc .Close ()
277
272
278
- if _ , err := w .ReadFrom (rc ); err != nil {
279
- return nil , 0 , err
273
+ buf := make ([]byte , uploadBufSizeBytes )
274
+ bytesUploaded := int64 (0 )
275
+ sender := rpcutil .NewSender [* bspb.WriteRequest ](ctx , stream )
276
+ resourceName := r .NewUploadString ()
277
+ for {
278
+ n , err := ioutil .ReadTryFillBuffer (rc , buf )
279
+ if err != nil && err != io .EOF {
280
+ return nil , bytesUploaded , err
281
+ }
282
+ readDone := err == io .EOF
283
+
284
+ req := & bspb.WriteRequest {
285
+ Data : buf [:n ],
286
+ ResourceName : resourceName ,
287
+ WriteOffset : bytesUploaded ,
288
+ FinishWrite : readDone ,
289
+ }
290
+
291
+ err = sender .SendWithTimeoutCause (req , * casRPCTimeout , status .DeadlineExceededError ("Timed out sending Write request" ))
292
+ if err != nil {
293
+ // If the blob already exists in the CAS, the server will respond EOF.
294
+ // It is safe to stop sending writes.
295
+ if err == io .EOF {
296
+ break
297
+ }
298
+ return nil , bytesUploaded , err
299
+ }
300
+ bytesUploaded += int64 (len (req .Data ))
301
+ if readDone {
302
+ break
303
+ }
304
+
280
305
}
281
- if err := w .Commit (); err != nil {
282
- return nil , 0 , err
306
+ rsp , err := stream .CloseAndRecv ()
307
+ if err != nil {
308
+ return nil , bytesUploaded , err
283
309
}
284
310
285
- bytesUploaded := w .GetBytesUploaded ()
286
- committedSize := w .GetCommittedSize ()
287
-
288
- if rn .GetCompressor () == repb .Compressor_IDENTITY {
311
+ remoteSize := rsp .GetCommittedSize ()
312
+ if r .GetCompressor () == repb .Compressor_IDENTITY {
289
313
// Either the write succeeded or was short-circuited, but in
290
314
// either case, the remoteSize for uncompressed uploads should
291
315
// match the file size.
292
- if committedSize != rn .GetDigest ().GetSizeBytes () {
293
- return nil , bytesUploaded , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , committedSize , rn .GetDigest ().GetSizeBytes ())
316
+ if remoteSize != r .GetDigest ().GetSizeBytes () {
317
+ return nil , bytesUploaded , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , remoteSize , r .GetDigest ().GetSizeBytes ())
294
318
}
295
319
} else {
296
320
// -1 is returned if the blob already exists, otherwise the
297
- // remote (committed) size should agree with what we uploaded.
298
- if committedSize != bytesUploaded && committedSize != - 1 {
299
- return nil , bytesUploaded , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , committedSize , rn .GetDigest ().GetSizeBytes ())
321
+ // remoteSize should agree with what we uploaded.
322
+ if remoteSize != bytesUploaded && remoteSize != - 1 {
323
+ return nil , bytesUploaded , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , remoteSize , r .GetDigest ().GetSizeBytes ())
300
324
}
301
325
}
302
326
303
- return rn .GetDigest (), bytesUploaded , nil
327
+ return r .GetDigest (), bytesUploaded , nil
304
328
}
305
329
306
330
type uploadRetryResult = struct {
@@ -1077,167 +1101,3 @@ func maybeSetCompressor(rn *digest.CASResourceName) {
1077
1101
rn .SetCompressor (repb .Compressor_ZSTD )
1078
1102
}
1079
1103
}
1080
-
1081
- type UploadWriter struct {
1082
- ctx context.Context
1083
- stream bspb.ByteStream_WriteClient
1084
- sender rpcutil.Sender [* bspb.WriteRequest ]
1085
- uploadString string
1086
-
1087
- bytesUploaded int64
1088
- finished bool
1089
- committedSize int64
1090
- closed bool
1091
-
1092
- buf []byte
1093
- bytesBuffered int
1094
- }
1095
-
1096
- // Write copies the input bytes to an internal buffer and may send some or all of the bytes to the CAS.
1097
- // Bytes are not guaranteed to be uploaded to the CAS until a call to Commit() succeeds.
1098
- // Returning EOF indicates that the blob already exists in the CAS and no further writes are necessary.
1099
- func (uw * UploadWriter ) Write (p []byte ) (int , error ) {
1100
- if uw .finished || uw .closed {
1101
- return 0 , status .FailedPreconditionError ("Cannot write to UploadWriter after it is finished or closed" )
1102
- }
1103
- written := 0
1104
- for len (p ) > 0 {
1105
- n := copy (uw .buf [uw .bytesBuffered :], p )
1106
- uw .bytesBuffered += n
1107
- if uw .bytesBuffered == uploadBufSizeBytes {
1108
- if err := uw .flush (false /* finish */ ); err != nil {
1109
- return written , err
1110
- }
1111
- }
1112
- written += n
1113
- p = p [n :]
1114
- }
1115
- return written , nil
1116
- }
1117
-
1118
- func (uw * UploadWriter ) flush (finish bool ) error {
1119
- if uw .finished {
1120
- return nil
1121
- }
1122
- if uw .closed {
1123
- return status .FailedPreconditionError ("UploadWriteCloser already finished or closed, cannot flush" )
1124
- }
1125
- req := & bspb.WriteRequest {
1126
- Data : uw .buf [:uw .bytesBuffered ],
1127
- ResourceName : uw .uploadString ,
1128
- WriteOffset : uw .bytesUploaded ,
1129
- FinishWrite : finish ,
1130
- }
1131
- err := uw .sender .SendWithTimeoutCause (req , * casRPCTimeout , status .DeadlineExceededError ("Timed out sending Write request" ))
1132
- if err != nil {
1133
- // If the blob already exists in the CAS, the server will respond EOF
1134
- // to indicate no further writes are needed.
1135
- if err == io .EOF {
1136
- uw .finished = true
1137
- }
1138
- return err
1139
- }
1140
- uw .bytesUploaded += int64 (uw .bytesBuffered )
1141
- uw .bytesBuffered = 0
1142
- uw .finished = finish
1143
- return nil
1144
- }
1145
-
1146
- // ReadFrom reads all the bytes from the input Reader until encountering EOF,
1147
- // copies them to an internal buffer, and may send some or all of the bytes to the CAS.
1148
- // The bytes are not guaranteed uploaded to the CAS until Commit is called and returns successfully.
1149
- // ReadFrom returns the number of bytes read from the input reader.
1150
- func (uw * UploadWriter ) ReadFrom (r io.Reader ) (int64 , error ) {
1151
- if uw .finished || uw .closed {
1152
- return 0 , status .FailedPreconditionError ("UploadWriter cannot ReadFrom after it is finished or closed" )
1153
- }
1154
- bytesRead := int64 (0 )
1155
- for {
1156
- n , err := ioutil .ReadTryFillBuffer (r , uw .buf [uw .bytesBuffered :])
1157
- uw .bytesBuffered += n
1158
- bytesRead += int64 (n )
1159
- if err != nil && err != io .EOF {
1160
- return bytesRead , err
1161
- }
1162
- readDone := err == io .EOF
1163
-
1164
- if uw .bytesBuffered == uploadBufSizeBytes {
1165
- err := uw .flush (false /* finish */ )
1166
- if err != nil && err != io .EOF {
1167
- return bytesRead , err
1168
- }
1169
- // If the blob already exists in the CAS, the server can respond with an EOF.
1170
- // Stop sending bytes to the server.
1171
- if err == io .EOF {
1172
- break
1173
- }
1174
- }
1175
- if readDone {
1176
- break
1177
- }
1178
- }
1179
- return bytesRead , nil
1180
- }
1181
-
1182
- // Commit sends any bytes remaining in the internal buffer to the CAS
1183
- // and tells the server that the stream is done sending writes.
1184
- func (uw * UploadWriter ) Commit () error {
1185
- if uw .closed {
1186
- return status .FailedPreconditionError ("UploadWriteCloser already closed, cannot commit" )
1187
- }
1188
- err := uw .flush (true /* finish */ )
1189
- // If the blob already exists in the CAS, the server can respond with an EOF.
1190
- // The blob exists, it is safe to finish committing.
1191
- if err != nil && err != io .EOF {
1192
- return err
1193
- }
1194
- rsp , err := uw .stream .CloseAndRecv ()
1195
- if err != nil {
1196
- return err
1197
- }
1198
- uw .committedSize = rsp .GetCommittedSize ()
1199
- return nil
1200
- }
1201
-
1202
- // Close closes the underlying stream and returns an internal buffer to the pool.
1203
- // It is expected (and safe) to call Close even if Commit fails.
1204
- func (uw * UploadWriter ) Close () error {
1205
- if uw .closed {
1206
- return status .FailedPreconditionError ("UploadWriteCloser already closed, cannot close again" )
1207
- }
1208
- uw .closed = true
1209
- uploadBufPool .Put (uw .buf )
1210
- return uw .stream .CloseSend ()
1211
- }
1212
-
1213
- func (uw * UploadWriter ) GetCommittedSize () int64 {
1214
- return uw .committedSize
1215
- }
1216
-
1217
- func (uw * UploadWriter ) GetBytesUploaded () int64 {
1218
- return uw .bytesUploaded
1219
- }
1220
-
1221
- // Assert that UploadWriter implements CommittedWriteCloser
1222
- var _ interfaces.CommittedWriteCloser = (* UploadWriter )(nil )
1223
-
1224
- // NewUploadWriter returns an UploadWriter that writes to the CAS for the specific resource name.
1225
- // The blob is guaranteed to be written to the CAS only if all Write(...) calls and the Close() call succeed.
1226
- // The caller is responsible for checking data integrity using GetCommittedSize() and GetBytesUploaded().
1227
- func NewUploadWriter (ctx context.Context , bsClient bspb.ByteStreamClient , r * digest.CASResourceName ) (* UploadWriter , error ) {
1228
- if bsClient == nil {
1229
- return nil , status .FailedPreconditionError ("ByteStreamClient not configured" )
1230
- }
1231
- stream , err := bsClient .Write (ctx )
1232
- if err != nil {
1233
- return nil , err
1234
- }
1235
- sender := rpcutil .NewSender [* bspb.WriteRequest ](ctx , stream )
1236
- return & UploadWriter {
1237
- ctx : ctx ,
1238
- stream : stream ,
1239
- sender : sender ,
1240
- uploadString : r .NewUploadString (),
1241
- buf : uploadBufPool .Get (),
1242
- }, nil
1243
- }
0 commit comments