mirror of
https://github.com/restic/restic.git
synced 2026-04-23 21:29:25 +00:00
repository: fix race condition for blobSaver shutdown
wg.Go() must not be called after wg.Wait(). This makes it impossible to connect two errgroups so that errors propagate between them when the child errgroup starts goroutines dynamically. Instead, use just a single errgroup and sequence the shutdown using a sync.WaitGroup. This is far simpler and does not require any "clever" tricks.
This commit is contained in:
@@ -42,7 +42,8 @@ type Repository struct {
|
||||
opts Options
|
||||
|
||||
packerWg *errgroup.Group
|
||||
blobWg *errgroup.Group
|
||||
mainWg *errgroup.Group
|
||||
blobSaver *sync.WaitGroup
|
||||
uploader *packerUploader
|
||||
treePM *packerManager
|
||||
dataPM *packerManager
|
||||
@@ -562,12 +563,14 @@ func (r *Repository) removeUnpacked(ctx context.Context, t restic.FileType, id r
|
||||
|
||||
func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
// pack uploader + wg.Go below + blob saver (CPU bound)
|
||||
wg.SetLimit(2 + runtime.GOMAXPROCS(0))
|
||||
r.mainWg = wg
|
||||
r.startPackUploader(ctx, wg)
|
||||
saverCtx := r.startBlobSaver(ctx, wg)
|
||||
// blob savers are spawned on demand, use a wait group to keep track of them
|
||||
r.blobSaver = &sync.WaitGroup{}
|
||||
wg.Go(func() error {
|
||||
// must use saverCtx to ensure that the ctx used for saveBlob calls is bound to it
|
||||
// otherwise the blob saver could deadlock in case of an error.
|
||||
if err := fn(saverCtx, &blobSaverRepo{repo: r}); err != nil {
|
||||
if err := fn(ctx, &blobSaverRepo{repo: r}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.flush(ctx); err != nil {
|
||||
@@ -594,22 +597,6 @@ func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Repository) startBlobSaver(ctx context.Context, wg *errgroup.Group) context.Context {
|
||||
// blob upload computations are CPU bound
|
||||
blobWg, blobCtx := errgroup.WithContext(ctx)
|
||||
blobWg.SetLimit(runtime.GOMAXPROCS(0))
|
||||
r.blobWg = blobWg
|
||||
|
||||
wg.Go(func() error {
|
||||
// As the goroutines are only spawned on demand, wait until the context is canceled.
|
||||
// This will either happen on an error while saving a blob or when blobWg.Wait() is called
|
||||
// by flushBlobUploader().
|
||||
<-blobCtx.Done()
|
||||
return blobWg.Wait()
|
||||
})
|
||||
return blobCtx
|
||||
}
|
||||
|
||||
type blobSaverRepo struct {
|
||||
repo *Repository
|
||||
}
|
||||
@@ -624,28 +611,26 @@ func (r *blobSaverRepo) SaveBlobAsync(ctx context.Context, t restic.BlobType, bu
|
||||
|
||||
// Flush saves all remaining packs and the index
|
||||
func (r *Repository) flush(ctx context.Context) error {
|
||||
if err := r.flushBlobUploader(); err != nil {
|
||||
return err
|
||||
}
|
||||
r.flushBlobSaver()
|
||||
r.mainWg = nil
|
||||
|
||||
if err := r.flushPacks(ctx); err != nil {
|
||||
if err := r.flushPackUploader(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.idx.Flush(ctx, &internalRepository{r})
|
||||
}
|
||||
|
||||
func (r *Repository) flushBlobUploader() error {
|
||||
if r.blobWg == nil {
|
||||
return nil
|
||||
func (r *Repository) flushBlobSaver() {
|
||||
if r.blobSaver == nil {
|
||||
return
|
||||
}
|
||||
err := r.blobWg.Wait()
|
||||
r.blobWg = nil
|
||||
return err
|
||||
r.blobSaver.Wait()
|
||||
r.blobSaver = nil
|
||||
}
|
||||
|
||||
// FlushPacks saves all remaining packs.
|
||||
func (r *Repository) flushPacks(ctx context.Context) error {
|
||||
func (r *Repository) flushPackUploader(ctx context.Context) error {
|
||||
if r.packerWg == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -1032,11 +1017,11 @@ func (r *Repository) saveBlob(ctx context.Context, t restic.BlobType, buf []byte
|
||||
}
|
||||
|
||||
func (r *Repository) saveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
|
||||
r.blobWg.Go(func() error {
|
||||
r.mainWg.Go(func() error {
|
||||
if ctx.Err() != nil {
|
||||
// fail fast if the context is cancelled
|
||||
cb(restic.ID{}, false, 0, context.Cause(ctx))
|
||||
return context.Cause(ctx)
|
||||
cb(restic.ID{}, false, 0, ctx.Err())
|
||||
return ctx.Err()
|
||||
}
|
||||
newID, known, size, err := r.saveBlob(ctx, t, buf, id, storeDuplicate)
|
||||
cb(newID, known, size, err)
|
||||
|
||||
Reference in New Issue
Block a user