Files
restic/internal/restic/parallel.go
Michael Eischer 99e105eeb6 repository: restrict SaveUnpacked and RemoveUnpacked
Those methods now only allow modifying snapshots. Internal data types
used by the repository are now read-only. The repository-internal code
can bypass the restrictions by wrapping the repository in an
`internalRepository` type.

The restriction itself is implemented by using a new datatype
WriteableFileType in the SaveUnpacked and RemoveUnpacked methods. This
statically ensures that code cannot bypass the access restrictions.

The test changes are somewhat noisy as some of them modify repository
internals and therefore require some way to bypass the access
restrictions. This works by capturing an `internalRepository` or
`Backend` when creating the Repository using a test helper function.
2025-01-13 22:39:57 +01:00

93 lines
2.1 KiB
Go

package restic
import (
"context"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// ParallelList lists all files of type t in r and calls fn once for each
// listed file, passing the file's ID and size. The calls to fn are spread
// over parallelism concurrently running workers, so no ordering between
// calls is guaranteed.
//
// The context passed to fn is derived from ctx and is cancelled as soon as
// the listing or any call to fn returns an error. ParallelList waits for all
// goroutines to finish and returns the first such error, or nil if every
// file was processed successfully.
//
// parallelism should be at least 1: without any worker nothing receives from
// the internal channel, so the listing blocks on the first file until ctx is
// cancelled.
func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, fn func(context.Context, ID, int64) error) error {
	type FileIDInfo struct {
		ID
		Size int64
	}

	// track spawned goroutines using wg, create a new context which is
	// cancelled as soon as an error occurs.
	wg, ctx := errgroup.WithContext(ctx)

	ch := make(chan FileIDInfo)

	// send the listed files through ch, which is closed afterwards
	wg.Go(func() error {
		defer close(ch)
		return r.List(ctx, t, func(id ID, size int64) error {
			select {
			case <-ctx.Done():
				// Abort the listing instead of silently skipping the
				// remaining files. If a worker failed first, errgroup has
				// already recorded that error, so it still takes precedence
				// over the cancellation error returned here.
				return ctx.Err()
			case ch <- FileIDInfo{id, size}:
				return nil
			}
		})
	})

	// a worker receives file infos from ch and passes each one to fn; it
	// stops at the first error or once ch is closed and drained
	worker := func() error {
		for fi := range ch {
			debug.Log("worker got file %v/%v", t, fi.ID.Str())
			if err := fn(ctx, fi.ID, fi.Size); err != nil {
				return err
			}
		}
		return nil
	}

	// run workers on ch
	for i := uint(0); i < parallelism; i++ {
		wg.Go(worker)
	}

	return wg.Wait()
}
// ParallelRemove removes every ID in fileList, all of type fileType, from
// repo. The removals run concurrently on repo.Connections() workers.
//
// If report is non-nil it is called after each removal attempt with the ID
// and the result of that attempt, and its return value replaces the removal
// error. The first non-nil error (from report, or from the removal itself
// when report is nil) aborts the remaining work and is returned. bar is
// given the total number of files up front and is advanced by one for each
// file that finished without error.
func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error {
	pending := make(chan ID)
	g, ctx := errgroup.WithContext(ctx)

	// feed all IDs into pending; close it when done so the workers terminate
	g.Go(func() error {
		defer close(pending)
		for id := range fileList {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case pending <- id:
			}
		}
		return nil
	})

	bar.SetMax(uint64(len(fileList)))

	// removeLoop drains pending, removing one file per received ID
	removeLoop := func() error {
		for id := range pending {
			err := repo.RemoveUnpacked(ctx, fileType, id)
			if report != nil {
				err = report(id, err)
			}
			if err != nil {
				return err
			}
			bar.Add(1)
		}
		return nil
	}

	// deleting files is IO-bound, so use one worker per backend connection
	for remaining := int(repo.Connections()); remaining > 0; remaining-- {
		g.Go(removeLoop)
	}

	return g.Wait()
}