diff --git a/changelog/unreleased/issue-5453 b/changelog/unreleased/issue-5453 new file mode 100644 index 000000000..12c09e0b0 --- /dev/null +++ b/changelog/unreleased/issue-5453 @@ -0,0 +1,10 @@ +Enhancement: `copy` copies snapshots in batches + +The `copy` command used to copy snapshots individually, even if this resulted in creating pack files +smaller than the target pack size. In particular, this resulted in many small files +when copying small incremental snapshots. + +Now, `copy` copies multiple snapshots at once to avoid creating small files. + +https://github.com/restic/restic/issues/5175 +https://github.com/restic/restic/pull/5464 diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 498d6f75d..bf86de8a7 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "time" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/debug" @@ -70,6 +71,39 @@ func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) { initMultiSnapshotFilter(f, &opts.SnapshotFilter, true) } +// collectAllSnapshots: select all snapshot trees to be copied +func collectAllSnapshots(ctx context.Context, opts CopyOptions, + srcSnapshotLister restic.Lister, srcRepo restic.Repository, + dstSnapshotByOriginal map[restic.ID][]*data.Snapshot, args []string, printer progress.Printer, +) (selectedSnapshots []*data.Snapshot) { + + selectedSnapshots = make([]*data.Snapshot, 0, 10) + for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) { + // check whether the destination has a snapshot with the same persistent ID which has similar snapshot fields + srcOriginal := *sn.ID() + if sn.Original != nil { + srcOriginal = *sn.Original + } + if originalSns, ok := dstSnapshotByOriginal[srcOriginal]; ok { + isCopy := false + for _, originalSn := range originalSns { + if similarSnapshots(originalSn, sn) { + printer.V("\n%v", sn) + printer.V("skipping source snapshot 
%s, was already copied to snapshot %s", sn.ID().Str(), originalSn.ID().Str()) + isCopy = true + break + } + } + if isCopy { + continue + } + } + selectedSnapshots = append(selectedSnapshots, sn) + } + + return selectedSnapshots +} + func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error { printer := ui.NewProgressPrinter(false, gopts.Verbosity, term) secondaryGopts, isFromRepo, err := opts.SecondaryRepoOptions.FillGlobalOpts(ctx, gopts, "destination") @@ -124,49 +158,12 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [ return ctx.Err() } - // remember already processed trees across all snapshots - visitedTrees := restic.NewIDSet() + selectedSnapshots := collectAllSnapshots(ctx, opts, srcSnapshotLister, srcRepo, dstSnapshotByOriginal, args, printer) - for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) { - // check whether the destination has a snapshot with the same persistent ID which has similar snapshot fields - srcOriginal := *sn.ID() - if sn.Original != nil { - srcOriginal = *sn.Original - } - - if originalSns, ok := dstSnapshotByOriginal[srcOriginal]; ok { - isCopy := false - for _, originalSn := range originalSns { - if similarSnapshots(originalSn, sn) { - printer.V("\n%v", sn) - printer.V("skipping source snapshot %s, was already copied to snapshot %s", sn.ID().Str(), originalSn.ID().Str()) - isCopy = true - break - } - } - if isCopy { - continue - } - } - printer.P("\n%v", sn) - printer.P(" copy started, this may take a while...") - if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer); err != nil { - return err - } - debug.Log("tree copied") - - // save snapshot - sn.Parent = nil // Parent does not have relevance in the new repo. 
- // Use Original as a persistent snapshot ID - if sn.Original == nil { - sn.Original = sn.ID() - } - newID, err := data.SaveSnapshot(ctx, dstRepo, sn) - if err != nil { - return err - } - printer.P("snapshot %s saved", newID.Str()) + if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil { + return err } + return ctx.Err() } @@ -189,8 +186,63 @@ func similarSnapshots(sna *data.Snapshot, snb *data.Snapshot) bool { return true } +// copyTreeBatched copies multiple snapshots in one go. The snapshots of a batch are saved once data +// equivalent to at least 100 packfiles was copied and at least one minute has passed, or no snapshots are left. +func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, + selectedSnapshots []*data.Snapshot, printer progress.Printer) error { + + // remember already processed trees across all snapshots + visitedTrees := restic.NewIDSet() + + targetSize := uint64(dstRepo.PackSize()) * 100 + minDuration := 1 * time.Minute + + for len(selectedSnapshots) > 0 { + var batch []*data.Snapshot + batchSize := uint64(0) + startTime := time.Now() + + // call WithBlobUploader() once and then loop over all selectedSnapshots + err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + for len(selectedSnapshots) > 0 && (batchSize < targetSize || time.Since(startTime) < minDuration) { + sn := selectedSnapshots[0] + selectedSnapshots = selectedSnapshots[1:] + batch = append(batch, sn) + + printer.P("\n%v", sn) + printer.P(" copy started, this may take a while...") + sizeBlobs, err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer, uploader) + if err != nil { + return err + } + debug.Log("tree copied") + batchSize += sizeBlobs + } + + return nil + }) + if err != nil { + return err + } + + // add a newline to separate saved snapshot messages from the other messages + if len(batch) > 1 { + printer.P("") + } + // save all the snapshots + for _, sn := range batch { + err := copySaveSnapshot(ctx, sn, 
dstRepo, printer) + if err != nil { + return err + } + } + } + + return nil +} + func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, - visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer) error { + visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver) (uint64, error) { wg, wgCtx := errgroup.WithContext(ctx) @@ -204,10 +256,12 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep packList := restic.NewIDSet() enqueue := func(h restic.BlobHandle) { - pb := srcRepo.LookupBlob(h.Type, h.ID) - copyBlobs.Insert(h) - for _, p := range pb { - packList.Insert(p.PackID) + if _, ok := dstRepo.LookupBlobSize(h.Type, h.ID); !ok { + pb := srcRepo.LookupBlob(h.Type, h.ID) + copyBlobs.Insert(h) + for _, p := range pb { + packList.Insert(p.PackID) + } } } @@ -217,21 +271,14 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) } - // Do we already have this tree blob? - treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} - if _, ok := dstRepo.LookupBlobSize(treeHandle.Type, treeHandle.ID); !ok { - // copy raw tree bytes to avoid problems if the serialization changes - enqueue(treeHandle) - } + // copy raw tree bytes to avoid problems if the serialization changes + enqueue(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) for _, entry := range tree.Nodes { // Recursion into directories is handled by StreamTrees // Copy the blobs for this file. 
for _, blobID := range entry.Content { - h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} - if _, ok := dstRepo.LookupBlobSize(h.Type, h.ID); !ok { - enqueue(h) - } + enqueue(restic.BlobHandle{Type: restic.DataBlob, ID: blobID}) } } } @@ -239,21 +286,20 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep }) err := wg.Wait() if err != nil { - return err + return 0, err } - copyStats(srcRepo, copyBlobs, packList, printer) + sizeBlobs := copyStats(srcRepo, copyBlobs, packList, printer) bar := printer.NewCounter("packs copied") - err = repository.Repack(ctx, srcRepo, dstRepo, packList, copyBlobs, bar, printer.P) + err = repository.CopyBlobs(ctx, srcRepo, dstRepo, uploader, packList, copyBlobs, bar, printer.P) if err != nil { - return errors.Fatalf("%s", err) + return 0, errors.Fatalf("%s", err) } - return nil + return sizeBlobs, nil } // copyStats: print statistics for the blobs to be copied -func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList restic.IDSet, printer progress.Printer) { - +func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList restic.IDSet, printer progress.Printer) uint64 { // count and size countBlobs := 0 sizeBlobs := uint64(0) @@ -267,4 +313,19 @@ func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList res printer.V(" copy %d blobs with disk size %s in %d packfiles\n", countBlobs, ui.FormatBytes(uint64(sizeBlobs)), len(packList)) + return sizeBlobs +} + +func copySaveSnapshot(ctx context.Context, sn *data.Snapshot, dstRepo restic.Repository, printer progress.Printer) error { + sn.Parent = nil // Parent does not have relevance in the new repo. 
+ // Use Original as a persistent snapshot ID + if sn.Original == nil { + sn.Original = sn.ID() + } + newID, err := data.SaveSnapshot(ctx, dstRepo, sn) + if err != nil { + return err + } + printer.P("snapshot %s saved, copied from source snapshot %s", newID.Str(), sn.ID().Str()) + return nil } diff --git a/cmd/restic/cmd_copy_integration_test.go b/cmd/restic/cmd_copy_integration_test.go index c35e960ff..6105acfe4 100644 --- a/cmd/restic/cmd_copy_integration_test.go +++ b/cmd/restic/cmd_copy_integration_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/restic/restic/internal/global" + "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "github.com/restic/restic/internal/ui" ) func testRunCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options) { @@ -83,6 +85,41 @@ func TestCopy(t *testing.T) { } rtest.Assert(t, len(origRestores) == 0, "found not copied snapshots") + + // check that snapshots were properly batched while copying + _, _, countBlobs := testPackAndBlobCounts(t, env.gopts) + countTreePacksDst, countDataPacksDst, countBlobsDst := testPackAndBlobCounts(t, env2.gopts) + + rtest.Equals(t, countBlobs, countBlobsDst, "expected blob count in boths repos to be equal") + rtest.Equals(t, countTreePacksDst, 1, "expected 1 tree packfile") + rtest.Equals(t, countDataPacksDst, 1, "expected 1 data packfile") +} + +func testPackAndBlobCounts(t testing.TB, gopts global.Options) (countTreePacks int, countDataPacks int, countBlobs int) { + rtest.OK(t, withTermStatus(t, gopts, func(ctx context.Context, gopts global.Options) error { + printer := ui.NewProgressPrinter(gopts.JSON, gopts.Verbosity, gopts.Term) + _, repo, unlock, err := openWithReadLock(ctx, gopts, false, printer) + rtest.OK(t, err) + defer unlock() + + rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { + blobs, _, err := repo.ListPack(context.TODO(), id, size) + rtest.OK(t, err) + rtest.Assert(t, len(blobs) > 0, 
"a packfile should contain at least one blob") + + switch blobs[0].Type { + case restic.TreeBlob: + countTreePacks++ + case restic.DataBlob: + countDataPacks++ + } + countBlobs += len(blobs) + return nil + })) + return nil + })) + + return countTreePacks, countDataPacks, countBlobs } func TestCopyIncremental(t *testing.T) { diff --git a/doc/045_working_with_repos.rst b/doc/045_working_with_repos.rst index 75a7e79f1..797ea9f9d 100644 --- a/doc/045_working_with_repos.rst +++ b/doc/045_working_with_repos.rst @@ -205,21 +205,28 @@ example from a local to a remote repository, you can use the ``copy`` command: .. code-block:: console - $ restic -r /srv/restic-repo-copy copy --from-repo /srv/restic-repo + $ restic -r /srv/restic-repo-copy copy --from-repo /srv/restic-repo --verbose repository d6504c63 opened successfully repository 3dd0878c opened successfully + [0:00] 100.00% 2 / 2 index files loaded + [0:00] 100.00% 7 / 7 index files loaded snapshot 410b18a2 of [/home/user/work] at 2020-06-09 23:15:57.305305 +0200 CEST by user@kasimir copy started, this may take a while... - snapshot 7a746a07 saved + [0:00] 100.00% 13 / 13 packs copied snapshot 4e5d5487 of [/home/user/work] at 2020-05-01 22:44:07.012113 +0200 CEST by user@kasimir skipping snapshot 4e5d5487, was already copied to snapshot 50eb62b7 + snapshot 7a746a07 saved, copied from source snapshot 410b18a2 + The example command copies all snapshots from the source repository ``/srv/restic-repo`` to the destination repository ``/srv/restic-repo-copy``. Snapshots which have previously been copied between repositories will -be skipped by later copy runs. +be skipped by later copy runs. Information about skipped snapshots is only +printed when ``--verbose`` is passed to the command. For efficiency reasons, +the snapshots are copied in batches, such that the ``snapshot [...] saved`` +messages can appear some time after the snapshot content was copied. .. 
important:: This process will have to both download (read) and upload (write) the entire snapshot(s) due to the different encryption keys used in the @@ -353,7 +360,7 @@ modifying the repository. Instead restic will only print the actions it would perform. .. note:: The ``rewrite`` command verifies that it does not modify snapshots in - unexpected ways and fails with an ``cannot encode tree at "[...]" without losing information`` + unexpected ways and fails with an ``cannot encode tree at "[...]" without losing information`` error otherwise. This can occur when rewriting a snapshot created by a newer version of restic or some third-party implementation. diff --git a/internal/backend/local/local_windows.go b/internal/backend/local/local_windows.go index fa21d8240..b3677b0ef 100644 --- a/internal/backend/local/local_windows.go +++ b/internal/backend/local/local_windows.go @@ -24,7 +24,7 @@ func removeFile(f string) error { // as Windows won't let you delete a read-only file err := os.Chmod(f, 0666) if err != nil && !os.IsPermission(err) { - return errors.WithStack(err) + return errors.WithStack(err) } return os.Remove(f) diff --git a/internal/repository/index/master_index.go b/internal/repository/index/master_index.go index 62ccc4f71..f410ebf61 100644 --- a/internal/repository/index/master_index.go +++ b/internal/repository/index/master_index.go @@ -16,13 +16,13 @@ import ( // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. type MasterIndex struct { idx []*Index - pendingBlobs restic.BlobSet + pendingBlobs map[restic.BlobHandle]uint idxMutex sync.RWMutex } // NewMasterIndex creates a new master index. 
func NewMasterIndex() *MasterIndex { - mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()} + mi := &MasterIndex{pendingBlobs: make(map[restic.BlobHandle]uint)} mi.clear() return mi } @@ -46,10 +46,16 @@ func (mi *MasterIndex) Lookup(bh restic.BlobHandle) (pbs []restic.PackedBlob) { } // LookupSize queries all known Indexes for the ID and returns the first match. +// Also returns true if the ID is pending. func (mi *MasterIndex) LookupSize(bh restic.BlobHandle) (uint, bool) { mi.idxMutex.RLock() defer mi.idxMutex.RUnlock() + // also return true if blob is pending + if size, ok := mi.pendingBlobs[bh]; ok { + return size, true + } + for _, idx := range mi.idx { if size, found := idx.LookupSize(bh); found { return size, found @@ -63,13 +69,13 @@ func (mi *MasterIndex) LookupSize(bh restic.BlobHandle) (uint, bool) { // Before doing so it checks if this blob is already known. // Returns true if adding was successful and false if the blob // was already known -func (mi *MasterIndex) AddPending(bh restic.BlobHandle) bool { +func (mi *MasterIndex) AddPending(bh restic.BlobHandle, size uint) bool { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() // Check if blob is pending or in index - if mi.pendingBlobs.Has(bh) { + if _, ok := mi.pendingBlobs[bh]; ok { return false } @@ -80,30 +86,10 @@ func (mi *MasterIndex) AddPending(bh restic.BlobHandle) bool { } // really not known -> insert - mi.pendingBlobs.Insert(bh) + mi.pendingBlobs[bh] = size return true } -// Has queries all known Indexes for the ID and returns the first match. -// Also returns true if the ID is pending. -func (mi *MasterIndex) Has(bh restic.BlobHandle) bool { - mi.idxMutex.RLock() - defer mi.idxMutex.RUnlock() - - // also return true if blob is pending - if mi.pendingBlobs.Has(bh) { - return true - } - - for _, idx := range mi.idx { - if idx.Has(bh) { - return true - } - } - - return false -} - // IDs returns the IDs of all indexes contained in the index. 
func (mi *MasterIndex) IDs() restic.IDSet { mi.idxMutex.RLock() @@ -165,7 +151,7 @@ func (mi *MasterIndex) storePack(id restic.ID, blobs []restic.Blob) { // delete blobs from pending for _, blob := range blobs { - mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID}) + delete(mi.pendingBlobs, restic.BlobHandle{Type: blob.Type, ID: blob.ID}) } for _, idx := range mi.idx { diff --git a/internal/repository/index/master_index_test.go b/internal/repository/index/master_index_test.go index edf2067b9..98cfe9ac6 100644 --- a/internal/repository/index/master_index_test.go +++ b/internal/repository/index/master_index_test.go @@ -74,9 +74,6 @@ func TestMasterIndex(t *testing.T) { mIdx.Insert(idx2) // test idInIdx1 - found := mIdx.Has(bhInIdx1) - rtest.Equals(t, true, found) - blobs := mIdx.Lookup(bhInIdx1) rtest.Equals(t, []restic.PackedBlob{blob1}, blobs) @@ -85,9 +82,6 @@ func TestMasterIndex(t *testing.T) { rtest.Equals(t, uint(10), size) // test idInIdx2 - found = mIdx.Has(bhInIdx2) - rtest.Equals(t, true, found) - blobs = mIdx.Lookup(bhInIdx2) rtest.Equals(t, []restic.PackedBlob{blob2}, blobs) @@ -96,9 +90,6 @@ func TestMasterIndex(t *testing.T) { rtest.Equals(t, uint(200), size) // test idInIdx12 - found = mIdx.Has(bhInIdx12) - rtest.Equals(t, true, found) - blobs = mIdx.Lookup(bhInIdx12) rtest.Equals(t, 2, len(blobs)) @@ -121,8 +112,6 @@ func TestMasterIndex(t *testing.T) { rtest.Equals(t, uint(80), size) // test not in index - found = mIdx.Has(restic.BlobHandle{ID: restic.NewRandomID(), Type: restic.TreeBlob}) - rtest.Assert(t, !found, "Expected no blobs when fetching with a random id") blobs = mIdx.Lookup(restic.NewRandomBlobHandle()) rtest.Assert(t, blobs == nil, "Expected no blobs when fetching with a random id") _, found = mIdx.LookupSize(restic.NewRandomBlobHandle()) @@ -521,7 +510,7 @@ func TestRewriteOversizedIndex(t *testing.T) { // verify that blobs are still in the index for _, blob := range blobs { - found := mi2.Has(blob.BlobHandle) + _, 
found := mi2.LookupSize(blob.BlobHandle) rtest.Assert(t, found, "blob %v missing after rewrite", blob.ID) } diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 250ab9846..772765129 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -105,7 +105,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed if repo.Config().Version < 2 && opts.RepackUncompressed { return nil, fmt.Errorf("compression requires at least repository format version 2") } - if opts.SmallPackBytes > uint64(repo.packSize()) { + if opts.SmallPackBytes > uint64(repo.PackSize()) { return nil, fmt.Errorf("repack-smaller-than exceeds repository packsize") } @@ -329,12 +329,12 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo *Repository, var repackSmallCandidates []packInfoWithID repoVersion := repo.Config().Version // only repack very small files by default - targetPackSize := repo.packSize() / 25 + targetPackSize := repo.PackSize() / 25 if opts.SmallPackBytes > 0 { targetPackSize = uint(opts.SmallPackBytes) } else if opts.RepackSmall { // consider files with at least 80% of the target size as large enough - targetPackSize = repo.packSize() / 5 * 4 + targetPackSize = repo.PackSize() / 5 * 4 } // loop over all packs and decide what to do @@ -563,7 +563,9 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er if len(plan.repackPacks) != 0 { printer.P("repacking packs\n") bar := printer.NewCounter("packs repacked") - err := Repack(ctx, repo, repo, plan.repackPacks, plan.keepBlobs, bar, printer.P) + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + return CopyBlobs(ctx, repo, repo, uploader, plan.repackPacks, plan.keepBlobs, bar, printer.P) + }) if err != nil { return errors.Fatalf("%s", err) } diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 730325afd..ca0a8a48b 100644 --- 
a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -21,17 +21,18 @@ type repackBlobSet interface { type LogFunc func(msg string, args ...interface{}) -// Repack takes a list of packs together with a list of blobs contained in +// CopyBlobs takes a list of packs together with a list of blobs contained in // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved // into a new pack. Returned is the list of obsolete packs which can then // be removed. // -// The map keepBlobs is modified by Repack, it is used to keep track of which +// The map keepBlobs is modified by CopyBlobs, it is used to keep track of which // blobs have been processed. -func Repack( +func CopyBlobs( ctx context.Context, repo restic.Repository, dstRepo restic.Repository, + dstUploader restic.BlobSaver, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter, @@ -49,9 +50,7 @@ func Repack( return errors.New("repack step requires a backend connection limit of at least two") } - return dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { - return repack(ctx, repo, dstRepo, uploader, packs, keepBlobs, p, logf) - }) + return repack(ctx, repo, dstRepo, dstUploader, packs, keepBlobs, p, logf) } func repack( diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 4d285681f..bedacaa7e 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -150,7 +150,9 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe } func repack(t *testing.T, repo restic.Repository, be backend.Backend, packs restic.IDSet, blobs restic.BlobSet) { - rtest.OK(t, repository.Repack(context.TODO(), repo, repo, packs, blobs, nil, nil)) + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + return repository.CopyBlobs(ctx, repo, repo, uploader, packs, blobs, nil, nil) + })) for id := range packs 
{ rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()})) @@ -263,7 +265,9 @@ func testRepackCopy(t *testing.T, version uint) { _, keepBlobs := selectBlobs(t, random, repo, 0.2) copyPacks := findPacksForBlobs(t, repo, keepBlobs) - rtest.OK(t, repository.Repack(context.TODO(), repoWrapped, dstRepoWrapped, copyPacks, keepBlobs, nil, nil)) + rtest.OK(t, dstRepoWrapped.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + return repository.CopyBlobs(ctx, repoWrapped, dstRepoWrapped, uploader, copyPacks, keepBlobs, nil, nil) + })) rebuildAndReloadIndex(t, dstRepo) for h := range keepBlobs { @@ -299,7 +303,9 @@ func testRepackWrongBlob(t *testing.T, version uint) { _, keepBlobs := selectBlobs(t, random, repo, 0) rewritePacks := findPacksForBlobs(t, repo, keepBlobs) - err := repository.Repack(context.TODO(), repo, repo, rewritePacks, keepBlobs, nil, nil) + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil) + }) if err == nil { t.Fatal("expected repack to fail but got no error") } @@ -346,7 +352,9 @@ func testRepackBlobFallback(t *testing.T, version uint) { })) // repack must fallback to valid copy - rtest.OK(t, repository.Repack(context.TODO(), repo, repo, rewritePacks, keepBlobs, nil, nil)) + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil) + })) keepBlobs = restic.NewBlobSet(restic.BlobHandle{Type: restic.DataBlob, ID: id}) packs := findPacksForBlobs(t, repo, keepBlobs) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index bb9c6c3ba..3b26b1f90 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -154,8 +154,8 @@ func (r 
*Repository) Config() restic.Config { return r.cfg } -// packSize return the target size of a pack file when uploading -func (r *Repository) packSize() uint { +// PackSize returns the target size of a pack file when uploading +func (r *Repository) PackSize() uint { return r.opts.PackSize } @@ -590,8 +590,8 @@ func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections()) - r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker) - r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.packerCount, r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.packerCount, r.uploader.QueuePacker) wg.Go(func() error { return innerWg.Wait() @@ -640,7 +640,7 @@ func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.Pack return r.idx.Lookup(restic.BlobHandle{Type: tpe, ID: id}) } -// LookupBlobSize returns the size of blob id. +// LookupBlobSize returns the size of blob id. Blobs that are still pending are also reported as found. 
func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bool) { return r.idx.LookupSize(restic.BlobHandle{Type: tpe, ID: id}) } @@ -968,7 +968,7 @@ func (r *Repository) saveBlob(ctx context.Context, t restic.BlobType, buf []byte } // first try to add to pending blobs; if not successful, this blob is already known - known = !r.idx.AddPending(restic.BlobHandle{ID: newID, Type: t}) + known = !r.idx.AddPending(restic.BlobHandle{ID: newID, Type: t}, uint(len(buf))) // only save when needed or explicitly told if !known || storeDuplicate { diff --git a/internal/restic/repository.go b/internal/restic/repository.go index cf3ec7834..2f1373641 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -18,6 +18,7 @@ type Repository interface { // Connections returns the maximum number of concurrent backend operations Connections() uint Config() Config + PackSize() uint Key() *crypto.Key LoadIndex(ctx context.Context, p TerminalCounterFactory) error