mirror of
https://github.com/restic/restic.git
synced 2026-06-01 14:49:44 +00:00
Merge pull request #21827 from MichaelEischer/fix-pack-repair
repair packs: correctly handle packs with missing/incomplete index entry
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
Bugfix: `repair packs` correctly handles pack files missing from the index
|
||||
|
||||
The `repair packs` command was unable to salvage blobs from a pack file if
|
||||
the pack file was not contained in the index or the index entry was incomplete.
|
||||
The command now uses both the information contained in the index and the
|
||||
pack file header.
|
||||
|
||||
https://github.com/restic/restic/issues/21820
|
||||
https://github.com/restic/restic/pull/21827
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -340,7 +339,7 @@ func decryptUnsigned(k *crypto.Key, buf []byte) []byte {
|
||||
return out
|
||||
}
|
||||
|
||||
func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Repository, packID restic.ID, list []restic.Blob, printer progress.Printer) error {
|
||||
func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Repository, packID restic.ID, list restic.Blobs, printer progress.Printer) error {
|
||||
dec, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@@ -543,13 +542,11 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkPackSize(blobs []restic.Blob, fileSize int, printer progress.Printer) {
|
||||
func checkPackSize(blobs restic.Blobs, fileSize int, printer progress.Printer) {
|
||||
// track current size and offset
|
||||
var size, offset uint64
|
||||
|
||||
sort.Slice(blobs, func(i, j int) bool {
|
||||
return blobs[i].Offset < blobs[j].Offset
|
||||
})
|
||||
blobs.Sort()
|
||||
|
||||
for _, pb := range blobs {
|
||||
printer.S(" %v blob %v, offset %-6d, raw length %-6d", pb.Type, pb.ID, pb.Offset, pb.Length)
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
@@ -37,7 +36,7 @@ func (e *partialReadError) Error() string {
|
||||
}
|
||||
|
||||
// CheckPack reads a pack and checks the integrity of all blobs.
|
||||
func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs restic.Blobs, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
|
||||
if err != nil {
|
||||
if r.cache != nil {
|
||||
@@ -56,7 +55,7 @@ func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.
|
||||
return err
|
||||
}
|
||||
|
||||
func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs restic.Blobs, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
|
||||
debug.Log("checking pack %v", id.String())
|
||||
|
||||
@@ -65,9 +64,7 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re
|
||||
}
|
||||
|
||||
// sanity check blobs in index
|
||||
sort.Slice(blobs, func(i, j int) bool {
|
||||
return blobs[i].Offset < blobs[j].Offset
|
||||
})
|
||||
blobs.Sort()
|
||||
idxHdrSize := pack.CalculateHeaderSize(blobs)
|
||||
lastBlobEnd := 0
|
||||
nonContinuousPack := false
|
||||
|
||||
@@ -215,7 +215,7 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
||||
type checkTask struct {
|
||||
id restic.ID
|
||||
size int64
|
||||
blobs []restic.Blob
|
||||
blobs restic.Blobs
|
||||
}
|
||||
ch := make(chan checkTask)
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestAssociatedSet(t *testing.T) {
|
||||
bh, blob := makeFakePackedBlob()
|
||||
|
||||
mi := NewMasterIndex()
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob.PackID, []restic.Blob{blob.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob.PackID, restic.Blobs{blob.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.Flush(context.TODO(), &noopSaver{}))
|
||||
|
||||
bs := NewAssociatedSet[uint8](mi)
|
||||
@@ -123,14 +123,14 @@ func TestAssociatedSetWithExtendedIndex(t *testing.T) {
|
||||
_, blob := makeFakePackedBlob()
|
||||
|
||||
mi := NewMasterIndex()
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob.PackID, []restic.Blob{blob.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob.PackID, restic.Blobs{blob.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.Flush(context.TODO(), &noopSaver{}))
|
||||
|
||||
bs := NewAssociatedSet[uint8](mi)
|
||||
|
||||
// add new blobs to index after building the set
|
||||
of, blob2 := makeFakePackedBlob()
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, []restic.Blob{blob2.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, restic.Blobs{blob2.Blob}, &noopSaver{}))
|
||||
test.OK(t, mi.Flush(context.TODO(), &noopSaver{}))
|
||||
|
||||
// non-existent
|
||||
@@ -167,10 +167,10 @@ func TestAssociatedSetIntersectAndSub(t *testing.T) {
|
||||
bh3, blob3 := makeFakePackedBlob()
|
||||
bh4, blob4 := makeFakePackedBlob()
|
||||
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob1.PackID, []restic.Blob{blob1.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, []restic.Blob{blob2.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob3.PackID, []restic.Blob{blob3.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob4.PackID, []restic.Blob{blob4.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob1.PackID, restic.Blobs{blob1.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, restic.Blobs{blob2.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob3.PackID, restic.Blobs{blob3.Blob}, saver))
|
||||
test.OK(t, mi.StorePack(context.TODO(), blob4.PackID, restic.Blobs{blob4.Blob}, saver))
|
||||
test.OK(t, mi.Flush(context.TODO(), saver))
|
||||
|
||||
t.Run("Intersect", func(t *testing.T) {
|
||||
|
||||
@@ -143,7 +143,7 @@ func (idx *Index) Preallocate(t restic.BlobType, numEntries int) {
|
||||
|
||||
// StorePack remembers the ids of all blobs of a given pack
|
||||
// in the index
|
||||
func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) {
|
||||
func (idx *Index) StorePack(id restic.ID, blobs restic.Blobs) {
|
||||
idx.m.Lock()
|
||||
defer idx.m.Unlock()
|
||||
|
||||
@@ -228,11 +228,6 @@ func (idx *Index) Values() iter.Seq[restic.PackedBlob] {
|
||||
}
|
||||
}
|
||||
|
||||
type EachByPackResult struct {
|
||||
PackID restic.ID
|
||||
Blobs []restic.Blob
|
||||
}
|
||||
|
||||
// EachByPack returns a channel that yields all blobs known to the index
|
||||
// grouped by packID but ignoring blobs with a packID in packPlacklist for
|
||||
// finalized indexes.
|
||||
@@ -240,10 +235,10 @@ type EachByPackResult struct {
|
||||
// from the finalized index which have been re-read into a non-finalized index.
|
||||
// When the context is cancelled, the background goroutine
|
||||
// terminates. This blocks any modification of the index.
|
||||
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
|
||||
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan restic.PackBlobs {
|
||||
idx.m.RLock()
|
||||
|
||||
ch := make(chan EachByPackResult)
|
||||
ch := make(chan restic.PackBlobs)
|
||||
|
||||
go func() {
|
||||
defer idx.m.RUnlock()
|
||||
@@ -264,7 +259,7 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-
|
||||
}
|
||||
|
||||
for packID, packByType := range byPack {
|
||||
var result EachByPackResult
|
||||
var result restic.PackBlobs
|
||||
result.PackID = packID
|
||||
for typ, p := range packByType {
|
||||
for _, e := range p {
|
||||
|
||||
@@ -21,7 +21,7 @@ func TestIndexSerialize(t *testing.T) {
|
||||
// create 50 packs with 20 blobs each
|
||||
for i := 0; i < 50; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
|
||||
pos := uint(0)
|
||||
for j := 0; j < 20; j++ {
|
||||
@@ -85,7 +85,7 @@ func TestIndexSerialize(t *testing.T) {
|
||||
newtests := []restic.PackedBlob{}
|
||||
for i := 0; i < 10; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
|
||||
pos := uint(0)
|
||||
for j := 0; j < 10; j++ {
|
||||
@@ -145,7 +145,7 @@ func TestIndexSize(t *testing.T) {
|
||||
blobCount := 100
|
||||
for i := 0; i < packs; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
|
||||
pos := uint(0)
|
||||
for j := 0; j < blobCount; j++ {
|
||||
@@ -401,7 +401,7 @@ func TestIndexPacks(t *testing.T) {
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
idx.StorePack(packID, []restic.Blob{
|
||||
idx.StorePack(packID, restic.Blobs{
|
||||
{
|
||||
BlobHandle: restic.NewRandomBlobHandle(),
|
||||
Offset: 0,
|
||||
@@ -433,7 +433,7 @@ func createRandomIndex(rng *rand.Rand, packfiles int) (idx *index.Index, lookupB
|
||||
// create index with given number of pack files
|
||||
for i := 0; i < packfiles; i++ {
|
||||
packID := NewRandomTestID(rng)
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
offset := 0
|
||||
for offset < maxPackSize {
|
||||
size := 2000 + rng.Intn(4*1024*1024)
|
||||
@@ -524,7 +524,7 @@ func TestIndexHas(t *testing.T) {
|
||||
// create 50 packs with 20 blobs each
|
||||
for i := 0; i < 50; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
|
||||
pos := uint(0)
|
||||
for j := 0; j < 20; j++ {
|
||||
@@ -566,7 +566,7 @@ func TestMixedEachByPack(t *testing.T) {
|
||||
for i := 0; i < 50; i++ {
|
||||
packID := restic.NewRandomID()
|
||||
expected[packID] = 1
|
||||
blobs := []restic.Blob{
|
||||
blobs := restic.Blobs{
|
||||
{
|
||||
BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()},
|
||||
Offset: 0,
|
||||
@@ -586,9 +586,7 @@ func TestMixedEachByPack(t *testing.T) {
|
||||
reported[bp.PackID]++
|
||||
|
||||
rtest.Equals(t, 2, len(bp.Blobs)) // correct blob count
|
||||
if bp.Blobs[0].Offset > bp.Blobs[1].Offset {
|
||||
bp.Blobs[1], bp.Blobs[0] = bp.Blobs[0], bp.Blobs[1]
|
||||
}
|
||||
bp.Blobs.Sort()
|
||||
b0 := bp.Blobs[0]
|
||||
rtest.Assert(t, b0.Type == restic.DataBlob && b0.Offset == 0 && b0.Length == 42, "wrong blob", b0)
|
||||
b1 := bp.Blobs[1]
|
||||
@@ -610,7 +608,7 @@ func TestEachByPackIgnoes(t *testing.T) {
|
||||
} else {
|
||||
expected[packID] = 1
|
||||
}
|
||||
blobs := []restic.Blob{
|
||||
blobs := restic.Blobs{
|
||||
{
|
||||
BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()},
|
||||
Offset: 0,
|
||||
|
||||
@@ -145,12 +145,12 @@ func (mi *MasterIndex) Insert(idx *Index) {
|
||||
}
|
||||
|
||||
// StorePack remembers the id and pack in the index.
|
||||
func (mi *MasterIndex) StorePack(ctx context.Context, id restic.ID, blobs []restic.Blob, r restic.SaverUnpacked[restic.FileType]) error {
|
||||
func (mi *MasterIndex) StorePack(ctx context.Context, id restic.ID, blobs restic.Blobs, r restic.SaverUnpacked[restic.FileType]) error {
|
||||
mi.storePack(id, blobs)
|
||||
return mi.saveFullIndex(ctx, r)
|
||||
}
|
||||
|
||||
func (mi *MasterIndex) storePack(id restic.ID, blobs []restic.Blob) {
|
||||
func (mi *MasterIndex) storePack(id restic.ID, blobs restic.Blobs) {
|
||||
mi.idxMutex.Lock()
|
||||
defer mi.idxMutex.Unlock()
|
||||
|
||||
@@ -652,7 +652,7 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
|
||||
defer close(out)
|
||||
// only resort a part of the index to keep the memory overhead bounded
|
||||
for i := byte(0); i < 16; i++ {
|
||||
packBlob := make(map[restic.ID][]restic.Blob)
|
||||
packBlob := make(map[restic.ID]restic.Blobs)
|
||||
for pack := range packs {
|
||||
if pack[0]&0xf == i {
|
||||
packBlob[pack] = nil
|
||||
|
||||
@@ -64,12 +64,12 @@ func TestMasterIndex(t *testing.T) {
|
||||
}
|
||||
|
||||
idx1 := index.NewIndex()
|
||||
idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
|
||||
idx1.StorePack(blob12a.PackID, []restic.Blob{blob12a.Blob})
|
||||
idx1.StorePack(blob1.PackID, restic.Blobs{blob1.Blob})
|
||||
idx1.StorePack(blob12a.PackID, restic.Blobs{blob12a.Blob})
|
||||
|
||||
idx2 := index.NewIndex()
|
||||
idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
|
||||
idx2.StorePack(blob12b.PackID, []restic.Blob{blob12b.Blob})
|
||||
idx2.StorePack(blob2.PackID, restic.Blobs{blob2.Blob})
|
||||
idx2.StorePack(blob12b.PackID, restic.Blobs{blob12b.Blob})
|
||||
|
||||
mIdx := index.NewMasterIndex()
|
||||
mIdx.Insert(idx1)
|
||||
@@ -135,7 +135,7 @@ func TestMasterIndexAddPending(t *testing.T) {
|
||||
// Test AddPending: try to add a blob that's already in an index (should return false)
|
||||
bhInIndex := restic.NewRandomBlobHandle()
|
||||
idx := index.NewIndex()
|
||||
idx.StorePack(restic.NewRandomID(), []restic.Blob{{
|
||||
idx.StorePack(restic.NewRandomID(), restic.Blobs{{
|
||||
BlobHandle: bhInIndex,
|
||||
Length: uint(crypto.CiphertextLength(50)),
|
||||
Offset: 0,
|
||||
@@ -180,7 +180,7 @@ func TestMasterIndexStorePackRemovesPending(t *testing.T) {
|
||||
UncompressedLength: 75,
|
||||
}
|
||||
saver := &noopSaver{}
|
||||
err := mIdx.StorePack(context.Background(), packID, []restic.Blob{blob}, saver)
|
||||
err := mIdx.StorePack(context.Background(), packID, restic.Blobs{blob}, saver)
|
||||
rtest.OK(t, err)
|
||||
|
||||
// Verify it is still found
|
||||
@@ -223,10 +223,10 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
|
||||
}
|
||||
|
||||
idx1 := index.NewIndex()
|
||||
idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
|
||||
idx1.StorePack(blob1.PackID, restic.Blobs{blob1.Blob})
|
||||
|
||||
idx2 := index.NewIndex()
|
||||
idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
|
||||
idx2.StorePack(blob2.PackID, restic.Blobs{blob2.Blob})
|
||||
|
||||
mIdx := index.NewMasterIndex()
|
||||
mIdx.Insert(idx1)
|
||||
@@ -256,8 +256,8 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
|
||||
|
||||
// merge another index containing identical blobs
|
||||
idx3 := index.NewIndex()
|
||||
idx3.StorePack(blob1.PackID, []restic.Blob{blob1.Blob})
|
||||
idx3.StorePack(blob2.PackID, []restic.Blob{blob2.Blob})
|
||||
idx3.StorePack(blob1.PackID, restic.Blobs{blob1.Blob})
|
||||
idx3.StorePack(blob2.PackID, restic.Blobs{blob2.Blob})
|
||||
|
||||
mIdx.Insert(idx3)
|
||||
finalIndexes, idxCount, newIDs := index.TestMergeIndex(t, mIdx)
|
||||
@@ -588,14 +588,14 @@ func TestRewriteOversizedIndex(t *testing.T) {
|
||||
return idx.Len(restic.DataBlob) > 2*fullIndexCount
|
||||
}
|
||||
|
||||
var blobs []restic.Blob
|
||||
var blobs restic.Blobs
|
||||
|
||||
// build oversized index
|
||||
idx := index.NewIndex()
|
||||
numPacks := 5
|
||||
for p := 0; p < numPacks; p++ {
|
||||
packID := restic.NewRandomID()
|
||||
packBlobs := make([]restic.Blob, 0, fullIndexCount)
|
||||
packBlobs := make(restic.Blobs, 0, fullIndexCount)
|
||||
|
||||
for i := 0; i < fullIndexCount; i++ {
|
||||
blob := restic.Blob{
|
||||
|
||||
@@ -21,7 +21,7 @@ var ErrBroken = errors.New("packer cannot be used after a write error")
|
||||
|
||||
// Packer is used to create a new Pack.
|
||||
type Packer struct {
|
||||
blobs []restic.Blob
|
||||
blobs restic.Blobs
|
||||
|
||||
bytes uint
|
||||
k *crypto.Key
|
||||
@@ -129,7 +129,7 @@ func (p *Packer) Finalize() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyHeader(k *crypto.Key, header []byte, expected []restic.Blob) error {
|
||||
func verifyHeader(k *crypto.Key, header []byte, expected restic.Blobs) error {
|
||||
// do not offer a way to skip the pack header verification, as pack headers are usually small enough
|
||||
// to not result in a significant performance impact
|
||||
|
||||
@@ -157,7 +157,7 @@ func (p *Packer) HeaderOverhead() int {
|
||||
}
|
||||
|
||||
// makeHeader constructs the header for p.
|
||||
func makeHeader(blobs []restic.Blob) ([]byte, error) {
|
||||
func makeHeader(blobs restic.Blobs) ([]byte, error) {
|
||||
buf := make([]byte, 0, len(blobs)*int(entrySize))
|
||||
|
||||
for _, b := range blobs {
|
||||
@@ -232,7 +232,7 @@ func (p *Packer) HeaderFull() bool {
|
||||
}
|
||||
|
||||
// Blobs returns the slice of blobs that have been written.
|
||||
func (p *Packer) Blobs() []restic.Blob {
|
||||
func (p *Packer) Blobs() restic.Blobs {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
@@ -348,7 +348,7 @@ func (e InvalidFileError) Error() string {
|
||||
|
||||
// List returns the list of entries found in a pack file and the length of the
|
||||
// header (including header size and crypto overhead)
|
||||
func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdrSize uint32, err error) {
|
||||
func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries restic.Blobs, hdrSize uint32, err error) {
|
||||
buf, err := readHeader(rd, size)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
@@ -367,7 +367,7 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr
|
||||
}
|
||||
|
||||
// might over allocate a bit if all blobs have EntrySize but only by a few percent
|
||||
entries = make([]restic.Blob, 0, uint(len(buf))/plainEntrySize)
|
||||
entries = make(restic.Blobs, 0, uint(len(buf))/plainEntrySize)
|
||||
|
||||
pos := uint(0)
|
||||
for len(buf) > 0 {
|
||||
@@ -427,7 +427,7 @@ func CalculateEntrySize(blob restic.Blob) int {
|
||||
return int(plainEntrySize)
|
||||
}
|
||||
|
||||
func CalculateHeaderSize(blobs []restic.Blob) int {
|
||||
func CalculateHeaderSize(blobs restic.Blobs) int {
|
||||
size := headerSize
|
||||
for _, blob := range blobs {
|
||||
size += CalculateEntrySize(blob)
|
||||
|
||||
@@ -182,7 +182,7 @@ func TestReadRecords(t *testing.T) {
|
||||
func TestUnpackedVerification(t *testing.T) {
|
||||
// create random keys
|
||||
k := crypto.NewRandomKey()
|
||||
blobs := []restic.Blob{
|
||||
blobs := restic.Blobs{
|
||||
{
|
||||
BlobHandle: restic.NewRandomBlobHandle(),
|
||||
Length: 42,
|
||||
|
||||
@@ -84,7 +84,7 @@ func repack(
|
||||
wg.Go(func() error {
|
||||
defer close(downloadQueue)
|
||||
for pbs := range repo.ListPacksFromIndex(wgCtx, packs) {
|
||||
var packBlobs []restic.Blob
|
||||
var packBlobs restic.Blobs
|
||||
keepMutex.Lock()
|
||||
// filter out unnecessary blobs
|
||||
for _, entry := range pbs.Blobs {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"slices"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
@@ -15,31 +16,36 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe
|
||||
bar.SetMax(uint64(len(ids)))
|
||||
defer bar.Done()
|
||||
|
||||
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
|
||||
packToBlobs, err := resolveBlobsForPacks(ctx, repo, ids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
|
||||
// examine all data the indexes have for the pack file
|
||||
for b := range repo.ListPacksFromIndex(ctx, ids) {
|
||||
blobs := b.Blobs
|
||||
if len(blobs) == 0 {
|
||||
printer.E("no blobs found for pack %v", b.PackID)
|
||||
bar.Add(1)
|
||||
continue
|
||||
indexBlobs := b.Blobs
|
||||
err := reuploadBlobsFromPack(ctx, repo, b.PackID, indexBlobs, printer, uploader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := repo.LoadBlobsFromPack(ctx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
indexBlobs.Sort()
|
||||
packBlobs := packToBlobs[b.PackID]
|
||||
packBlobs.Sort()
|
||||
if packBlobs != nil && !slices.Equal(indexBlobs, packBlobs) {
|
||||
// handle case where the index entry is broken or incomplete.
|
||||
// this can result in duplicate blobs, which can be cleaned up by running prune.
|
||||
printer.E("repairing incomplete index entry for pack %v", b.PackID)
|
||||
err := reuploadBlobsFromPack(ctx, repo, b.PackID, packBlobs, printer, uploader)
|
||||
if err != nil {
|
||||
printer.E("failed to load blob %v: %v", blob.ID, err)
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
id, _, _, err := uploader.SaveBlob(ctx, blob.Type, buf, restic.ID{}, true)
|
||||
if !id.Equal(blob.ID) {
|
||||
panic("pack id mismatch during upload")
|
||||
}
|
||||
return err
|
||||
})
|
||||
// ignore truncated file parts
|
||||
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return err
|
||||
}
|
||||
if len(indexBlobs) == 0 && len(packBlobs) == 0 {
|
||||
printer.E("no blobs found for pack %v", b.PackID)
|
||||
}
|
||||
|
||||
bar.Add(1)
|
||||
}
|
||||
return nil
|
||||
@@ -64,3 +70,41 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func resolveBlobsForPacks(ctx context.Context, repo *Repository, ids restic.IDSet) (map[restic.ID]restic.Blobs, error) {
|
||||
packToBlobs := make(map[restic.ID]restic.Blobs)
|
||||
|
||||
err := repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
|
||||
if ids.Has(id) {
|
||||
blobs, _, err := repo.ListPack(ctx, id, size)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
packToBlobs[id] = blobs
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return packToBlobs, nil
|
||||
}
|
||||
|
||||
func reuploadBlobsFromPack(ctx context.Context, repo *Repository, packID restic.ID, blobs restic.Blobs, printer progress.Printer, uploader restic.BlobSaverWithAsync) error {
|
||||
err := repo.LoadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
printer.E("failed to load blob %v: %v", blob.ID, err)
|
||||
return nil
|
||||
}
|
||||
id, _, _, err := uploader.SaveBlob(ctx, blob.Type, buf, restic.ID{}, true)
|
||||
if err == nil && !id.Equal(blob.ID) {
|
||||
panic("pack id mismatch during upload")
|
||||
}
|
||||
return err
|
||||
})
|
||||
// ignore truncated file parts
|
||||
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -96,6 +96,22 @@ func testRepairBrokenPack(t *testing.T, version uint) {
|
||||
}
|
||||
return restic.NewIDSet(damagedID), damagedBlobs
|
||||
},
|
||||
}, {
|
||||
"unindexed pack",
|
||||
func(t *testing.T, random *rand.Rand, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
|
||||
// remove one pack file from the index
|
||||
unindexID := packsBefore.List()[0]
|
||||
h := backend.Handle{Type: backend.PackFile, Name: unindexID.String()}
|
||||
|
||||
buf, err := backendtest.LoadAll(context.TODO(), be, h)
|
||||
rtest.OK(t, err)
|
||||
rtest.OK(t, be.Remove(context.TODO(), h))
|
||||
rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{}, &progress.NoopPrinter{}))
|
||||
|
||||
rtest.OK(t, be.Save(context.TODO(), h, backend.NewByteReader(buf, be.Hasher())))
|
||||
|
||||
return restic.NewIDSet(unindexID), restic.NewBlobSet()
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"io"
|
||||
"math"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
@@ -289,7 +288,7 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu
|
||||
continue
|
||||
}
|
||||
|
||||
it := newPackBlobIterator(blob.PackID, newByteReader(buf), blob.Offset, []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder())
|
||||
it := newPackBlobIterator(blob.PackID, newByteReader(buf), blob.Offset, restic.Blobs{blob.Blob}, r.key, r.getZstdDecoder())
|
||||
pbv, err := it.Next()
|
||||
|
||||
if err == nil {
|
||||
@@ -960,7 +959,7 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic
|
||||
|
||||
// ListPack returns the list of blobs saved in the pack id and the length of
|
||||
// the pack header.
|
||||
func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, uint32, error) {
|
||||
func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) (restic.Blobs, uint32, error) {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: id.String()}
|
||||
|
||||
entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size)
|
||||
@@ -1049,19 +1048,16 @@ const maxUnusedRange = 1 * 1024 * 1024
|
||||
// handleBlobFn is called at most once for each blob. If the callback returns an error,
|
||||
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
|
||||
// this specific call. The callback must not keep a reference to buf.
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return streamPack(ctx, r.be.Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
if len(blobs) == 0 {
|
||||
// nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Slice(blobs, func(i, j int) bool {
|
||||
return blobs[i].Offset < blobs[j].Offset
|
||||
})
|
||||
blobs.Sort()
|
||||
|
||||
lowerIdx := 0
|
||||
lastPos := blobs[0].Offset
|
||||
@@ -1099,7 +1095,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
|
||||
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: blobs[0].Type.IsMetadata()}
|
||||
|
||||
dataStart := blobs[0].Offset
|
||||
@@ -1209,7 +1205,7 @@ type packBlobIterator struct {
|
||||
rd discardReader
|
||||
currentOffset uint
|
||||
|
||||
blobs []restic.Blob
|
||||
blobs restic.Blobs
|
||||
key *crypto.Key
|
||||
dec *zstd.Decoder
|
||||
|
||||
@@ -1225,7 +1221,7 @@ type packBlobValue struct {
|
||||
var errPackEOF = errors.New("reached EOF of pack file")
|
||||
|
||||
func newPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
|
||||
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *packBlobIterator {
|
||||
blobs restic.Blobs, key *crypto.Key, dec *zstd.Decoder) *packBlobIterator {
|
||||
return &packBlobIterator{
|
||||
packID: packID,
|
||||
rd: rd,
|
||||
|
||||
@@ -96,7 +96,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
|
||||
idx := index.NewIndex()
|
||||
|
||||
for i := 0; i < 5000; i++ {
|
||||
idx.StorePack(restic.NewRandomID(), []restic.Blob{
|
||||
idx.StorePack(restic.NewRandomID(), restic.Blobs{
|
||||
{
|
||||
BlobHandle: restic.NewRandomBlobHandle(),
|
||||
Length: 1234,
|
||||
@@ -133,7 +133,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*
|
||||
}
|
||||
|
||||
// buildPackfileWithoutHeader returns a manually built pack file without a header.
|
||||
func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs []restic.Blob, packfile []byte) {
|
||||
func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs restic.Blobs, packfile []byte) {
|
||||
opts := []zstd.EOption{
|
||||
// Set the compression level configured.
|
||||
zstd.WithEncoderLevel(zstd.SpeedDefault),
|
||||
@@ -280,19 +280,19 @@ func testStreamPack(t *testing.T, version uint) {
|
||||
// first, test regular usage
|
||||
t.Run("regular", func(t *testing.T) {
|
||||
tests := []struct {
|
||||
blobs []restic.Blob
|
||||
blobs restic.Blobs
|
||||
calls int
|
||||
shortFirstLoad bool
|
||||
}{
|
||||
{packfileBlobs[1:2], 1, false},
|
||||
{packfileBlobs[2:5], 1, false},
|
||||
{packfileBlobs[2:8], 1, false},
|
||||
{[]restic.Blob{
|
||||
{restic.Blobs{
|
||||
packfileBlobs[0],
|
||||
packfileBlobs[4],
|
||||
packfileBlobs[2],
|
||||
}, 1, false},
|
||||
{[]restic.Blob{
|
||||
{restic.Blobs{
|
||||
packfileBlobs[0],
|
||||
packfileBlobs[len(packfileBlobs)-1],
|
||||
}, 2, false},
|
||||
@@ -341,12 +341,12 @@ func testStreamPack(t *testing.T, version uint) {
|
||||
// next, test invalid uses, which should return an error
|
||||
t.Run("invalid", func(t *testing.T) {
|
||||
tests := []struct {
|
||||
blobs []restic.Blob
|
||||
blobs restic.Blobs
|
||||
err string
|
||||
}{
|
||||
{
|
||||
// pass one blob several times
|
||||
blobs: []restic.Blob{
|
||||
blobs: restic.Blobs{
|
||||
packfileBlobs[3],
|
||||
packfileBlobs[8],
|
||||
packfileBlobs[3],
|
||||
@@ -357,7 +357,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||
|
||||
{
|
||||
// pass something that's not a valid blob in the current pack file
|
||||
blobs: []restic.Blob{
|
||||
blobs: restic.Blobs{
|
||||
{
|
||||
Offset: 123,
|
||||
Length: 20000,
|
||||
@@ -368,7 +368,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||
|
||||
{
|
||||
// pass a blob that's too small
|
||||
blobs: []restic.Blob{
|
||||
blobs: restic.Blobs{
|
||||
{
|
||||
Offset: 123,
|
||||
Length: 10,
|
||||
@@ -523,7 +523,7 @@ func TestStreamPackFallback(t *testing.T) {
|
||||
|
||||
plaintext := rtest.Random(800, 42)
|
||||
blobID := restic.Hash(plaintext)
|
||||
blobs := []restic.Blob{
|
||||
blobs := restic.Blobs{
|
||||
{
|
||||
Length: uint(crypto.CiphertextLength(len(plaintext))),
|
||||
Offset: 0,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package restic
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@@ -31,6 +33,14 @@ func (b Blob) IsCompressed() bool {
|
||||
return b.UncompressedLength != 0
|
||||
}
|
||||
|
||||
type Blobs []Blob
|
||||
|
||||
func (b Blobs) Sort() {
|
||||
slices.SortFunc(b, func(a, b Blob) int {
|
||||
return cmp.Compare(a.Offset, b.Offset)
|
||||
})
|
||||
}
|
||||
|
||||
// PackedBlob is a blob stored within a file.
|
||||
type PackedBlob struct {
|
||||
Blob
|
||||
|
||||
@@ -3,6 +3,8 @@ package restic
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
var blobTypeJSON = []struct {
|
||||
@@ -39,3 +41,15 @@ func TestBlobTypeJSON(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlobsSort(t *testing.T) {
|
||||
blobs := Blobs{
|
||||
{Offset: 100},
|
||||
{Offset: 0},
|
||||
{Offset: 50},
|
||||
}
|
||||
blobs.Sort()
|
||||
rtest.Equals(t, uint(0), blobs[0].Offset)
|
||||
rtest.Equals(t, uint(50), blobs[1].Offset)
|
||||
rtest.Equals(t, uint(100), blobs[2].Offset)
|
||||
}
|
||||
|
||||
@@ -34,10 +34,10 @@ type Repository interface {
|
||||
ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs
|
||||
// ListPack returns the list of blobs saved in the pack id and the length of
|
||||
// the pack header.
|
||||
ListPack(ctx context.Context, id ID, packSize int64) (entries []Blob, hdrSize uint32, err error)
|
||||
ListPack(ctx context.Context, id ID, packSize int64) (entries Blobs, hdrSize uint32, err error)
|
||||
|
||||
LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error)
|
||||
LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
|
||||
LoadBlobsFromPack(ctx context.Context, packID ID, blobs Blobs, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
|
||||
|
||||
// WithUploader starts the necessary workers to upload new blobs. Once the callback returns,
|
||||
// the workers are stopped and the index is written to the repository. The callback must use
|
||||
@@ -128,7 +128,7 @@ type SaverRemoverUnpacked[FT FileTypes] interface {
|
||||
|
||||
type PackBlobs struct {
|
||||
PackID ID
|
||||
Blobs []Blob
|
||||
Blobs Blobs
|
||||
}
|
||||
|
||||
type TerminalCounterFactory interface {
|
||||
|
||||
@@ -42,7 +42,7 @@ type packInfo struct {
|
||||
files map[*fileInfo]struct{} // set of files that use blobs from this pack
|
||||
}
|
||||
|
||||
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
|
||||
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
|
||||
type startWarmupFn func(context.Context, restic.IDSet) (restic.WarmupJob, error)
|
||||
|
||||
// fileRestorer restores set of files
|
||||
@@ -343,7 +343,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob
|
||||
func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID,
|
||||
blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error {
|
||||
|
||||
blobList := make([]restic.Blob, 0, len(blobs))
|
||||
blobList := make(restic.Blobs, 0, len(blobs))
|
||||
for _, entry := range blobs {
|
||||
blobList = append(blobList, entry.blob)
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@@ -136,11 +135,9 @@ func newTestRepo(content []TestFile) *TestRepo {
|
||||
filesPathToContent: filesPathToContent,
|
||||
warmupJobs: []*TestWarmupJob{},
|
||||
}
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
blobs = append([]restic.Blob{}, blobs...)
|
||||
sort.Slice(blobs, func(i, j int) bool {
|
||||
return blobs[i].Offset < blobs[j].Offset
|
||||
})
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
blobs = append(restic.Blobs{}, blobs...)
|
||||
blobs.Sort()
|
||||
|
||||
for _, blob := range blobs {
|
||||
found := false
|
||||
@@ -316,7 +313,7 @@ func TestErrorRestoreFiles(t *testing.T) {
|
||||
|
||||
loadError := errors.New("load error")
|
||||
// loader always returns an error
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return loadError
|
||||
}
|
||||
|
||||
@@ -349,7 +346,7 @@ func TestFatalDownloadError(t *testing.T) {
|
||||
repo := newTestRepo(content)
|
||||
|
||||
loader := repo.loader
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
ctr := 0
|
||||
return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if ctr < 2 {
|
||||
|
||||
Reference in New Issue
Block a user