Files
restic/internal/repository/repository_test.go
T
Michael Eischer c062a78dcd repository: move Blob, Blobs and PackedBlob to pack package
This removes them from the public interface. The latter now only
provides the PackBlob interface, without being bound to the type used
internally by the pack package.
2026-06-13 18:58:37 +02:00

577 lines
18 KiB
Go

package repository_test
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
"math/rand"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/cache"
"github.com/restic/restic/internal/backend/local"
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
var (
	// testSizes lists the blob sizes, in bytes, exercised by testSave.
	testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}

	// rnd is a time-seeded generator for random test payloads. Sources created
	// with rand.NewSource are not safe for concurrent use.
	rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
)
func TestSave(t *testing.T) {
repository.TestAllVersions(t, testSavePassID)
repository.TestAllVersions(t, testSaveCalculateID)
}
// testSavePassID runs testSave with the precomputed blob ID passed to SaveBlob.
func testSavePassID(t *testing.T, version uint) {
	const calculateID = false
	testSave(t, version, calculateID)
}
// testSaveCalculateID runs testSave with a zero blob ID, leaving the uploader
// to compute the ID itself.
func testSaveCalculateID(t *testing.T, version uint) {
	const calculateID = true
	testSave(t, version, calculateID)
}
// testSave saves one random data blob per entry in testSizes and checks that
// SaveBlob returns the content hash as ID and that the blob reads back
// unchanged. When calculateID is true a zero ID is passed to SaveBlob, leaving
// the uploader to compute it; otherwise the precomputed hash is supplied.
func testSave(t *testing.T, version uint, calculateID bool) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, version)

	for _, size := range testSizes {
		data := make([]byte, size)
		_, err := io.ReadFull(rnd, data)
		rtest.OK(t, err)
		want := restic.Hash(data)

		// save
		rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
			var passedID restic.ID
			if !calculateID {
				passedID = want
			}
			savedID, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, data, passedID, false)
			rtest.OK(t, err)
			rtest.Equals(t, want, savedID)
			return nil
		}))

		// read back
		loaded, err := repo.LoadBlob(context.TODO(), restic.DataBlob, want, nil)
		rtest.OK(t, err)
		rtest.Equals(t, size, len(loaded))
		rtest.Assert(t, len(loaded) == len(data),
			"number of bytes read back does not match: expected %d, got %d",
			len(data), len(loaded))
		rtest.Assert(t, bytes.Equal(loaded, data),
			"data does not match: expected %02x, got %02x",
			data, loaded)
	}
}
func TestSaveLoadZeroSizedBlob(t *testing.T) {
repository.TestAllVersions(t, testSaveLoadZeroSizedBlob)
}
// testSaveLoadZeroSizedBlob saves an empty data blob under its precomputed ID,
// checks that SaveBlob returns that ID, and verifies that loading it back
// yields zero bytes.
func testSaveLoadZeroSizedBlob(t *testing.T, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, version)

	var empty []byte
	wantID := restic.Hash(empty)

	save := func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		gotID, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, empty, wantID, false)
		rtest.OK(t, err)
		rtest.Equals(t, wantID, gotID)
		return nil
	}
	rtest.OK(t, repo.WithBlobUploader(context.TODO(), save))

	loaded, err := repo.LoadBlob(context.TODO(), restic.DataBlob, wantID, nil)
	rtest.OK(t, err)
	rtest.Equals(t, 0, len(loaded))
}
// TestSavePackMerging checks how many pack files are written when the total
// amount of saved data is a given percentage of the minimum pack size.
//
// Each subtest name is derived from the percentage actually passed to
// testSavePackMerging, so the name always describes the tested scenario.
func TestSavePackMerging(t *testing.T) {
	for _, tc := range []struct {
		percentage    int
		expectedPacks int
	}{
		{percentage: 75, expectedPacks: 1},
		{percentage: 175, expectedPacks: 2},
		{percentage: 275, expectedPacks: 3},
	} {
		t.Run(fmt.Sprintf("%d%%", tc.percentage), func(t *testing.T) {
			testSavePackMerging(t, tc.percentage, tc.expectedPacks)
		})
	}
}
// testSavePackMerging saves targetPercentage random blobs, each 1% of the
// minimum pack size, so the total payload is targetPercentage percent of one
// pack. It then checks that every blob is readable, that exactly expectedPacks
// pack files exist, and that the repository passes the checker.
func testSavePackMerging(t *testing.T, targetPercentage int, expectedPacks int) {
	repo, _ := repository.TestRepositoryWithBackend(t, nil, 0, repository.Options{
		// the smallest allowed pack size keeps this test fast
		PackSize: repository.MinPackSize,
	})

	var ids restic.IDs
	blobSize := repository.MinPackSize / 100
	rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		for n := 0; n < targetPercentage; n++ {
			payload := make([]byte, blobSize)
			_, err := io.ReadFull(rnd, payload)
			rtest.OK(t, err)

			blobID, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, payload, restic.ID{}, false)
			rtest.OK(t, err)
			ids = append(ids, blobID)
		}
		return nil
	}))

	// every saved blob must be loadable
	for _, blobID := range ids {
		_, err := repo.LoadBlob(context.TODO(), restic.DataBlob, blobID, nil)
		rtest.OK(t, err)
	}

	// count the pack files that were written
	var packs int
	rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(restic.ID, int64) error {
		packs++
		return nil
	}))
	rtest.Equals(t, expectedPacks, packs, "unexpected number of pack files")

	repository.TestCheckRepo(t, repo)
}
// BenchmarkSaveAndEncrypt runs benchmarkSaveAndEncrypt for every repository
// version.
func BenchmarkSaveAndEncrypt(b *testing.B) {
	repository.BenchmarkAllVersions(b, benchmarkSaveAndEncrypt)
}
// benchmarkSaveAndEncrypt measures SaveBlob throughput for a single 4 MiB
// random data blob that is saved b.N times under its precomputed ID.
//
// The final SaveBlob argument is true; presumably this forces the blob to be
// stored again even though its ID is already known (confirm against SaveBlob).
func benchmarkSaveAndEncrypt(b *testing.B, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(b, version)
	size := 4 << 20 // 4MiB

	data := make([]byte, size)
	_, err := io.ReadFull(rnd, data)
	rtest.OK(b, err)

	id := restic.ID(sha256.Sum256(data))

	b.ReportAllocs()
	b.ResetTimer()
	b.SetBytes(int64(size))

	// Check the error returned by WithBlobUploader instead of discarding it,
	// so a failing upload cannot silently produce a bogus benchmark result.
	rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		for i := 0; i < b.N; i++ {
			if _, _, _, saveErr := uploader.SaveBlob(ctx, restic.DataBlob, data, id, true); saveErr != nil {
				return saveErr
			}
		}
		return nil
	}))
}
func TestLoadBlob(t *testing.T) {
repository.TestAllVersions(t, testLoadBlob)
}
// testLoadBlob saves a 1,000,000-byte random data blob and reads it back into
// destination buffers whose capacities lie below, at and above the ciphertext
// length. It checks the length and the content of each result.
func testLoadBlob(t *testing.T, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, version)
	length := 1000000
	buf := crypto.NewBlobBuffer(length)
	_, err := io.ReadFull(rnd, buf)
	rtest.OK(t, err)

	var id restic.ID
	rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		var err error
		id, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
		return err
	}))

	base := crypto.CiphertextLength(length)
	for _, testlength := range []int{0, base - 20, base - 1, base, base + 7, base + 15, base + 1000} {
		// Keep the destination and the result in their own variables so that
		// neither buf nor err is shadowed inside the loop.
		dst := make([]byte, 0, testlength)
		loaded, loadErr := repo.LoadBlob(context.TODO(), restic.DataBlob, id, dst)
		if loadErr != nil {
			t.Errorf("LoadBlob() returned an error for buffer size %v: %v", testlength, loadErr)
			continue
		}
		if len(loaded) != length {
			t.Errorf("LoadBlob() returned the wrong number of bytes: want %v, got %v", length, len(loaded))
			continue
		}
		// A correct length alone does not prove the content is intact, so
		// compare the hash of the loaded data against the blob ID.
		if got := restic.Hash(loaded); !id.Equal(got) {
			t.Errorf("LoadBlob() returned wrong data for buffer size %v: want %v, got %v", testlength, id.Str(), got.Str())
		}
	}
}
// TestLoadBlobBroken saves a tree blob through a damageOnceBackend, whose
// first read of each file is corrupted, attaches a cache, and checks that the
// blob still loads correctly (presumably via a retry). It also checks that the
// blob's pack ends up in the cache.
func TestLoadBlobBroken(t *testing.T) {
	ctx := context.TODO()
	be := mem.New()
	repo, _ := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{})
	want := rtest.Random(42, 1000)

	var id restic.ID
	rtest.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		var err error
		id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, want, restic.ID{}, false)
		return err
	}))

	// The cache is attached only after saving so that the damaged first read
	// is what goes through the cache.
	c := cache.TestNewCache(t)
	repo.UseCache(c, t.Logf)

	got, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil)
	rtest.OK(t, err)
	rtest.Assert(t, bytes.Equal(want, got), "data mismatch")

	packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID()
	packHandle := backend.Handle{Type: restic.PackFile, Name: packID.String()}
	rtest.Assert(t, c.Has(packHandle), "expected tree pack to be cached")
}
func BenchmarkLoadBlob(b *testing.B) {
repository.BenchmarkAllVersions(b, benchmarkLoadBlob)
}
// benchmarkLoadBlob measures LoadBlob throughput for a 1,000,000-byte random
// data blob, reusing the returned buffer as the destination of the next
// iteration. Result verification runs with the benchmark timer stopped.
func benchmarkLoadBlob(b *testing.B, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(b, version)
	const length = 1000000
	buf := crypto.NewBlobBuffer(length)
	_, err := io.ReadFull(rnd, buf)
	rtest.OK(b, err)

	var id restic.ID
	rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		var saveErr error
		id, _, _, saveErr = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
		return saveErr
	}))

	b.ResetTimer()
	b.SetBytes(int64(length))

	for i := 0; i < b.N; i++ {
		buf, err = repo.LoadBlob(context.TODO(), restic.DataBlob, id, buf)

		// Verifying the SHA-256 via restic.Hash can account for about 38% of
		// the loop time, so it is kept out of the measurement.
		b.StopTimer()
		rtest.OK(b, err)
		if len(buf) != length {
			b.Errorf("wanted %d bytes, got %d", length, len(buf))
		}
		if got := restic.Hash(buf); !id.Equal(got) {
			b.Errorf("wrong data returned, wanted %v, got %v", id.Str(), got.Str())
		}
		b.StartTimer()
	}
}
func BenchmarkLoadUnpacked(b *testing.B) {
repository.BenchmarkAllVersions(b, benchmarkLoadUnpacked)
}
// benchmarkLoadUnpacked measures LoadUnpacked throughput for a 1,000,000-byte
// random payload. The payload is saved as a writeable snapshot file and read
// back as a snapshot file. Result verification runs with the timer stopped.
//
// storageID (returned by SaveUnpacked and used to load the file) differs from
// dataID (the hash of the plaintext payload), so data is verified against
// dataID.
func benchmarkLoadUnpacked(b *testing.B, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(b, version)
	length := 1000000
	buf := crypto.NewBlobBuffer(length)
	_, err := io.ReadFull(rnd, buf)
	rtest.OK(b, err)

	dataID := restic.Hash(buf)

	storageID, err := repo.SaveUnpacked(context.TODO(), restic.WriteableSnapshotFile, buf)
	rtest.OK(b, err)

	b.ResetTimer()
	b.SetBytes(int64(length))

	for i := 0; i < b.N; i++ {
		data, err := repo.LoadUnpacked(context.TODO(), restic.SnapshotFile, storageID)
		rtest.OK(b, err)

		// Hashing the result is verification, not part of the measured work.
		b.StopTimer()
		if len(data) != length {
			b.Errorf("wanted %d bytes, got %d", length, len(data))
		}
		id2 := restic.Hash(data)
		if !dataID.Equal(id2) {
			// Report the hash that was actually compared against.
			b.Errorf("wrong data returned, wanted %v, got %v", dataID.Str(), id2.Str())
		}
		b.StartTimer()
	}
}
var repoFixture = filepath.Join("testdata", "test-repo.tar.gz")
// TestRepositoryLoadIndex unpacks the fixture repository and checks that its
// index loads without error.
func TestRepositoryLoadIndex(t *testing.T) {
	repo, _, cleanup := repository.TestFromFixture(t, repoFixture)
	defer cleanup()

	err := repo.LoadIndex(context.TODO(), nil)
	rtest.OK(t, err)
}
// loadIndex reads the index file with the given ID through repo and decodes
// it. Errors from loading or decoding are returned unchanged.
func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*index.Index, error) {
	raw, err := repo.LoadUnpacked(ctx, restic.IndexFile, id)
	if err != nil {
		return nil, err
	}

	return index.DecodeIndex(raw, id)
}
// TestRepositoryLoadUnpackedBroken writes an index file whose content no
// longer matches the ID it is stored under. It checks that LoadUnpacked
// reports restic.ErrInvalidData.
func TestRepositoryLoadUnpackedBroken(t *testing.T) {
	repo, _, be := repository.TestRepositoryWithVersion(t, 0)

	payload := rtest.Random(23, 12345)
	id := restic.Hash(payload)
	handle := backend.Handle{Type: restic.IndexFile, Name: id.String()}

	// invert the first byte so the stored content no longer hashes to id
	payload[0] ^= 0xff

	rtest.OK(t, be.Save(context.TODO(), handle, backend.NewByteReader(payload, be.Hasher())))

	_, err := repo.LoadUnpacked(context.TODO(), restic.IndexFile, id)
	rtest.Assert(t, errors.Is(err, restic.ErrInvalidData), "unexpected error: %v", err)
}
// damageOnceBackend wraps a backend.Backend and corrupts the first Load of
// every file except the config file by shifting the read offset one byte
// forward. Later loads of the same file are passed through unchanged.
type damageOnceBackend struct {
	backend.Backend

	// m records the handles that have already been loaded once.
	m sync.Map
}

