restic copy --batch: a fresh start from commit 382616747

Instead of rebasing my code, I decided to start fresh, since WithBlobUploader()
has been introduced.

changelog/unreleased/issue-5453:
doc/045_working_with_repos.rst:
the usual

cmd/restic/cmd_copy.go:
gather all snapshots to be copied - collectAllSnapshots()
run overall copy step - func copyTreeBatched()
helper copySaveSnapshot() to save the corresponding snapshot

internal/repository/repack.go:
introduce wrapper CopyBlobs(), which passes parameter `uploader restic.BlobSaver` from
WithBlobUploader() via copyTreeBatched() to repack().

internal/backend/local/local_windows.go:
I did not touch it, but gofmt did: whitespace
This commit is contained in:
Winfried Plappert
2025-11-19 07:09:24 +00:00
parent 3826167474
commit b87f7586e4
6 changed files with 370 additions and 24 deletions
+134 -22
View File
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"slices"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/debug"
@@ -63,13 +64,52 @@ Exit status is 12 if the password is incorrect.
// CopyOptions bundles the command-line options of the copy command.
type CopyOptions struct {
// options of the second repository; their flags are registered with the "destination" prefix
global.SecondaryRepoOptions
// filter options that select which snapshots are considered
data.SnapshotFilter
// batch is set by the --batch flag: copy all selected snapshots in one step
batch bool
}
// AddFlags registers the flags of the copy command on f: the --batch switch,
// the options of the secondary repository and the snapshot filter options.
func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) {
	const batchUsage = "batch all snapshots to be copied into one step to optimize use of packfiles"
	f.BoolVar(&opts.batch, "batch", false, batchUsage)

	secondary := &opts.SecondaryRepoOptions
	secondary.AddFlags(f, "destination", "to copy snapshots from")

	snapshotFilter := &opts.SnapshotFilter
	initMultiSnapshotFilter(f, snapshotFilter, true)
}
// collectAllSnapshots returns the source snapshots that still have to be
// copied, ordered by ascending snapshot time (stable for equal times).
//
// Every snapshot yielded by FindFilteredSnapshots is looked up in
// dstSnapshotByOriginal under its persistent ID (sn.Original if set, otherwise
// its own ID). If one of the destination snapshots registered there is similar
// to the source snapshot, the source snapshot is reported via printer.V and
// left out of the result.
func collectAllSnapshots(ctx context.Context, opts CopyOptions,
	srcSnapshotLister restic.Lister, srcRepo restic.Repository,
	dstSnapshotByOriginal map[restic.ID][]*data.Snapshot, args []string, printer progress.Printer) (selectedSnapshots []*data.Snapshot) {

	// findCopy reports the destination snapshot that already represents src, if any
	findCopy := func(src *data.Snapshot) (*data.Snapshot, bool) {
		persistentID := *src.ID()
		if src.Original != nil {
			persistentID = *src.Original
		}
		// a missing map entry yields a nil slice, so no separate existence check is needed
		for _, candidate := range dstSnapshotByOriginal[persistentID] {
			if similarSnapshots(candidate, src) {
				return candidate, true
			}
		}
		return nil, false
	}

	selectedSnapshots = make([]*data.Snapshot, 0, 10)
	for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) {
		if existing, found := findCopy(sn); found {
			printer.V("\n%v\n", sn)
			printer.V("skipping source snapshot %s, was already copied to snapshot %s\n", sn.ID().Str(), existing.ID().Str())
			continue
		}
		selectedSnapshots = append(selectedSnapshots, sn)
	}

	byTime := func(left, right *data.Snapshot) int {
		return left.Time.Compare(right.Time)
	}
	slices.SortStableFunc(selectedSnapshots, byTime)
	return selectedSnapshots
}
func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error {
printer := ui.NewProgressPrinter(false, gopts.Verbosity, term)
secondaryGopts, isFromRepo, err := opts.SecondaryRepoOptions.FillGlobalOpts(ctx, gopts, "destination")
@@ -124,10 +164,11 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [
return ctx.Err()
}
selectedSnapshots := collectAllSnapshots(ctx, opts, srcSnapshotLister, srcRepo, dstSnapshotByOriginal, args, printer)
// remember already processed trees across all snapshots
visitedTrees := restic.NewIDSet()
for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) {
for _, sn := range selectedSnapshots {
// check whether the destination has a snapshot with the same persistent ID which has similar snapshot fields
srcOriginal := *sn.ID()
if sn.Original != nil {
@@ -148,25 +189,12 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [
continue
}
}
printer.P("\n%v", sn)
printer.P(" copy started, this may take a while...")
if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer); err != nil {
return err
}
debug.Log("tree copied")
// save snapshot
sn.Parent = nil // Parent does not have relevance in the new repo.
// Use Original as a persistent snapshot ID
if sn.Original == nil {
sn.Original = sn.ID()
}
newID, err := data.SaveSnapshot(ctx, dstRepo, sn)
if err != nil {
return err
}
printer.P("snapshot %s saved", newID.Str())
}
if err := copyTreeBatched(ctx, srcRepo, dstRepo, visitedTrees, selectedSnapshots, opts, printer); err != nil {
return err
}
return ctx.Err()
}
@@ -190,7 +218,7 @@ func similarSnapshots(sna *data.Snapshot, snb *data.Snapshot) bool {
}
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer) error {
visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver, seenBlobs restic.IDSet) error {
wg, wgCtx := errgroup.WithContext(ctx)
@@ -204,11 +232,15 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
packList := restic.NewIDSet()
enqueue := func(h restic.BlobHandle) {
if seenBlobs.Has(h.ID) {
return
}
pb := srcRepo.LookupBlob(h.Type, h.ID)
copyBlobs.Insert(h)
for _, p := range pb {
packList.Insert(p.PackID)
}
seenBlobs.Insert(h.ID)
}
wg.Go(func() error {
@@ -244,7 +276,9 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
copyStats(srcRepo, copyBlobs, packList, printer)
bar := printer.NewCounter("packs copied")
err = repository.Repack(ctx, srcRepo, dstRepo, packList, copyBlobs, bar, printer.P)
bar.SetMax(uint64(len(packList)))
err = repository.CopyBlobs(ctx, srcRepo, dstRepo, uploader, packList, copyBlobs, bar, printer.P)
bar.Done()
if err != nil {
return errors.Fatalf("%s", err)
}
@@ -268,3 +302,81 @@ func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList res
printer.V(" copy %d blobs with disk size %s in %d packfiles\n",
countBlobs, ui.FormatBytes(uint64(sizeBlobs)), len(packList))
}
// copySaveSnapshot stores sn in dstRepo and prints the ID of the newly saved
// snapshot. Before saving, sn is modified in place: Parent is cleared and, if
// Original is unset, it is set to the snapshot's current ID.
func copySaveSnapshot(ctx context.Context, sn *data.Snapshot, dstRepo restic.Repository, printer progress.Printer) error {
	// Original serves as a persistent snapshot ID; keep an existing value
	if sn.Original == nil {
		sn.Original = sn.ID()
	}
	// Parent does not have relevance in the new repo.
	sn.Parent = nil

	savedID, saveErr := data.SaveSnapshot(ctx, dstRepo, sn)
	if saveErr != nil {
		return saveErr
	}

	printer.P("snapshot %s saved", savedID.Str())
	return nil
}
// copyTreeBatched copies the trees of all selectedSnapshots from srcRepo to
// dstRepo via copyTree() and afterwards saves the corresponding snapshots in
// dstRepo via copySaveSnapshot().
//
// If opts.batch is set, dstRepo.WithBlobUploader() is called once and all
// snapshot trees are copied with the same uploader; otherwise
// WithBlobUploader() is called once per snapshot. visitedTrees is handed to
// every copyTree() call so that state is shared across snapshots.
//
// In both modes a snapshot is only saved after the WithBlobUploader() call
// that copied its data has returned without error. The first error aborts the
// copy and is returned.
func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
	visitedTrees restic.IDSet, selectedSnapshots []*data.Snapshot, opts CopyOptions,
	printer progress.Printer) error {

	// seenBlobs is shared by all copyTree() calls. According to the original
	// author it is needed for roughly 1 in 10000 blobs; for the rest the
	// dstRepo.LookupBlobSize() check is sufficient.
	seenBlobs := restic.NewIDSet()

	if opts.batch {
		// call WithBlobUploader() once and loop over all selectedSnapshots inside;
		// pass the caller's ctx so that cancellation reaches the uploader
		err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
			for _, sn := range selectedSnapshots {
				printer.P("\n%v", sn)
				printer.P(" copy started, this may take a while...")
				if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer, uploader, seenBlobs); err != nil {
					return err
				}
				debug.Log("tree copied")
			}
			return nil
		})
		if err != nil {
			return err
		}

		// Save the snapshots only after the uploader has returned, like the
		// non-batch path below does.
		// NOTE(review): presumably WithBlobUploader() flushes pending data when
		// the callback returns - confirm against its implementation.
		for _, sn := range selectedSnapshots {
			if err := copySaveSnapshot(ctx, sn, dstRepo, printer); err != nil {
				return err
			}
		}
		return nil
	}

	// no batch option: call WithBlobUploader() for each snapshot separately
	for _, sn := range selectedSnapshots {
		printer.P("\n%v", sn)
		printer.P(" copy started, this may take a while...")
		err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
			if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer, uploader, seenBlobs); err != nil {
				return err
			}
			debug.Log("tree copied")
			return nil
		})
		if err != nil {
			return err
		}

		if err := copySaveSnapshot(ctx, sn, dstRepo, printer); err != nil {
			return err
		}
	}
	return nil
}
+194
View File
@@ -6,8 +6,11 @@ import (
"path/filepath"
"testing"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/global"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"github.com/restic/restic/internal/ui"
)
func testRunCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options) {
@@ -28,6 +31,25 @@ func testRunCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options)
}))
}
// testRunCopyBatched runs the copy command with the batch option enabled. The
// repository settings of srcGopts are passed as the secondary repository, the
// ones of dstGopts as the primary repository. The test fails if the command
// returns an error.
func testRunCopyBatched(t testing.TB, srcGopts global.Options, dstGopts global.Options) {
	// start from the source options and point the primary repository at the destination
	gopts := srcGopts
	gopts.Repo = dstGopts.Repo
	gopts.Password = dstGopts.Password
	gopts.InsecureNoPassword = dstGopts.InsecureNoPassword

	copyOpts := CopyOptions{
		SecondaryRepoOptions: global.SecondaryRepoOptions{
			Repo:               srcGopts.Repo,
			Password:           srcGopts.Password,
			InsecureNoPassword: srcGopts.InsecureNoPassword,
		},
		batch: true,
	}

	rtest.OK(t, withTermStatus(t, gopts, func(ctx context.Context, gopts global.Options) error {
		// use the context provided by withTermStatus instead of a detached one
		return runCopy(ctx, copyOpts, gopts, nil, gopts.Term)
	}))
}
func TestCopy(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
@@ -85,6 +107,178 @@ func TestCopy(t *testing.T) {
rtest.Assert(t, len(origRestores) == 0, "found not copied snapshots")
}
// packInfo describes one packfile as collected by getPackfileInfo.
type packInfo struct {
// Type is the string form of the type of the first blob in the packfile
Type string
// size is the packfile size as passed to the repo.List callback
size int64
// numberBlobs is the number of blobs repo.ListPack returned for the packfile
numberBlobs int
}
// testGetUsedBlobs loads the index of repo and returns the blob set filled by
// data.FindUsedBlobs for the root trees of all snapshots in the repository.
// Any error fails the test.
func testGetUsedBlobs(t *testing.T, repo restic.Repository) (usedBlobs restic.BlobSet) {
	ctx := context.TODO()

	lister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
	rtest.OK(t, err)
	rtest.OK(t, repo.LoadIndex(ctx, nil))

	// collect the root tree of every snapshot; the empty filter matches all of them
	trees := make([]restic.ID, 0, 3)
	collectTree := func(_ string, sn *data.Snapshot, err error) error {
		rtest.OK(t, err)
		trees = append(trees, *sn.Tree)
		return nil
	}
	allSnapshots := &data.SnapshotFilter{}
	rtest.OK(t, allSnapshots.FindAll(ctx, lister, repo, nil, collectTree))

	usedBlobs = restic.NewBlobSet()
	rtest.OK(t, data.FindUsedBlobs(ctx, repo, trees, usedBlobs, nil))
	return usedBlobs
}
// getPackfileInfo lists all packfiles of repo and returns, per packfile ID, its
// size, the number of blobs it contains and the type of its first blob.
// The test fails if a packfile cannot be listed or contains no blobs.
func getPackfileInfo(t *testing.T, repo restic.Repository) (packfiles map[restic.ID]packInfo) {
	packfiles = make(map[restic.ID]packInfo)

	record := func(id restic.ID, size int64) error {
		blobs, _, err := repo.ListPack(context.TODO(), id, size)
		rtest.OK(t, err)
		rtest.Assert(t, len(blobs) > 0, "a packfile should contain at least one blob")

		info := packInfo{size: size, numberBlobs: len(blobs)}
		if len(blobs) > 0 {
			// the first blob's type stands for the whole packfile
			info.Type = blobs[0].Type.String()
		}
		packfiles[id] = info
		return nil
	}

	rtest.OK(t, repo.List(context.TODO(), restic.PackFile, record))
	return packfiles
}
// getCounts summarizes the packfiles of repo as reported by getPackfileInfo.
// It returns, in this order: the number of packfiles of type "tree", the number
// of packfiles of type "data" and the total number of blobs in all packfiles.
func getCounts(t *testing.T, repo restic.Repository) (int, int, int) {
	packsByType := make(map[string]int)
	totalBlobs := 0

	for _, info := range getPackfileInfo(t, repo) {
		packsByType[info.Type]++
		totalBlobs += info.numberBlobs
	}

	// packfiles of any other type only contribute to the blob total
	return packsByType["tree"], packsByType["data"], totalBlobs
}
// TestCopyBatched creates three snapshots in a source repository, copies them
// with the batch option into a freshly initialized repository and verifies
// that
//   - both repositories pass the check command and list three snapshots,
//   - every copied snapshot restores to the same contents as a source snapshot,
//   - the used blobs of both repositories are identical,
//   - the total number of blobs in the packfiles is identical, and
//   - the batched copy consists of exactly one tree and one data packfile.
func TestCopyBatched(t *testing.T) {
	env, cleanup := withTestEnvironment(t)
	defer cleanup()
	env3, cleanup3 := withTestEnvironment(t)
	defer cleanup3()

	testSetupBackupData(t, env)
	opts := BackupOptions{}
	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9")}, opts, env.gopts)
	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "2")}, opts, env.gopts)
	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "3")}, opts, env.gopts)
	testRunCheck(t, env.gopts)

	// batch copy
	testRunInit(t, env3.gopts)
	testRunCopyBatched(t, env.gopts, env3.gopts)

	// check integrity of the copy
	testRunCheck(t, env3.gopts)

	snapshotIDs := testListSnapshots(t, env.gopts, 3)
	copiedSnapshotIDs := testListSnapshots(t, env3.gopts, 3)

	// check that the copied snapshots restore to the same contents as the old ones
	origRestores := make(map[string]struct{})
	for i, snapshotID := range snapshotIDs {
		restoredir := filepath.Join(env.base, fmt.Sprintf("restore%d", i))
		origRestores[restoredir] = struct{}{}
		testRunRestore(t, env.gopts, restoredir, snapshotID.String())
	}
	for i, snapshotID := range copiedSnapshotIDs {
		restoredir := filepath.Join(env3.base, fmt.Sprintf("restore%d", i))
		testRunRestore(t, env3.gopts, restoredir, snapshotID.String())
		foundMatch := false
		for cmpdir := range origRestores {
			diff := directoriesContentsDiff(t, restoredir, cmpdir)
			if diff == "" {
				// every source restore may be matched only once
				delete(origRestores, cmpdir)
				foundMatch = true
			}
		}

		rtest.Assert(t, foundMatch, "found no counterpart for snapshot %v", snapshotID)
	}

	rtest.Assert(t, len(origRestores) == 0, "found not copied snapshots")

	// get access to the repositories
	// NOTE(review): the deferred unlock runs when the callback returns, i.e.
	// before repo1/repo3 are used further down - confirm that reading from the
	// repositories without holding the lock is intended here.
	var repo1 restic.Repository
	var unlock1 func()
	var err error
	rtest.OK(t, withTermStatus(t, env.gopts, func(ctx context.Context, gopts global.Options) error {
		printer := ui.NewProgressPrinter(gopts.JSON, gopts.Verbosity, gopts.Term)
		_, repo1, unlock1, err = openWithReadLock(ctx, gopts, false, printer)
		rtest.OK(t, err)
		defer unlock1()
		return err
	}))

	var repo3 restic.Repository
	var unlock3 func()
	rtest.OK(t, withTermStatus(t, env3.gopts, func(ctx context.Context, gopts global.Options) error {
		printer := ui.NewProgressPrinter(gopts.JSON, gopts.Verbosity, gopts.Term)
		_, repo3, unlock3, err = openWithReadLock(ctx, gopts, false, printer)
		rtest.OK(t, err)
		defer unlock3()
		return err
	}))

	usedBlobs1 := testGetUsedBlobs(t, repo1)
	usedBlobs3 := testGetUsedBlobs(t, repo3)
	rtest.Assert(t, len(usedBlobs1) == len(usedBlobs3),
		"used blob length must be identical in both repositories, but is not: (normal) %d <=> (batched) %d",
		len(usedBlobs1), len(usedBlobs3))

	// compare usedBlobs1 <=> usedBlobs3: with equal sizes, one-sided containment implies equality
	good := true
	for bh := range usedBlobs1 {
		if !usedBlobs3.Has(bh) {
			good = false
			break
		}
	}
	rtest.Assert(t, good, "all blobs in both repositories should be equal but they are not")

	_, _, countBlobs1 := getCounts(t, repo1)
	countTreePacks3, countDataPacks3, countBlobs3 := getCounts(t, repo3)

	rtest.Assert(t, countBlobs1 == countBlobs3,
		"expected blob count in both repos to be equal, but got %d and %d blobs",
		countBlobs1, countBlobs3)

	rtest.Assert(t, countTreePacks3 == 1 && countDataPacks3 == 1,
		"expected 1 data packfile and 1 tree packfile, but got %d trees and %d data packfiles",
		countTreePacks3, countDataPacks3)
}
func TestCopyIncremental(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()