Skip to content

Commit 509cda9

Browse files
committed
Attempt to upload accumulated files at exit
1 parent d520b12 commit 509cda9

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

main.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ func run(ctx context.Context, args runArgs) error {
136136
defer ln.Close()
137137

138138
srv := &server{dir: args.Dir, ch: make(chan json.RawMessage, 1000), wake: make(chan struct{})}
139+
defer func() {
140+
c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
141+
defer cancel()
142+
if err := srv.uploadOnce(c, args.Bucket, args.Prefix, upl); err != nil {
143+
log.Printf("final upload attempt: %v", err)
144+
}
145+
}()
139146
group, ctx := errgroup.WithContext(ctx)
140147
group.Go(func() error { <-ctx.Done(); return ln.Close() })
141148
group.Go(func() error { return srv.ingest(ctx, args.Mb<<20, args.D) })
@@ -172,6 +179,21 @@ type server struct {
172179
}
173180

174181
func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix string, upl *manager.Uploader) error {
182+
ticker := time.NewTicker(d)
183+
defer ticker.Stop()
184+
for {
185+
select {
186+
case <-ctx.Done():
187+
return nil
188+
case <-ticker.C:
189+
_ = srv.uploadOnce(ctx, bucket, prefix, upl)
190+
case <-srv.wake:
191+
_ = srv.uploadOnce(ctx, bucket, prefix, upl)
192+
}
193+
}
194+
}
195+
196+
func (srv *server) uploadOnce(ctx context.Context, bucket, prefix string, upl *manager.Uploader) error {
175197
walkFn := func(path string, d fs.DirEntry, err error) error {
176198
if err != nil || d.IsDir() || !strings.HasSuffix(path, ".json.gz") {
177199
return nil
@@ -191,18 +213,7 @@ func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix s
191213
}
192214
return nil
193215
}
194-
ticker := time.NewTicker(d)
195-
defer ticker.Stop()
196-
for {
197-
select {
198-
case <-ctx.Done():
199-
return nil
200-
case <-ticker.C:
201-
_ = filepath.WalkDir(srv.dir, walkFn)
202-
case <-srv.wake:
203-
_ = filepath.WalkDir(srv.dir, walkFn)
204-
}
205-
}
216+
return filepath.WalkDir(srv.dir, walkFn)
206217
}
207218

208219
func uploadFile(ctx context.Context, upl *manager.Uploader, bucket, prefix, name string) error {

0 commit comments

Comments
 (0)