diff --git a/changelog/unreleased/pull-21827 b/changelog/unreleased/pull-21827 new file mode 100644 index 000000000..92dae1e2f --- /dev/null +++ b/changelog/unreleased/pull-21827 @@ -0,0 +1,9 @@ +Bugfix: `repair packs` correctly handles pack files missing from the index + +The `repair packs` command was unable to salvage blobs from a pack file if +the pack file was not contained in the index or the index entry was incomplete. +The command now uses both the information contained in the index and the +pack file header. + +https://github.com/restic/restic/issues/21820 +https://github.com/restic/restic/pull/21827 diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index d6e51107e..2d3a8d8d3 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -11,7 +11,6 @@ import ( "io" "os" "runtime" - "sort" "sync" "time" @@ -340,7 +339,7 @@ func decryptUnsigned(k *crypto.Key, buf []byte) []byte { return out } -func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Repository, packID restic.ID, list []restic.Blob, printer progress.Printer) error { +func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Repository, packID restic.ID, list restic.Blobs, printer progress.Printer) error { dec, err := zstd.NewReader(nil) if err != nil { panic(err) @@ -543,13 +542,11 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo return nil } -func checkPackSize(blobs []restic.Blob, fileSize int, printer progress.Printer) { +func checkPackSize(blobs restic.Blobs, fileSize int, printer progress.Printer) { // track current size and offset var size, offset uint64 - sort.Slice(blobs, func(i, j int) bool { - return blobs[i].Offset < blobs[j].Offset - }) + blobs.Sort() for _, pb := range blobs { printer.S(" %v blob %v, offset %-6d, raw length %-6d", pb.Type, pb.ID, pb.Offset, pb.Length) diff --git a/internal/repository/check.go b/internal/repository/check.go index 477c72a97..0f7d322e7 100644 --- 
a/internal/repository/check.go +++ b/internal/repository/check.go @@ -7,7 +7,6 @@ import ( "crypto/sha256" "fmt" "io" - "sort" "github.com/klauspost/compress/zstd" "github.com/restic/restic/internal/backend" @@ -37,7 +36,7 @@ func (e *partialReadError) Error() string { } // CheckPack reads a pack and checks the integrity of all blobs. -func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { +func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs restic.Blobs, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) if err != nil { if r.cache != nil { @@ -56,7 +55,7 @@ func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic. return err } -func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { +func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs restic.Blobs, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { debug.Log("checking pack %v", id.String()) @@ -65,9 +64,7 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re } // sanity check blobs in index - sort.Slice(blobs, func(i, j int) bool { - return blobs[i].Offset < blobs[j].Offset - }) + blobs.Sort() idxHdrSize := pack.CalculateHeaderSize(blobs) lastBlobEnd := 0 nonContinuousPack := false diff --git a/internal/repository/checker.go b/internal/repository/checker.go index d11a95b22..0b61a5876 100644 --- a/internal/repository/checker.go +++ b/internal/repository/checker.go @@ -215,7 +215,7 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID type checkTask struct { id restic.ID size int64 - blobs []restic.Blob + blobs restic.Blobs } ch := make(chan checkTask) diff --git a/internal/repository/index/associated_data_test.go 
b/internal/repository/index/associated_data_test.go index 2d4611f5c..07e0a3d58 100644 --- a/internal/repository/index/associated_data_test.go +++ b/internal/repository/index/associated_data_test.go @@ -40,7 +40,7 @@ func TestAssociatedSet(t *testing.T) { bh, blob := makeFakePackedBlob() mi := NewMasterIndex() - test.OK(t, mi.StorePack(context.TODO(), blob.PackID, []restic.Blob{blob.Blob}, &noopSaver{})) + test.OK(t, mi.StorePack(context.TODO(), blob.PackID, restic.Blobs{blob.Blob}, &noopSaver{})) test.OK(t, mi.Flush(context.TODO(), &noopSaver{})) bs := NewAssociatedSet[uint8](mi) @@ -123,14 +123,14 @@ func TestAssociatedSetWithExtendedIndex(t *testing.T) { _, blob := makeFakePackedBlob() mi := NewMasterIndex() - test.OK(t, mi.StorePack(context.TODO(), blob.PackID, []restic.Blob{blob.Blob}, &noopSaver{})) + test.OK(t, mi.StorePack(context.TODO(), blob.PackID, restic.Blobs{blob.Blob}, &noopSaver{})) test.OK(t, mi.Flush(context.TODO(), &noopSaver{})) bs := NewAssociatedSet[uint8](mi) // add new blobs to index after building the set of, blob2 := makeFakePackedBlob() - test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, []restic.Blob{blob2.Blob}, &noopSaver{})) + test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, restic.Blobs{blob2.Blob}, &noopSaver{})) test.OK(t, mi.Flush(context.TODO(), &noopSaver{})) // non-existent @@ -167,10 +167,10 @@ func TestAssociatedSetIntersectAndSub(t *testing.T) { bh3, blob3 := makeFakePackedBlob() bh4, blob4 := makeFakePackedBlob() - test.OK(t, mi.StorePack(context.TODO(), blob1.PackID, []restic.Blob{blob1.Blob}, saver)) - test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, []restic.Blob{blob2.Blob}, saver)) - test.OK(t, mi.StorePack(context.TODO(), blob3.PackID, []restic.Blob{blob3.Blob}, saver)) - test.OK(t, mi.StorePack(context.TODO(), blob4.PackID, []restic.Blob{blob4.Blob}, saver)) + test.OK(t, mi.StorePack(context.TODO(), blob1.PackID, restic.Blobs{blob1.Blob}, saver)) + test.OK(t, mi.StorePack(context.TODO(), blob2.PackID, 
restic.Blobs{blob2.Blob}, saver)) + test.OK(t, mi.StorePack(context.TODO(), blob3.PackID, restic.Blobs{blob3.Blob}, saver)) + test.OK(t, mi.StorePack(context.TODO(), blob4.PackID, restic.Blobs{blob4.Blob}, saver)) test.OK(t, mi.Flush(context.TODO(), saver)) t.Run("Intersect", func(t *testing.T) { diff --git a/internal/repository/index/index.go b/internal/repository/index/index.go index 10a4275fe..e96530ec1 100644 --- a/internal/repository/index/index.go +++ b/internal/repository/index/index.go @@ -143,7 +143,7 @@ func (idx *Index) Preallocate(t restic.BlobType, numEntries int) { // StorePack remembers the ids of all blobs of a given pack // in the index -func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) { +func (idx *Index) StorePack(id restic.ID, blobs restic.Blobs) { idx.m.Lock() defer idx.m.Unlock() @@ -228,11 +228,6 @@ func (idx *Index) Values() iter.Seq[restic.PackedBlob] { } } -type EachByPackResult struct { - PackID restic.ID - Blobs []restic.Blob -} - // EachByPack returns a channel that yields all blobs known to the index // grouped by packID but ignoring blobs with a packID in packPlacklist for // finalized indexes. @@ -240,10 +235,10 @@ type EachByPackResult struct { // from the finalized index which have been re-read into a non-finalized index. // When the context is cancelled, the background goroutine // terminates. This blocks any modification of the index. 
-func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult { +func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan restic.PackBlobs { idx.m.RLock() - ch := make(chan EachByPackResult) + ch := make(chan restic.PackBlobs) go func() { defer idx.m.RUnlock() @@ -264,7 +259,7 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <- } for packID, packByType := range byPack { - var result EachByPackResult + var result restic.PackBlobs result.PackID = packID for typ, p := range packByType { for _, e := range p { diff --git a/internal/repository/index/index_test.go b/internal/repository/index/index_test.go index f47b27496..2a53072df 100644 --- a/internal/repository/index/index_test.go +++ b/internal/repository/index/index_test.go @@ -21,7 +21,7 @@ func TestIndexSerialize(t *testing.T) { // create 50 packs with 20 blobs each for i := 0; i < 50; i++ { packID := restic.NewRandomID() - var blobs []restic.Blob + var blobs restic.Blobs pos := uint(0) for j := 0; j < 20; j++ { @@ -85,7 +85,7 @@ func TestIndexSerialize(t *testing.T) { newtests := []restic.PackedBlob{} for i := 0; i < 10; i++ { packID := restic.NewRandomID() - var blobs []restic.Blob + var blobs restic.Blobs pos := uint(0) for j := 0; j < 10; j++ { @@ -145,7 +145,7 @@ func TestIndexSize(t *testing.T) { blobCount := 100 for i := 0; i < packs; i++ { packID := restic.NewRandomID() - var blobs []restic.Blob + var blobs restic.Blobs pos := uint(0) for j := 0; j < blobCount; j++ { @@ -401,7 +401,7 @@ func TestIndexPacks(t *testing.T) { for i := 0; i < 20; i++ { packID := restic.NewRandomID() - idx.StorePack(packID, []restic.Blob{ + idx.StorePack(packID, restic.Blobs{ { BlobHandle: restic.NewRandomBlobHandle(), Offset: 0, @@ -433,7 +433,7 @@ func createRandomIndex(rng *rand.Rand, packfiles int) (idx *index.Index, lookupB // create index with given number of pack files for i := 0; i < packfiles; i++ { packID := 
NewRandomTestID(rng) - var blobs []restic.Blob + var blobs restic.Blobs offset := 0 for offset < maxPackSize { size := 2000 + rng.Intn(4*1024*1024) @@ -524,7 +524,7 @@ func TestIndexHas(t *testing.T) { // create 50 packs with 20 blobs each for i := 0; i < 50; i++ { packID := restic.NewRandomID() - var blobs []restic.Blob + var blobs restic.Blobs pos := uint(0) for j := 0; j < 20; j++ { @@ -566,7 +566,7 @@ func TestMixedEachByPack(t *testing.T) { for i := 0; i < 50; i++ { packID := restic.NewRandomID() expected[packID] = 1 - blobs := []restic.Blob{ + blobs := restic.Blobs{ { BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()}, Offset: 0, @@ -586,9 +586,7 @@ func TestMixedEachByPack(t *testing.T) { reported[bp.PackID]++ rtest.Equals(t, 2, len(bp.Blobs)) // correct blob count - if bp.Blobs[0].Offset > bp.Blobs[1].Offset { - bp.Blobs[1], bp.Blobs[0] = bp.Blobs[0], bp.Blobs[1] - } + bp.Blobs.Sort() b0 := bp.Blobs[0] rtest.Assert(t, b0.Type == restic.DataBlob && b0.Offset == 0 && b0.Length == 42, "wrong blob", b0) b1 := bp.Blobs[1] @@ -610,7 +608,7 @@ func TestEachByPackIgnoes(t *testing.T) { } else { expected[packID] = 1 } - blobs := []restic.Blob{ + blobs := restic.Blobs{ { BlobHandle: restic.BlobHandle{Type: restic.DataBlob, ID: restic.NewRandomID()}, Offset: 0, diff --git a/internal/repository/index/master_index.go b/internal/repository/index/master_index.go index e37614fc5..70afff0fb 100644 --- a/internal/repository/index/master_index.go +++ b/internal/repository/index/master_index.go @@ -145,12 +145,12 @@ func (mi *MasterIndex) Insert(idx *Index) { } // StorePack remembers the id and pack in the index. 
-func (mi *MasterIndex) StorePack(ctx context.Context, id restic.ID, blobs []restic.Blob, r restic.SaverUnpacked[restic.FileType]) error { +func (mi *MasterIndex) StorePack(ctx context.Context, id restic.ID, blobs restic.Blobs, r restic.SaverUnpacked[restic.FileType]) error { mi.storePack(id, blobs) return mi.saveFullIndex(ctx, r) } -func (mi *MasterIndex) storePack(id restic.ID, blobs []restic.Blob) { +func (mi *MasterIndex) storePack(id restic.ID, blobs restic.Blobs) { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() @@ -652,7 +652,7 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan defer close(out) // only resort a part of the index to keep the memory overhead bounded for i := byte(0); i < 16; i++ { - packBlob := make(map[restic.ID][]restic.Blob) + packBlob := make(map[restic.ID]restic.Blobs) for pack := range packs { if pack[0]&0xf == i { packBlob[pack] = nil diff --git a/internal/repository/index/master_index_test.go b/internal/repository/index/master_index_test.go index 964ee8814..8348611ad 100644 --- a/internal/repository/index/master_index_test.go +++ b/internal/repository/index/master_index_test.go @@ -64,12 +64,12 @@ func TestMasterIndex(t *testing.T) { } idx1 := index.NewIndex() - idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob}) - idx1.StorePack(blob12a.PackID, []restic.Blob{blob12a.Blob}) + idx1.StorePack(blob1.PackID, restic.Blobs{blob1.Blob}) + idx1.StorePack(blob12a.PackID, restic.Blobs{blob12a.Blob}) idx2 := index.NewIndex() - idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob}) - idx2.StorePack(blob12b.PackID, []restic.Blob{blob12b.Blob}) + idx2.StorePack(blob2.PackID, restic.Blobs{blob2.Blob}) + idx2.StorePack(blob12b.PackID, restic.Blobs{blob12b.Blob}) mIdx := index.NewMasterIndex() mIdx.Insert(idx1) @@ -135,7 +135,7 @@ func TestMasterIndexAddPending(t *testing.T) { // Test AddPending: try to add a blob that's already in an index (should return false) bhInIndex := restic.NewRandomBlobHandle() idx := 
index.NewIndex() - idx.StorePack(restic.NewRandomID(), []restic.Blob{{ + idx.StorePack(restic.NewRandomID(), restic.Blobs{{ BlobHandle: bhInIndex, Length: uint(crypto.CiphertextLength(50)), Offset: 0, @@ -180,7 +180,7 @@ func TestMasterIndexStorePackRemovesPending(t *testing.T) { UncompressedLength: 75, } saver := &noopSaver{} - err := mIdx.StorePack(context.Background(), packID, []restic.Blob{blob}, saver) + err := mIdx.StorePack(context.Background(), packID, restic.Blobs{blob}, saver) rtest.OK(t, err) // Verify it is still found @@ -223,10 +223,10 @@ func TestMasterMergeFinalIndexes(t *testing.T) { } idx1 := index.NewIndex() - idx1.StorePack(blob1.PackID, []restic.Blob{blob1.Blob}) + idx1.StorePack(blob1.PackID, restic.Blobs{blob1.Blob}) idx2 := index.NewIndex() - idx2.StorePack(blob2.PackID, []restic.Blob{blob2.Blob}) + idx2.StorePack(blob2.PackID, restic.Blobs{blob2.Blob}) mIdx := index.NewMasterIndex() mIdx.Insert(idx1) @@ -256,8 +256,8 @@ func TestMasterMergeFinalIndexes(t *testing.T) { // merge another index containing identical blobs idx3 := index.NewIndex() - idx3.StorePack(blob1.PackID, []restic.Blob{blob1.Blob}) - idx3.StorePack(blob2.PackID, []restic.Blob{blob2.Blob}) + idx3.StorePack(blob1.PackID, restic.Blobs{blob1.Blob}) + idx3.StorePack(blob2.PackID, restic.Blobs{blob2.Blob}) mIdx.Insert(idx3) finalIndexes, idxCount, newIDs := index.TestMergeIndex(t, mIdx) @@ -588,14 +588,14 @@ func TestRewriteOversizedIndex(t *testing.T) { return idx.Len(restic.DataBlob) > 2*fullIndexCount } - var blobs []restic.Blob + var blobs restic.Blobs // build oversized index idx := index.NewIndex() numPacks := 5 for p := 0; p < numPacks; p++ { packID := restic.NewRandomID() - packBlobs := make([]restic.Blob, 0, fullIndexCount) + packBlobs := make(restic.Blobs, 0, fullIndexCount) for i := 0; i < fullIndexCount; i++ { blob := restic.Blob{ diff --git a/internal/repository/pack/pack.go b/internal/repository/pack/pack.go index 9f5a576d8..c065f4f65 100644 --- 
a/internal/repository/pack/pack.go +++ b/internal/repository/pack/pack.go @@ -21,7 +21,7 @@ var ErrBroken = errors.New("packer cannot be used after a write error") // Packer is used to create a new Pack. type Packer struct { - blobs []restic.Blob + blobs restic.Blobs bytes uint k *crypto.Key @@ -129,7 +129,7 @@ func (p *Packer) Finalize() error { return nil } -func verifyHeader(k *crypto.Key, header []byte, expected []restic.Blob) error { +func verifyHeader(k *crypto.Key, header []byte, expected restic.Blobs) error { // do not offer a way to skip the pack header verification, as pack headers are usually small enough // to not result in a significant performance impact @@ -157,7 +157,7 @@ func (p *Packer) HeaderOverhead() int { } // makeHeader constructs the header for p. -func makeHeader(blobs []restic.Blob) ([]byte, error) { +func makeHeader(blobs restic.Blobs) ([]byte, error) { buf := make([]byte, 0, len(blobs)*int(entrySize)) for _, b := range blobs { @@ -232,7 +232,7 @@ func (p *Packer) HeaderFull() bool { } // Blobs returns the slice of blobs that have been written. 
-func (p *Packer) Blobs() []restic.Blob { +func (p *Packer) Blobs() restic.Blobs { p.m.Lock() defer p.m.Unlock() @@ -348,7 +348,7 @@ func (e InvalidFileError) Error() string { // List returns the list of entries found in a pack file and the length of the // header (including header size and crypto overhead) -func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdrSize uint32, err error) { +func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries restic.Blobs, hdrSize uint32, err error) { buf, err := readHeader(rd, size) if err != nil { return nil, 0, err @@ -367,7 +367,7 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr } // might over allocate a bit if all blobs have EntrySize but only by a few percent - entries = make([]restic.Blob, 0, uint(len(buf))/plainEntrySize) + entries = make(restic.Blobs, 0, uint(len(buf))/plainEntrySize) pos := uint(0) for len(buf) > 0 { @@ -427,7 +427,7 @@ func CalculateEntrySize(blob restic.Blob) int { return int(plainEntrySize) } -func CalculateHeaderSize(blobs []restic.Blob) int { +func CalculateHeaderSize(blobs restic.Blobs) int { size := headerSize for _, blob := range blobs { size += CalculateEntrySize(blob) diff --git a/internal/repository/pack/pack_internal_test.go b/internal/repository/pack/pack_internal_test.go index 2e7400ad0..186450e1c 100644 --- a/internal/repository/pack/pack_internal_test.go +++ b/internal/repository/pack/pack_internal_test.go @@ -182,7 +182,7 @@ func TestReadRecords(t *testing.T) { func TestUnpackedVerification(t *testing.T) { // create random keys k := crypto.NewRandomKey() - blobs := []restic.Blob{ + blobs := restic.Blobs{ { BlobHandle: restic.NewRandomBlobHandle(), Length: 42, diff --git a/internal/repository/repack.go b/internal/repository/repack.go index c2eaa8f41..2793348b8 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -84,7 +84,7 @@ func repack( wg.Go(func() error { defer close(downloadQueue) for pbs 
:= range repo.ListPacksFromIndex(wgCtx, packs) { - var packBlobs []restic.Blob + var packBlobs restic.Blobs keepMutex.Lock() // filter out unnecessary blobs for _, entry := range pbs.Blobs { diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 0c9d3a43f..39d057976 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "slices" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" @@ -15,31 +16,36 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe bar.SetMax(uint64(len(ids))) defer bar.Done() - err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + packToBlobs, err := resolveBlobsForPacks(ctx, repo, ids) + if err != nil { + return err + } + + err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // examine all data the indexes have for the pack file for b := range repo.ListPacksFromIndex(ctx, ids) { - blobs := b.Blobs - if len(blobs) == 0 { - printer.E("no blobs found for pack %v", b.PackID) - bar.Add(1) - continue + indexBlobs := b.Blobs + err := reuploadBlobsFromPack(ctx, repo, b.PackID, indexBlobs, printer, uploader) + if err != nil { + return err } - err := repo.LoadBlobsFromPack(ctx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + indexBlobs.Sort() + packBlobs := packToBlobs[b.PackID] + packBlobs.Sort() + if packBlobs != nil && !slices.Equal(indexBlobs, packBlobs) { + // handle case where the index entry is broken or incomplete. + // this can result in duplicate blobs, which can be cleaned up by running prune. 
+				printer.E("repairing incomplete index entry for pack %v", b.PackID)
+				err := reuploadBlobsFromPack(ctx, repo, b.PackID, packBlobs, printer, uploader)
 				if err != nil {
-					printer.E("failed to load blob %v: %v", blob.ID, err)
-					return nil
+					return err
 				}
-				id, _, _, err := uploader.SaveBlob(ctx, blob.Type, buf, restic.ID{}, true)
-				if !id.Equal(blob.ID) {
-					panic("pack id mismatch during upload")
-				}
-				return err
-			})
-			// ignore truncated file parts
-			if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
-				return err
 			}
+			if len(indexBlobs) == 0 && len(packBlobs) == 0 {
+				printer.E("no blobs found for pack %v", b.PackID)
+			}
+
 			bar.Add(1)
 		}
 		return nil
@@ -64,3 +70,52 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe
 
 	return nil
 }
+
+// resolveBlobsForPacks reads the header of each pack file in ids that exists
+// in the repository and returns the blobs listed there, keyed by pack ID.
+// Packs whose header cannot be read are left out of the result so that the
+// caller can fall back to the index.
+func resolveBlobsForPacks(ctx context.Context, repo *Repository, ids restic.IDSet) (map[restic.ID]restic.Blobs, error) {
+	packToBlobs := make(map[restic.ID]restic.Blobs)
+
+	err := repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
+		if !ids.Has(id) {
+			return nil
+		}
+		blobs, _, err := repo.ListPack(ctx, id, size)
+		if err != nil {
+			// a damaged or unreadable header is expected here and must not
+			// abort the repair, but a cancelled context must not be hidden
+			return ctx.Err()
+		}
+		packToBlobs[id] = blobs
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return packToBlobs, nil
+}
+
+// reuploadBlobsFromPack loads the given blobs from pack packID and saves each
+// successfully loaded blob again via uploader. Blobs that fail to load are
+// reported through printer and skipped; a truncated pack file
+// (io.ErrUnexpectedEOF) is not treated as an error.
+func reuploadBlobsFromPack(ctx context.Context, repo *Repository, packID restic.ID, blobs restic.Blobs, printer progress.Printer, uploader restic.BlobSaverWithAsync) error {
+	err := repo.LoadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
+		if err != nil {
+			printer.E("failed to load blob %v: %v", blob.ID, err)
+			return nil
+		}
+		id, _, _, err := uploader.SaveBlob(ctx, blob.Type, buf, restic.ID{}, true)
+		if err == nil && !id.Equal(blob.ID) {
+			panic("pack id mismatch during upload")
+		}
+		return err
+	})
+	// ignore truncated file parts
+	if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
+		return err
+	}
+	return nil
+}
diff --git a/internal/repository/repair_pack_test.go
b/internal/repository/repair_pack_test.go index 5f02e7d61..54f0ca02a 100644 --- a/internal/repository/repair_pack_test.go +++ b/internal/repository/repair_pack_test.go @@ -96,6 +96,22 @@ func testRepairBrokenPack(t *testing.T, version uint) { } return restic.NewIDSet(damagedID), damagedBlobs }, + }, { + "unindexed pack", + func(t *testing.T, random *rand.Rand, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) { + // remove one pack file from the index + unindexID := packsBefore.List()[0] + h := backend.Handle{Type: backend.PackFile, Name: unindexID.String()} + + buf, err := backendtest.LoadAll(context.TODO(), be, h) + rtest.OK(t, err) + rtest.OK(t, be.Remove(context.TODO(), h)) + rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{}, &progress.NoopPrinter{})) + + rtest.OK(t, be.Save(context.TODO(), h, backend.NewByteReader(buf, be.Hasher()))) + + return restic.NewIDSet(unindexID), restic.NewBlobSet() + }, }, } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 5bf082a73..86eab8183 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -7,7 +7,6 @@ import ( "io" "math" "runtime" - "sort" "sync" "github.com/klauspost/compress/zstd" @@ -289,7 +288,7 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu continue } - it := newPackBlobIterator(blob.PackID, newByteReader(buf), blob.Offset, []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder()) + it := newPackBlobIterator(blob.PackID, newByteReader(buf), blob.Offset, restic.Blobs{blob.Blob}, r.key, r.getZstdDecoder()) pbv, err := it.Next() if err == nil { @@ -960,7 +959,7 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic // ListPack returns the list of blobs saved in the pack id and the length of // the pack header. 
-func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, uint32, error) { +func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) (restic.Blobs, uint32, error) { h := backend.Handle{Type: restic.PackFile, Name: id.String()} entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size) @@ -1049,19 +1048,16 @@ const maxUnusedRange = 1 * 1024 * 1024 // handleBlobFn is called at most once for each blob. If the callback returns an error, // then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within // this specific call. The callback must not keep a reference to buf. -func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return streamPack(ctx, r.be.Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn) } -func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { if len(blobs) == 0 { // nothing to do return nil } - - sort.Slice(blobs, func(i, j int) bool { - return blobs[i].Offset < blobs[j].Offset - }) + blobs.Sort() lowerIdx := 0 lastPos := blobs[0].Offset @@ -1099,7 +1095,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, 
blobs[lowerIdx:], handleBlobFn) } -func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: blobs[0].Type.IsMetadata()} dataStart := blobs[0].Offset @@ -1209,7 +1205,7 @@ type packBlobIterator struct { rd discardReader currentOffset uint - blobs []restic.Blob + blobs restic.Blobs key *crypto.Key dec *zstd.Decoder @@ -1225,7 +1221,7 @@ type packBlobValue struct { var errPackEOF = errors.New("reached EOF of pack file") func newPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint, - blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *packBlobIterator { + blobs restic.Blobs, key *crypto.Key, dec *zstd.Decoder) *packBlobIterator { return &packBlobIterator{ packID: packID, rd: rd, diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go index edec4aa48..31aed6f64 100644 --- a/internal/repository/repository_internal_test.go +++ b/internal/repository/repository_internal_test.go @@ -96,7 +96,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) { idx := index.NewIndex() for i := 0; i < 5000; i++ { - idx.StorePack(restic.NewRandomID(), []restic.Blob{ + idx.StorePack(restic.NewRandomID(), restic.Blobs{ { BlobHandle: restic.NewRandomBlobHandle(), Length: 1234, @@ -133,7 +133,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (* } // buildPackfileWithoutHeader returns a manually built pack file without a header. 
-func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs []restic.Blob, packfile []byte) { +func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs restic.Blobs, packfile []byte) { opts := []zstd.EOption{ // Set the compression level configured. zstd.WithEncoderLevel(zstd.SpeedDefault), @@ -280,19 +280,19 @@ func testStreamPack(t *testing.T, version uint) { // first, test regular usage t.Run("regular", func(t *testing.T) { tests := []struct { - blobs []restic.Blob + blobs restic.Blobs calls int shortFirstLoad bool }{ {packfileBlobs[1:2], 1, false}, {packfileBlobs[2:5], 1, false}, {packfileBlobs[2:8], 1, false}, - {[]restic.Blob{ + {restic.Blobs{ packfileBlobs[0], packfileBlobs[4], packfileBlobs[2], }, 1, false}, - {[]restic.Blob{ + {restic.Blobs{ packfileBlobs[0], packfileBlobs[len(packfileBlobs)-1], }, 2, false}, @@ -341,12 +341,12 @@ func testStreamPack(t *testing.T, version uint) { // next, test invalid uses, which should return an error t.Run("invalid", func(t *testing.T) { tests := []struct { - blobs []restic.Blob + blobs restic.Blobs err string }{ { // pass one blob several times - blobs: []restic.Blob{ + blobs: restic.Blobs{ packfileBlobs[3], packfileBlobs[8], packfileBlobs[3], @@ -357,7 +357,7 @@ func testStreamPack(t *testing.T, version uint) { { // pass something that's not a valid blob in the current pack file - blobs: []restic.Blob{ + blobs: restic.Blobs{ { Offset: 123, Length: 20000, @@ -368,7 +368,7 @@ func testStreamPack(t *testing.T, version uint) { { // pass a blob that's too small - blobs: []restic.Blob{ + blobs: restic.Blobs{ { Offset: 123, Length: 10, @@ -523,7 +523,7 @@ func TestStreamPackFallback(t *testing.T) { plaintext := rtest.Random(800, 42) blobID := restic.Hash(plaintext) - blobs := []restic.Blob{ + blobs := restic.Blobs{ { Length: uint(crypto.CiphertextLength(len(plaintext))), Offset: 0, diff --git a/internal/restic/blob.go b/internal/restic/blob.go index 
3a6872af3..432c91db1 100644
--- a/internal/restic/blob.go
+++ b/internal/restic/blob.go
@@ -1,7 +1,9 @@
 package restic
 
 import (
+	"cmp"
 	"fmt"
+	"slices"
 
 	"github.com/restic/restic/internal/crypto"
 	"github.com/restic/restic/internal/errors"
@@ -31,6 +33,17 @@ func (b Blob) IsCompressed() bool {
 	return b.UncompressedLength != 0
 }
 
+// Blobs is a list of blobs.
+type Blobs []Blob
+
+// Sort sorts the blobs in place by ascending offset. The relative order of
+// blobs with equal offsets is unspecified.
+func (b Blobs) Sort() {
+	slices.SortFunc(b, func(x, y Blob) int {
+		return cmp.Compare(x.Offset, y.Offset)
+	})
+}
+
 // PackedBlob is a blob stored within a file.
 type PackedBlob struct {
 	Blob
diff --git a/internal/restic/blob_test.go b/internal/restic/blob_test.go
index 951872250..089e3004e 100644
--- a/internal/restic/blob_test.go
+++ b/internal/restic/blob_test.go
@@ -3,6 +3,8 @@ package restic
 import (
 	"encoding/json"
 	"testing"
+
+	rtest "github.com/restic/restic/internal/test"
 )
 
 var blobTypeJSON = []struct {
@@ -39,3 +41,15 @@ func TestBlobTypeJSON(t *testing.T) {
 		}
 	}
 }
+
+func TestBlobsSort(t *testing.T) {
+	blobs := Blobs{
+		{Offset: 100},
+		{Offset: 0},
+		{Offset: 50},
+	}
+	blobs.Sort()
+	rtest.Equals(t, uint(0), blobs[0].Offset)
+	rtest.Equals(t, uint(50), blobs[1].Offset)
+	rtest.Equals(t, uint(100), blobs[2].Offset)
+}
diff --git a/internal/restic/repository.go b/internal/restic/repository.go
index c7f326823..1476c2468 100644
--- a/internal/restic/repository.go
+++ b/internal/restic/repository.go
@@ -34,10 +34,10 @@ type Repository interface {
 	ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs
 	// ListPack returns the list of blobs saved in the pack id and the length of
 	// the pack header.
- ListPack(ctx context.Context, id ID, packSize int64) (entries []Blob, hdrSize uint32, err error) + ListPack(ctx context.Context, id ID, packSize int64) (entries Blobs, hdrSize uint32, err error) LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error) - LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error + LoadBlobsFromPack(ctx context.Context, packID ID, blobs Blobs, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error // WithUploader starts the necessary workers to upload new blobs. Once the callback returns, // the workers are stopped and the index is written to the repository. The callback must use @@ -128,7 +128,7 @@ type SaverRemoverUnpacked[FT FileTypes] interface { type PackBlobs struct { PackID ID - Blobs []Blob + Blobs Blobs } type TerminalCounterFactory interface { diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 01f4734b4..63dc3d6e9 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -42,7 +42,7 @@ type packInfo struct { files map[*fileInfo]struct{} // set of files that use blobs from this pack } -type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error +type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error type startWarmupFn func(context.Context, restic.IDSet) (restic.WarmupJob, error) // fileRestorer restores set of files @@ -343,7 +343,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { - blobList := make([]restic.Blob, 0, len(blobs)) + blobList := make(restic.Blobs, 
0, len(blobs)) for _, entry := range blobs { blobList = append(blobList, entry.blob) } diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 67819970a..120288378 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "os" - "sort" "testing" "github.com/restic/restic/internal/errors" @@ -136,11 +135,9 @@ func newTestRepo(content []TestFile) *TestRepo { filesPathToContent: filesPathToContent, warmupJobs: []*TestWarmupJob{}, } - repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { - blobs = append([]restic.Blob{}, blobs...) - sort.Slice(blobs, func(i, j int) bool { - return blobs[i].Offset < blobs[j].Offset - }) + repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + blobs = append(restic.Blobs{}, blobs...) 
+ blobs.Sort() for _, blob := range blobs { found := false @@ -316,7 +313,7 @@ func TestErrorRestoreFiles(t *testing.T) { loadError := errors.New("load error") // loader always returns an error - repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return loadError } @@ -349,7 +346,7 @@ func TestFatalDownloadError(t *testing.T) { repo := newTestRepo(content) loader := repo.loader - repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { ctr := 0 return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if ctr < 2 {