// Load forwards to the wrapped backend. It damages the data only on the first
// load of each non-config handle.
func (be *damageOnceBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
	// The config file cannot be retried, so it is never damaged.
	if h.Type != restic.ConfigFile {
		// Clear IsMetadata before using the handle as a key so that metadata
		// and regular loads of the same file share one "first load".
		h.IsMetadata = false
		if _, seen := be.m.LoadOrStore(h, true); !seen {
			offset++
		}
	}
	return be.Backend.Load(ctx, h, length, offset, fn)
}
// TestRepositoryLoadUnpackedRetryBroken opens the fixture repository through a
// damageOnceBackend, whose first read of each non-config file is corrupted,
// and checks that the index can still be loaded.
func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
	repodir, cleanup := rtest.Env(t, repoFixture)
	defer cleanup()

	cfg := local.Config{Path: repodir, Connections: 2}
	be, err := local.Open(context.TODO(), cfg, t.Logf)
	rtest.OK(t, err)

	repo := repository.TestOpenBackend(t, &damageOnceBackend{Backend: be})
	rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
}
// saveRandomDataBlobs saves num data blobs filled with random bytes to repo in
// a single WithBlobUploader call. Each blob's size is drawn uniformly from
// [0, sizeMax); sizeMax must be positive.
func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) {
	rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		for i := 0; i < num; i++ {
			// rand.Intn is the idiomatic bounded draw (instead of rand.Int() % sizeMax).
			buf := make([]byte, rand.Intn(sizeMax))
			_, err := io.ReadFull(rnd, buf)
			rtest.OK(t, err)

			_, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
			rtest.OK(t, err)
		}
		return nil
	}))
}
func TestRepositoryIncrementalIndex(t *testing.T) {
repository.TestAllVersions(t, testRepositoryIncrementalIndex)
}
// testRepositoryIncrementalIndex overrides index.Full so that every index
// counts as full, then saves several rounds of random blobs. It checks that no
// pack file is listed in more than one index file.
func testRepositoryIncrementalIndex(t *testing.T, version uint) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, version)

	// index.Full is package-level state: restore the original after this test
	// so the override does not leak into other tests in the package.
	origFull := index.Full
	t.Cleanup(func() { index.Full = origFull })
	index.Full = func(*index.Index) bool { return true }

	// add a few rounds of packs
	for j := 0; j < 5; j++ {
		// add some packs and write index
		saveRandomDataBlobs(t, repo, 20, 1<<15)
	}

	// packEntries maps each pack ID to the set of index files that list it.
	packEntries := make(map[restic.ID]map[restic.ID]struct{})

	err := repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, _ int64) error {
		idx, err := loadIndex(context.TODO(), repo, id)
		rtest.OK(t, err)

		for pb := range idx.Values() {
			packID := pb.PackID()
			if _, ok := packEntries[packID]; !ok {
				packEntries[packID] = make(map[restic.ID]struct{})
			}
			packEntries[packID][id] = struct{}{}
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	for packID, ids := range packEntries {
		if len(ids) > 1 {
			t.Errorf("pack %v listed in %d indexes\n", packID, len(ids))
		}
	}
}
// TestInvalidCompression checks that CompressionMode.Set rejects an unknown
// mode name, and that repository.New also returns an error for the mode left
// behind by that failed Set.
func TestInvalidCompression(t *testing.T) {
	var comp repository.CompressionMode
	rtest.Assert(t, comp.Set("nope") != nil, "missing error")

	_, err := repository.New(nil, repository.Options{Compression: comp})
	rtest.Assert(t, err != nil, "missing error")
}
// TestListPack saves a single tree blob, attaches a cache, and loads the blob's
// pack file with IsMetadata set. It then checks that ListPackHandles reports
// exactly that blob and that the pack is not present in the cache afterwards.
func TestListPack(t *testing.T) {
	be := mem.New()
	repo, _ := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{})

	buf := rtest.Random(42, 1000)
	var id restic.ID
	rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		var err error
		id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false)
		return err
	}))

	// setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data
	c := cache.TestNewCache(t)
	repo.UseCache(c, t.Logf)

	// Forcibly cache pack file
	// NOTE(review): be is the raw mem backend created at the top of this test.
	// The repository sees it only through damageOnceBackend and, after
	// UseCache, presumably through cache c. Confirm that this Load actually
	// populates c; otherwise the final !c.Has assertion holds trivially.
	packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID()
	rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil }))

	// Get size to list pack
	// (ListPackHandles takes the pack file size, which is found by listing all packs.)
	var size int64
	rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, sz int64) error {
		if id == packID {
			size = sz
		}
		return nil
	}))

	handles, err := repo.ListPackHandles(context.TODO(), packID, size)
	rtest.OK(t, err)
	// the pack must contain exactly the one tree blob saved above
	rtest.Assert(t, len(handles) == 1 && handles[0].ID == id, "unexpected blobs in pack: %v", handles)
	rtest.Assert(t, !c.Has(backend.Handle{Type: restic.PackFile, Name: packID.String()}), "tree pack should no longer be cached as listPack does not set IsMetadata in the backend.Handle")
}
// TestNoDoubleInit checks that Init refuses to initialize a backend that
// already holds a config file, only key files, or a snapshot without any keys.
func TestNoDoubleInit(t *testing.T) {
	r, _, be := repository.TestRepositoryWithVersion(t, restic.StableRepoVersion)

	repo, err := repository.New(be, repository.Options{})
	rtest.OK(t, err)

	// errContains reports whether err is non-nil and contains substr.
	// err.Error() would panic on a nil error, so an Init that unexpectedly
	// succeeds is reported as a failed assertion instead of a crash.
	errContains := func(err error, substr string) bool {
		return err != nil && strings.Contains(err.Error(), substr)
	}

	pol := r.Config().ChunkerPolynomial
	err = repo.Init(context.TODO(), r.Config().Version, rtest.TestPassword, &pol)
	rtest.Assert(t, errContains(err, "repository master key and config already initialized"), "expected config exist error, got %q", err)

	// must also prevent init if only keys exist
	rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: backend.ConfigFile}))
	err = repo.Init(context.TODO(), r.Config().Version, rtest.TestPassword, &pol)
	rtest.Assert(t, errContains(err, "repository already contains keys"), "expected already contains keys error, got %q", err)

	// must also prevent init if a snapshot exists and keys were deleted
	var data [32]byte
	hash := restic.Hash(data[:])
	rtest.OK(t, be.Save(context.TODO(), backend.Handle{Type: backend.SnapshotFile, Name: hash.String()}, backend.NewByteReader(data[:], be.Hasher())))
	rtest.OK(t, be.List(context.TODO(), restic.KeyFile, func(fi backend.FileInfo) error {
		return be.Remove(context.TODO(), backend.Handle{Type: restic.KeyFile, Name: fi.Name})
	}))
	err = repo.Init(context.TODO(), r.Config().Version, rtest.TestPassword, &pol)
	rtest.Assert(t, errContains(err, "repository already contains snapshots"), "expected already contains snapshots error, got %q", err)
}
// TestSaveBlobAsync issues several SaveBlobAsync calls with distinct payloads
// and waits for all callbacks. For each call it checks that no error was
// reported, that the ID equals the payload hash, and that the blob was not
// already known.
func TestSaveBlobAsync(t *testing.T) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, 2)
	ctx := context.Background()

	type result struct {
		id    restic.ID
		known bool
		size  int
		err   error
	}

	const numCalls = 10
	// blobData returns the unique payload used for call i.
	blobData := func(i int) []byte { return []byte(fmt.Sprintf("test blob data %d", i)) }

	results := make([]result, numCalls)
	var mu sync.Mutex

	err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		var wg sync.WaitGroup
		wg.Add(numCalls)
		for i := range numCalls {
			uploader.SaveBlobAsync(ctx, restic.DataBlob, blobData(i), restic.ID{}, false,
				func(newID restic.ID, known bool, size int, err error) {
					defer wg.Done()
					mu.Lock()
					defer mu.Unlock()
					results[i] = result{newID, known, size, err}
				})
		}
		wg.Wait()
		return nil
	})
	rtest.OK(t, err)

	for i, r := range results {
		expectedID := restic.Hash(blobData(i))
		rtest.Assert(t, r.err == nil, "result %d: unexpected error %v", i, r.err)
		rtest.Assert(t, r.id.Equal(expectedID), "result %d: expected ID %v, got %v", i, expectedID, r.id)
		rtest.Assert(t, !r.known, "result %d: expected unknown blob", i)
	}
}
// TestSaveBlobAsyncErrorHandling cancels the context before calling
// SaveBlobAsync. It checks that WithBlobUploader reports context.Canceled and
// that the callback is still invoked.
func TestSaveBlobAsyncErrorHandling(t *testing.T) {
	repo, _, _ := repository.TestRepositoryWithVersion(t, 2)
	ctx, cancel := context.WithCancel(context.Background())

	var called atomic.Bool
	err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		cancel()
		// the callback has to run even though ctx is already canceled
		uploader.SaveBlobAsync(ctx, restic.DataBlob, []byte("test blob data"), restic.ID{}, false,
			func(restic.ID, bool, int, error) {
				called.Store(true)
			})
		return nil
	})

	rtest.Assert(t, errors.Is(err, context.Canceled), "expected context canceled error, got %v", err)
	rtest.Assert(t, called.Load(), "callback was not called")
}