Files
restic/internal/repository/repack.go
Michael Eischer e247118f49 repository: move Blob, Blobs and PackedBlob to pack package
This removes them from the public interface. The latter now only
provides the PackBlob interface, without being bound to the type used
internally by the pack package.
2026-06-05 12:25:03 +02:00

159 lines
3.9 KiB
Go

package repository
import (
"context"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/repository/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// repackBlobSet is the small set abstraction used by CopyBlobs to track which
// blobs still have to be copied. repack serializes every access made from its
// goroutines with a mutex, so implementations need not be safe for concurrent
// use on their own.
type repackBlobSet interface {
	// Len reports the number of blobs currently in the set.
	Len() int
	// Has reports whether h is still in the set.
	Has(h restic.BlobHandle) bool
	// Delete removes h from the set.
	Delete(h restic.BlobHandle)
}
// LogFunc receives informational messages as a printf-style format string
// plus its arguments, e.g. the notice emitted while packs are warmed up.
type LogFunc func(format string, args ...interface{})
// CopyBlobs loads every pack in packs from repo and saves the blobs of those
// packs that are listed in keepBlobs via dstUploader into new packs.
//
// repo and dstRepo may be the same repository. In that case the backend must
// allow at least two connections, so that uploads remain possible while packs
// are being downloaded; otherwise an error is returned before any work starts.
//
// keepBlobs is modified by CopyBlobs: each blob is removed from it once it has
// been saved, so the set tracks which blobs have been processed. Blobs listed
// in keepBlobs that are not contained in any of the packs are left in the set.
//
// p counts processed packs; its maximum is set to len(packs) and it is always
// finished when CopyBlobs returns. logf receives informational messages; a nil
// logf discards them.
//
// The returned error is the first error encountered while loading or saving
// blobs, or nil on success.
func CopyBlobs(
	ctx context.Context,
	repo *Repository,
	dstRepo restic.Repository,
	dstUploader restic.BlobSaverWithAsync,
	packs restic.IDSet,
	keepBlobs repackBlobSet,
	p *progress.Counter,
	logf LogFunc,
) error {
	debug.Log("repacking %d packs while keeping %d blobs", len(packs), keepBlobs.Len())
	if logf == nil {
		logf = func(_ string, _ ...interface{}) {}
	}

	// Finish the progress counter on every return path, including the
	// configuration error below.
	defer p.Done()

	// Validate the configuration before announcing any work on p.
	if repo == dstRepo && dstRepo.Connections() < 2 {
		return errors.New("repack step requires a backend connection limit of at least two")
	}
	p.SetMax(uint64(len(packs)))

	return repack(ctx, repo, dstRepo, dstUploader, packs, keepBlobs, p, logf)
}
// repack streams the given packs from repo and re-saves, through uploader,
// every blob that is still listed in keepBlobs.
//
// A single producer goroutine filters each pack's index entries down to the
// blobs in keepBlobs and queues them. A set of workers downloads the queued
// blobs and saves them. A blob stored in several packs is saved only once,
// because a worker removes it from keepBlobs before saving it and all other
// workers then skip it. p is incremented once per pack a worker finished.
//
// When the S3Restore feature flag is enabled, the packs are warmed up first
// and repack waits for that to finish, reporting via logf if any pack needed
// warming.
func repack(
	ctx context.Context,
	repo *Repository,
	dstRepo restic.Repository,
	uploader restic.BlobSaverWithAsync,
	packs restic.IDSet,
	keepBlobs repackBlobSet,
	p *progress.Counter,
	logf LogFunc,
) error {
	if feature.Flag.Enabled(feature.S3Restore) {
		job, err := repo.StartWarmup(ctx, packs)
		if err != nil {
			return err
		}
		if job.HandleCount() != 0 {
			logf("warming up %d packs from cold storage, this may take a while...", job.HandleCount())
			if err := job.Wait(ctx); err != nil {
				return err
			}
		}
	}

	// Create the errgroup only after the warmup succeeded. errgroup.WithContext
	// derives a cancelable context that is released only by wg.Wait (or by a
	// failing goroutine). Returning early after creating it would leak it.
	wg, wgCtx := errgroup.WithContext(ctx)

	// keepMutex guards keepBlobs. The producer reads it, and all workers read
	// and modify it concurrently.
	var keepMutex sync.Mutex
	downloadQueue := make(chan index.PackBlobs)

	// Producer: reduce each pack to the blobs that still have to be kept and
	// hand it to the workers. Packs are forwarded even if no blobs remain.
	wg.Go(func() error {
		defer close(downloadQueue)
		for pbs := range repo.listPacksFromIndex(wgCtx, packs) {
			var packBlobs pack.Blobs
			keepMutex.Lock()
			// filter out unnecessary blobs
			for _, entry := range pbs.Blobs {
				h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
				if keepBlobs.Has(h) {
					packBlobs = append(packBlobs, entry)
				}
			}
			keepMutex.Unlock()

			select {
			case downloadQueue <- index.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		// NOTE(review): presumably listPacksFromIndex stops early once wgCtx is
		// canceled; report that cancellation instead of a silent success.
		return wgCtx.Err()
	})

	// Worker: download the queued blobs of one pack at a time and save each
	// blob that no other worker has saved yet.
	worker := func() error {
		for t := range downloadQueue {
			err := repo.loadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
				if err != nil {
					// a required blob couldn't be retrieved
					return err
				}

				keepMutex.Lock()
				// Recheck under the lock: the same blob may be contained in
				// several packs, and another worker may already have saved it.
				shouldKeep := keepBlobs.Has(blob)
				if shouldKeep {
					keepBlobs.Delete(blob)
				}
				keepMutex.Unlock()
				if !shouldKeep {
					return nil
				}

				// We do want to save already saved blobs! The final argument
				// presumably requests storing the blob even if it is already
				// known to the destination.
				_, _, _, err = uploader.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
				if err != nil {
					return err
				}
				debug.Log(" saved blob %v", blob.ID)
				return nil
			})
			if err != nil {
				return err
			}
			p.Add(1)
		}
		return nil
	}

	// As packs are streamed, the concurrency is limited by IO. Reserve one
	// connection so that uploading is always possible when reading from and
	// writing to the same repository. CopyBlobs ensures at least two
	// connections in that case, so at least one worker is started.
	repackWorkerCount := int(repo.Connections() - 1)
	if repo != dstRepo {
		// no need to share the upload and download connections for different repositories
		repackWorkerCount = int(repo.Connections())
	}
	for i := 0; i < repackWorkerCount; i++ {
		wg.Go(worker)
	}
	return wg.Wait()
}