mirror of
https://github.com/restic/restic.git
synced 2026-06-27 11:04:17 +00:00
feat(backends/s3): add warmup support for check command (#5248)
This commit is contained in:
@@ -292,10 +292,10 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er
|
||||
// with an unmodified parameter list
|
||||
// Otherwise it calculates the packfiles needed, gets their sizes from the full
|
||||
// packfile set and submits them to repository.ReadPacks()
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, p restic.Counter, errChan chan<- error) {
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, printer restic.Printer, errChan chan<- error) {
|
||||
// no snapshot filtering, pass through
|
||||
if !c.IsFiltered() {
|
||||
c.Checker.ReadPacks(ctx, filter, p, errChan)
|
||||
c.Checker.ReadPacks(ctx, filter, printer, errChan)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -314,5 +314,5 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
||||
return filter(filteredPacks)
|
||||
}
|
||||
|
||||
c.Checker.ReadPacks(ctx, packfileFilter, p, errChan)
|
||||
c.Checker.ReadPacks(ctx, packfileFilter, printer, errChan)
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func checkData(chkr *checker.Checker) []error {
|
||||
func(ctx context.Context, errCh chan<- error) {
|
||||
chkr.ReadPacks(ctx, func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, restic.NoopCounter, errCh)
|
||||
}, restic.NewNoopPrinter(), errCh)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestCheckRepo(t testing.TB, repo checkerRepository) {
|
||||
errChan = make(chan error)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, restic.NoopCounter, errChan)
|
||||
}, restic.NewNoopPrinter(), errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
"github.com/restic/restic/internal/repository/hashing"
|
||||
"github.com/restic/restic/internal/repository/index"
|
||||
"github.com/restic/restic/internal/repository/pack"
|
||||
@@ -243,7 +244,7 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
||||
}
|
||||
|
||||
// ReadPacks loads data from specified packs and checks the integrity.
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, p restic.Counter, errChan chan<- error) {
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, printer restic.Printer, errChan chan<- error) {
|
||||
defer close(errChan)
|
||||
|
||||
// compute pack size using index entries
|
||||
@@ -253,7 +254,30 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
||||
return
|
||||
}
|
||||
packs = filter(packs)
|
||||
|
||||
p := printer.NewCounter("packs")
|
||||
p.SetMax(uint64(len(packs)))
|
||||
defer p.Done()
|
||||
|
||||
packSet := restic.NewIDSet()
|
||||
for pack := range packs {
|
||||
packSet.Insert(pack)
|
||||
}
|
||||
|
||||
if feature.Flag.Enabled(feature.S3Restore) {
|
||||
job, err := c.repo.StartWarmup(ctx, packSet)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if job.HandleCount() != 0 {
|
||||
printer.P("warming up %d packs from cold storage, this may take a while...", job.HandleCount())
|
||||
if err := job.Wait(ctx); err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
type checkTask struct {
|
||||
@@ -302,11 +326,6 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
||||
})
|
||||
}
|
||||
|
||||
packSet := restic.NewIDSet()
|
||||
for pack := range packs {
|
||||
packSet.Insert(pack)
|
||||
}
|
||||
|
||||
// push packs to ch
|
||||
for pbs := range c.repo.listPacksFromIndex(ctx, packSet) {
|
||||
size := packs[pbs.PackID]
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/data"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
"github.com/restic/restic/internal/repository/pack"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
@@ -91,7 +92,7 @@ func runReadPacks(chkr *Checker) []error {
|
||||
func(ctx context.Context, errCh chan<- error) {
|
||||
chkr.ReadPacks(ctx, func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, restic.NoopCounter, errCh)
|
||||
}, restic.NewNoopPrinter(), errCh)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -224,3 +225,36 @@ func TestCheckPackPartialDownloadError(t *testing.T) {
|
||||
"partial read must produce ErrPackData, got: %T %v", err, err)
|
||||
}
|
||||
}
|
||||
|
||||
// warmupBackend simulates a backend where all handles needs to be warmed up.
|
||||
type warmupBackend struct {
|
||||
backend.Backend
|
||||
handlesToWarmup []backend.Handle
|
||||
handlesAwaited []backend.Handle
|
||||
}
|
||||
|
||||
func (be *warmupBackend) Warmup(_ context.Context, h []backend.Handle) ([]backend.Handle, error) {
|
||||
be.handlesToWarmup = append(be.handlesToWarmup, h...)
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (be *warmupBackend) WarmupWait(_ context.Context, h []backend.Handle) error {
|
||||
be.handlesAwaited = append(be.handlesAwaited, h...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCheckerWarmup(t *testing.T) {
|
||||
defer feature.TestSetFlag(t, feature.Flag, feature.S3Restore, true)()
|
||||
|
||||
var wBackend *warmupBackend
|
||||
chkr := setupChecker(t, func(be backend.Backend) backend.Backend {
|
||||
wBackend = &warmupBackend{Backend: be}
|
||||
return wBackend
|
||||
})
|
||||
|
||||
errs := runReadPacks(chkr)
|
||||
rtest.Assert(t, len(errs) == 0, "expected no data error, got %v: %v", len(errs), errs)
|
||||
|
||||
rtest.Assert(t, len(wBackend.handlesToWarmup) > 0, "found no handles to warmup")
|
||||
rtest.Equals(t, wBackend.handlesToWarmup, wBackend.handlesAwaited, "expected to wait for all cold handles")
|
||||
}
|
||||
|
||||
@@ -189,7 +189,7 @@ func TestCheckRepo(t testing.TB, repo *Repository) {
|
||||
errChan = make(chan error)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, restic.NoopCounter, errChan)
|
||||
}, restic.NewNoopPrinter(), errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user