diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 431aefd69..6900eb96c 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -106,7 +106,7 @@ func repack( worker := func() error { for t := range downloadQueue { - err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + err := repo.loadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { // a required blob couldn't be retrieved return err diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 5059aad04..735653a4c 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -91,7 +91,7 @@ func resolveBlobsForPacks(ctx context.Context, repo *Repository, ids restic.IDSe } func reuploadBlobsFromPack(ctx context.Context, repo *Repository, packID restic.ID, blobs restic.Blobs, printer progress.Printer, uploader restic.BlobSaverWithAsync) error { - err := repo.LoadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + err := repo.loadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { printer.E("failed to load blob %v: %v", blob.ID, err) return nil diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 7bc61bcc0..7ae36885d 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -1047,7 +1047,33 @@ const maxUnusedRange = 1 * 1024 * 1024 // handleBlobFn is called at most once for each blob. If the callback returns an error, // then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within // this specific call. The callback must not keep a reference to buf. -func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + blobs, err := r.blobsInPack(packID, handles) + if err != nil { + return err + } + return r.loadBlobsFromPack(ctx, packID, blobs, handleBlobFn) +} + +func (r *Repository) blobsInPack(packID restic.ID, handles []restic.BlobHandle) (restic.Blobs, error) { + blobs := make(restic.Blobs, 0, len(handles)) + for _, h := range handles { + found := false + for _, pb := range r.idx.Lookup(h) { + if pb.PackID.Equal(packID) { + blobs = append(blobs, pb.Blob) + found = true + break + } + } + if !found { + return nil, errors.Errorf("blob %v not found in pack %v", h, packID) + } + } + return blobs, nil +} + +func (r *Repository) loadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return streamPack(ctx, r.be.Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index f8b5cef33..09693621d 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -36,7 +36,7 @@ type Repository interface { ListPack(ctx context.Context, id ID, packSize int64) (entries Blobs, err error) LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error) - LoadBlobsFromPack(ctx context.Context, packID ID, blobs Blobs, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error + LoadBlobsFromPack(ctx context.Context, packID ID, blobs []BlobHandle, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error // WithUploader starts the necessary workers to upload new blobs. Once the callback returns, // the workers are stopped and the index is written to the repository. The callback must use diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 63dc3d6e9..5207f4f8d 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -42,7 +42,7 @@ type packInfo struct { files map[*fileInfo]struct{} // set of files that use blobs from this pack } -type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error +type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error type startWarmupFn func(context.Context, restic.IDSet) (restic.WarmupJob, error) // fileRestorer restores set of files @@ -261,14 +261,14 @@ func (r *fileRestorer) truncateFileToSize(location string, size int64) error { type blobToFileOffsetsMapping map[restic.ID]struct { files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file - blob restic.Blob + blob restic.BlobHandle } func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // calculate blob->[]files->[]offsets mappings blobs := make(blobToFileOffsetsMapping) for file := range pack.files { - addBlob := func(blob restic.Blob, fileOffset int64) { + addBlob := func(blob restic.BlobHandle, fileOffset int64) { blobInfo, ok := blobs[blob.ID] if !ok { blobInfo.files = make(map[*fileInfo][]int64) @@ -280,7 +280,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { if fileBlobs, ok := file.blobs.(restic.IDs); ok { err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob, idx int, fileOffset int64) { if packID.Equal(pack.id) && !file.state.HasMatchingBlob(idx) { - addBlob(blob, fileOffset) + addBlob(blob.BlobHandle, fileOffset) } }) if err != nil { @@ -292,7 +292,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { idxPacks := r.idx(restic.DataBlob, blob.id) for _, idxPack := range idxPacks { if idxPack.PackID.Equal(pack.id) { - addBlob(idxPack.Blob, blob.offset) + addBlob(idxPack.BlobHandle, blob.offset) break } } @@ -324,7 +324,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob // only report error for not yet processed blobs affectedFiles := make(map[*fileInfo]struct{}) for _, entry := range blobs { - if processedBlobs.Has(entry.blob.BlobHandle) { + if processedBlobs.Has(entry.blob) { continue } for file := range entry.files { @@ -343,7 +343,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { - blobList := make(restic.Blobs, 0, len(blobs)) + blobList := make([]restic.BlobHandle, 0, len(blobs)) for _, entry := range blobs { blobList = append(blobList, entry.blob) } diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 120288378..f5220998d 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -2,9 +2,11 @@ package restorer import ( "bytes" + "cmp" "context" "fmt" "os" + "slices" "testing" "github.com/restic/restic/internal/errors" @@ -135,24 +137,30 @@ func newTestRepo(content []TestFile) *TestRepo { filesPathToContent: filesPathToContent, warmupJobs: []*TestWarmupJob{}, } - repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { - blobs = append(restic.Blobs{}, blobs...) - blobs.Sort() - - for _, blob := range blobs { + repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + entries := make([]restic.PackedBlob, 0, len(handles)) + for _, h := range handles { found := false - for _, e := range repo.blobs[blob.ID] { + for _, e := range repo.blobs[h.ID] { if packID == e.PackID { + entries = append(entries, e) found = true - buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length] - err := handleBlobFn(e.BlobHandle, buf, nil) - if err != nil { - return err - } + break } } if !found { - return fmt.Errorf("missing blob: %v", blob) + return fmt.Errorf("missing blob: %v", h) + } + } + slices.SortFunc(entries, func(a, b restic.PackedBlob) int { + return cmp.Compare(a.Offset, b.Offset) + }) + + for _, e := range entries { + buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length] + err := handleBlobFn(e.BlobHandle, buf, nil) + if err != nil { + return err } } return nil @@ -313,7 +321,7 @@ func TestErrorRestoreFiles(t *testing.T) { loadError := errors.New("load error") // loader always returns an error - repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return loadError } @@ -346,9 +354,9 @@ func TestFatalDownloadError(t *testing.T) { repo := newTestRepo(content) loader := repo.loader - repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { ctr := 0 - return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + return loader(ctx, packID, handles, func(blob restic.BlobHandle, buf []byte, err error) error { if ctr < 2 { ctr++ return handleBlobFn(blob, buf, err)