@@ -18,7 +18,6 @@ import (
18
18
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
19
19
"github.com/buildbuddy-io/buildbuddy/server/metrics"
20
20
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
21
- "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
22
21
"github.com/buildbuddy-io/buildbuddy/server/util/compression"
23
22
"github.com/buildbuddy-io/buildbuddy/server/util/ioutil"
24
23
"github.com/buildbuddy-io/buildbuddy/server/util/log"
50
49
acRPCTimeout = flag .Duration ("cache.client.ac_rpc_timeout" , 15 * time .Second , "Maximum time a single Action Cache RPC can take." )
51
50
filecacheTreeSalt = flag .String ("cache.filecache_tree_salt" , "20250304" , "A salt to invalidate filecache tree hashes, if/when needed." )
52
51
requestCachedSubtreeDigests = flag .Bool ("cache.request_cached_subtree_digests" , true , "If true, GetTree requests will set send_cached_subtree_digests." )
53
-
54
- uploadBufPool = bytebufferpool .FixedSize (uploadBufSizeBytes )
55
52
)
56
53
57
54
func retryOptions (name string ) * retry.Options {
@@ -258,29 +255,19 @@ func uploadFromReader(ctx context.Context, bsClient bspb.ByteStreamClient, r *di
258
255
}
259
256
260
257
if r .GetCompressor () == repb .Compressor_IDENTITY {
261
- w , err := NewUploadWriter (ctx , bsClient , r )
258
+ w , err := newUploadWriteCloser (ctx , bsClient , r )
262
259
if err != nil {
263
260
return nil , 0 , err
264
261
}
265
- defer w .Close ()
266
- bytesRead , err := w .ReadFrom (in )
262
+ bytesUploaded , err := w .ReadFrom (in )
267
263
if err != nil {
264
+ w .Close ()
268
265
return nil , 0 , err
269
266
}
270
- if err := w .Commit (); err != nil {
271
- return nil , 0 , err
272
- }
273
- // Either the write succeeded or was short-circuited, but in
274
- // either case, the remoteSize for uncompressed uploads should
275
- // match the file size.
276
- committedSize , err := w .GetCommittedSize ()
277
- if err != nil {
267
+ if err := w .Close (); err != nil {
278
268
return nil , 0 , err
279
269
}
280
- if committedSize != r .GetDigest ().GetSizeBytes () {
281
- return nil , 0 , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , committedSize , r .GetDigest ().GetSizeBytes ())
282
- }
283
- return r .GetDigest (), bytesRead , nil
270
+ return r .GetDigest (), bytesUploaded , nil
284
271
}
285
272
286
273
stream , err := bsClient .Write (ctx )
@@ -336,15 +323,24 @@ func uploadFromReader(ctx context.Context, bsClient bspb.ByteStreamClient, r *di
336
323
return nil , 0 , err
337
324
}
338
325
339
- committedSize := rsp .GetCommittedSize ()
340
- // -1 is returned if the blob already exists, otherwise the
341
- // remoteSize should agree with what we uploaded.
342
- if committedSize != bytesUploaded && committedSize != - 1 {
343
- return nil , 0 , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , committedSize , r .GetDigest ().GetSizeBytes ())
326
+ remoteSize := rsp .GetCommittedSize ()
327
+ if r .GetCompressor () == repb .Compressor_IDENTITY {
328
+ // Either the write succeeded or was short-circuited, but in
329
+ // either case, the remoteSize for uncompressed uploads should
330
+ // match the file size.
331
+ if remoteSize != r .GetDigest ().GetSizeBytes () {
332
+ return nil , 0 , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , remoteSize , r .GetDigest ().GetSizeBytes ())
333
+ }
334
+ } else {
335
+ // -1 is returned if the blob already exists, otherwise the
336
+ // remoteSize should agree with what we uploaded.
337
+ if remoteSize != bytesUploaded && remoteSize != - 1 {
338
+ return nil , 0 , status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , remoteSize , r .GetDigest ().GetSizeBytes ())
339
+ }
344
340
}
345
341
346
342
// At this point, a remote size of -1 means nothing new was uploaded.
347
- if committedSize == - 1 {
343
+ if remoteSize == - 1 {
348
344
bytesUploaded = 0
349
345
}
350
346
@@ -1117,130 +1113,91 @@ func maybeSetCompressor(rn *digest.CASResourceName) {
1117
1113
}
1118
1114
}
1119
1115
1120
- type UploadWriter struct {
1116
+ type uploadWriteCloser struct {
1121
1117
ctx context.Context
1122
1118
stream bspb.ByteStream_WriteClient
1123
1119
sender rpcutil.Sender [* bspb.WriteRequest ]
1120
+ resource * digest.CASResourceName
1124
1121
uploadString string
1125
1122
1126
1123
bytesUploaded int64
1127
- finished bool
1128
- committedSize int64
1129
- closed bool
1130
-
1131
- buf []byte
1132
- bytesBuffered int
1133
1124
}
1134
1125
1135
- // Assert that UploadWriter implements CommittedWriteCloser
1136
- var _ interfaces.CommittedWriteCloser = (* UploadWriter )(nil )
1137
-
1138
- func (uw * UploadWriter ) Write (p []byte ) (int , error ) {
1139
- if uw .finished || uw .closed {
1140
- return 0 , status .FailedPreconditionError ("Cannot write to UploadWriter after it is finished or closed" )
1141
- }
1126
+ func (cwc * uploadWriteCloser ) Write (p []byte ) (int , error ) {
1142
1127
written := 0
1143
1128
for len (p ) > 0 {
1144
- n := copy (uw .buf [uw .bytesBuffered :], p )
1145
- uw .bytesBuffered += n
1146
- if uw .bytesBuffered == uploadBufSizeBytes {
1147
- if err := uw .flush (false /* finish */ ); err != nil {
1148
- return written , err
1129
+ n := min (len (p ), uploadBufSizeBytes )
1130
+ req := & bspb.WriteRequest {
1131
+ Data : p [:n ],
1132
+ ResourceName : cwc .uploadString ,
1133
+ WriteOffset : cwc .bytesUploaded + int64 (written ),
1134
+ FinishWrite : false ,
1135
+ }
1136
+
1137
+ err := cwc .sender .SendWithTimeoutCause (req , * casRPCTimeout , status .DeadlineExceededError ("Timed out sending Write request" ))
1138
+ if err != nil {
1139
+ if err == io .EOF {
1140
+ break
1149
1141
}
1142
+ cwc .bytesUploaded += int64 (written )
1143
+ return written , err
1150
1144
}
1151
1145
written += n
1152
1146
p = p [n :]
1153
1147
}
1148
+ cwc .bytesUploaded += int64 (written )
1154
1149
return written , nil
1155
1150
}
1156
1151
1157
- func (uw * UploadWriter ) flush (finish bool ) error {
1158
- if uw .finished || uw .closed {
1159
- return status .FailedPreconditionError ("UploadWriteCloser already finished or closed, cannot flush" )
1160
- }
1161
- req := & bspb.WriteRequest {
1162
- Data : uw .buf [:uw .bytesBuffered ],
1163
- ResourceName : uw .uploadString ,
1164
- WriteOffset : uw .bytesUploaded ,
1165
- FinishWrite : finish ,
1166
- }
1167
- err := uw .sender .SendWithTimeoutCause (req , * casRPCTimeout , status .DeadlineExceededError ("Timed out sending Write request" ))
1168
- if err != nil {
1169
- return err
1170
- }
1171
- uw .bytesUploaded += int64 (uw .bytesBuffered )
1172
- uw .bytesBuffered = 0
1173
- uw .finished = finish
1174
- return nil
1175
- }
1176
-
1177
- func (uw * UploadWriter ) ReadFrom (r io.Reader ) (int64 , error ) {
1178
- if uw .finished || uw .closed {
1179
- return 0 , status .FailedPreconditionError ("UploadWriter cannot ReadFrom after it is finished or closed" )
1180
- }
1181
- bytesRead := int64 (0 )
1152
+ func (cwc * uploadWriteCloser ) ReadFrom (r io.Reader ) (int64 , error ) {
1153
+ buf := make ([]byte , uploadBufSizeBytes )
1154
+ var bytesUploaded int64
1182
1155
for {
1183
- n , err := ioutil .ReadTryFillBuffer (r , uw .buf [uw .bytesBuffered :])
1184
- uw .bytesBuffered += n
1185
- bytesRead += int64 (n )
1156
+ n , err := ioutil .ReadTryFillBuffer (r , buf )
1186
1157
done := err == io .EOF
1187
1158
if err != nil && ! done {
1188
- return bytesRead , err
1159
+ return bytesUploaded , err
1189
1160
}
1190
- if uw .bytesBuffered == uploadBufSizeBytes {
1191
- if err := uw .flush (false /* finish */ ); err != nil {
1192
- return bytesRead , err
1193
- }
1161
+ written , err := cwc .Write (buf [:n ])
1162
+ if err != nil {
1163
+ return bytesUploaded + int64 (written ), err
1194
1164
}
1165
+ bytesUploaded += int64 (written )
1195
1166
if done {
1196
1167
break
1197
1168
}
1198
1169
}
1199
- return bytesRead , nil
1170
+ return bytesUploaded , nil
1200
1171
}
1201
1172
1202
- func (uw * UploadWriter ) Commit () error {
1203
- if uw .closed {
1204
- return status .FailedPreconditionError ("UploadWriteCloser already closed, cannot commit" )
1205
- }
1206
- if uw .finished {
1207
- return status .FailedPreconditionError ("UploadWriteCloser already committed, cannot commit again" )
1208
- }
1209
- if err := uw .flush (true /* finish */ ); err != nil {
1210
- return err
1173
+ func (cwc * uploadWriteCloser ) Close () error {
1174
+ req := & bspb.WriteRequest {
1175
+ ResourceName : cwc .uploadString ,
1176
+ WriteOffset : cwc .bytesUploaded ,
1177
+ FinishWrite : true ,
1211
1178
}
1212
- rsp , err := uw .stream .CloseAndRecv ()
1179
+
1180
+ err := cwc .sender .SendWithTimeoutCause (req , * casRPCTimeout , status .DeadlineExceededError ("Timed out sending Write request" ))
1213
1181
if err != nil {
1214
1182
return err
1215
1183
}
1216
- uw .committedSize = rsp .GetCommittedSize ()
1217
- return nil
1218
- }
1219
1184
1220
- func ( uw * UploadWriter ) Close () error {
1221
- if uw . closed {
1222
- return status . FailedPreconditionError ( "UploadWriteCloser already closed, cannot close again" )
1185
+ rsp , err := cwc . stream . CloseAndRecv ()
1186
+ if err != nil {
1187
+ return err
1223
1188
}
1224
- uw .closed = true
1225
- uploadBufPool .Put (uw .buf )
1226
- return uw .stream .CloseSend ()
1227
- }
1228
1189
1229
- func (uw * UploadWriter ) GetCommittedSize () (int64 , error ) {
1230
- if ! uw .finished {
1231
- return 0 , status .FailedPreconditionError ("UploadWriter not committed, cannot get committed size" )
1190
+ remoteSize := rsp .GetCommittedSize ()
1191
+ // Either the write succeeded or was short-circuited, but in
1192
+ // either case, the remoteSize for uncompressed uploads should
1193
+ // match the file size.
1194
+ if remoteSize != cwc .resource .GetDigest ().GetSizeBytes () {
1195
+ return status .DataLossErrorf ("Remote size (%d) != uploaded size: (%d)" , remoteSize , cwc .resource .GetDigest ().GetSizeBytes ())
1232
1196
}
1233
- return uw .committedSize , nil
1234
- }
1235
-
1236
- func (uw * UploadWriter ) GetBytesUploaded () int64 {
1237
- return uw .bytesUploaded
1197
+ return nil
1238
1198
}
1239
1199
1240
- // NewUploadWriter returns an UploadWriter that writes to the CAS for the specific resource name.
1241
- // The blob is guaranteed to be written to the CAS only if all Write(...) calls and the Close() call succeed.
1242
- // The caller is responsible for checking data integrity using GetCommittedSize() and GetBytesUploaded().
1243
- func NewUploadWriter (ctx context.Context , bsClient bspb.ByteStreamClient , r * digest.CASResourceName ) (* UploadWriter , error ) {
1200
+ func newUploadWriteCloser (ctx context.Context , bsClient bspb.ByteStreamClient , r * digest.CASResourceName ) (* uploadWriteCloser , error ) {
1244
1201
if bsClient == nil {
1245
1202
return nil , status .FailedPreconditionError ("ByteStreamClient not configured" )
1246
1203
}
@@ -1252,11 +1209,11 @@ func NewUploadWriter(ctx context.Context, bsClient bspb.ByteStreamClient, r *dig
1252
1209
return nil , err
1253
1210
}
1254
1211
sender := rpcutil .NewSender [* bspb.WriteRequest ](ctx , stream )
1255
- return & UploadWriter {
1212
+ return & uploadWriteCloser {
1256
1213
ctx : ctx ,
1257
1214
stream : stream ,
1258
1215
sender : sender ,
1216
+ resource : r ,
1259
1217
uploadString : r .NewUploadString (),
1260
- buf : uploadBufPool .Get (),
1261
1218
}, nil
1262
1219
}
0 commit comments