mirror of
https://github.com/restic/restic.git
synced 2026-06-06 08:59:44 +00:00
restic: switch LoadBlobsFromPack to BlobHandles
LoadBlobsFromPack now resolves the handles to Blobs. Repository internal code can still use the Blob-based method. The loader used in the filerestorer test now has to implement sorting the blobs by offset itself as it no longer has access to the repository-internal dataypes.
This commit is contained in:
@@ -106,7 +106,7 @@ func repack(
|
||||
|
||||
worker := func() error {
|
||||
for t := range downloadQueue {
|
||||
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
err := repo.loadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
// a required blob couldn't be retrieved
|
||||
return err
|
||||
|
||||
@@ -91,7 +91,7 @@ func resolveBlobsForPacks(ctx context.Context, repo *Repository, ids restic.IDSe
|
||||
}
|
||||
|
||||
func reuploadBlobsFromPack(ctx context.Context, repo *Repository, packID restic.ID, blobs restic.Blobs, printer progress.Printer, uploader restic.BlobSaverWithAsync) error {
|
||||
err := repo.LoadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
err := repo.loadBlobsFromPack(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
printer.E("failed to load blob %v: %v", blob.ID, err)
|
||||
return nil
|
||||
|
||||
@@ -1047,7 +1047,33 @@ const maxUnusedRange = 1 * 1024 * 1024
|
||||
// handleBlobFn is called at most once for each blob. If the callback returns an error,
|
||||
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
|
||||
// this specific call. The callback must not keep a reference to buf.
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
blobs, err := r.blobsInPack(packID, handles)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.loadBlobsFromPack(ctx, packID, blobs, handleBlobFn)
|
||||
}
|
||||
|
||||
func (r *Repository) blobsInPack(packID restic.ID, handles []restic.BlobHandle) (restic.Blobs, error) {
|
||||
blobs := make(restic.Blobs, 0, len(handles))
|
||||
for _, h := range handles {
|
||||
found := false
|
||||
for _, pb := range r.idx.Lookup(h) {
|
||||
if pb.PackID.Equal(packID) {
|
||||
blobs = append(blobs, pb.Blob)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, errors.Errorf("blob %v not found in pack %v", h, packID)
|
||||
}
|
||||
}
|
||||
return blobs, nil
|
||||
}
|
||||
|
||||
func (r *Repository) loadBlobsFromPack(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return streamPack(ctx, r.be.Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ type Repository interface {
|
||||
ListPack(ctx context.Context, id ID, packSize int64) (entries Blobs, err error)
|
||||
|
||||
LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error)
|
||||
LoadBlobsFromPack(ctx context.Context, packID ID, blobs Blobs, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
|
||||
LoadBlobsFromPack(ctx context.Context, packID ID, blobs []BlobHandle, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
|
||||
|
||||
// WithUploader starts the necessary workers to upload new blobs. Once the callback returns,
|
||||
// the workers are stopped and the index is written to the repository. The callback must use
|
||||
|
||||
@@ -42,7 +42,7 @@ type packInfo struct {
|
||||
files map[*fileInfo]struct{} // set of files that use blobs from this pack
|
||||
}
|
||||
|
||||
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
|
||||
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
|
||||
type startWarmupFn func(context.Context, restic.IDSet) (restic.WarmupJob, error)
|
||||
|
||||
// fileRestorer restores set of files
|
||||
@@ -261,14 +261,14 @@ func (r *fileRestorer) truncateFileToSize(location string, size int64) error {
|
||||
|
||||
type blobToFileOffsetsMapping map[restic.ID]struct {
|
||||
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
||||
blob restic.Blob
|
||||
blob restic.BlobHandle
|
||||
}
|
||||
|
||||
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||
// calculate blob->[]files->[]offsets mappings
|
||||
blobs := make(blobToFileOffsetsMapping)
|
||||
for file := range pack.files {
|
||||
addBlob := func(blob restic.Blob, fileOffset int64) {
|
||||
addBlob := func(blob restic.BlobHandle, fileOffset int64) {
|
||||
blobInfo, ok := blobs[blob.ID]
|
||||
if !ok {
|
||||
blobInfo.files = make(map[*fileInfo][]int64)
|
||||
@@ -280,7 +280,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||
if fileBlobs, ok := file.blobs.(restic.IDs); ok {
|
||||
err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob, idx int, fileOffset int64) {
|
||||
if packID.Equal(pack.id) && !file.state.HasMatchingBlob(idx) {
|
||||
addBlob(blob, fileOffset)
|
||||
addBlob(blob.BlobHandle, fileOffset)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
@@ -292,7 +292,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||
idxPacks := r.idx(restic.DataBlob, blob.id)
|
||||
for _, idxPack := range idxPacks {
|
||||
if idxPack.PackID.Equal(pack.id) {
|
||||
addBlob(idxPack.Blob, blob.offset)
|
||||
addBlob(idxPack.BlobHandle, blob.offset)
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -324,7 +324,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob
|
||||
// only report error for not yet processed blobs
|
||||
affectedFiles := make(map[*fileInfo]struct{})
|
||||
for _, entry := range blobs {
|
||||
if processedBlobs.Has(entry.blob.BlobHandle) {
|
||||
if processedBlobs.Has(entry.blob) {
|
||||
continue
|
||||
}
|
||||
for file := range entry.files {
|
||||
@@ -343,7 +343,7 @@ func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlob
|
||||
func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID,
|
||||
blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error {
|
||||
|
||||
blobList := make(restic.Blobs, 0, len(blobs))
|
||||
blobList := make([]restic.BlobHandle, 0, len(blobs))
|
||||
for _, entry := range blobs {
|
||||
blobList = append(blobList, entry.blob)
|
||||
}
|
||||
|
||||
@@ -2,9 +2,11 @@ package restorer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@@ -135,24 +137,30 @@ func newTestRepo(content []TestFile) *TestRepo {
|
||||
filesPathToContent: filesPathToContent,
|
||||
warmupJobs: []*TestWarmupJob{},
|
||||
}
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
blobs = append(restic.Blobs{}, blobs...)
|
||||
blobs.Sort()
|
||||
|
||||
for _, blob := range blobs {
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
entries := make([]restic.PackedBlob, 0, len(handles))
|
||||
for _, h := range handles {
|
||||
found := false
|
||||
for _, e := range repo.blobs[blob.ID] {
|
||||
for _, e := range repo.blobs[h.ID] {
|
||||
if packID == e.PackID {
|
||||
entries = append(entries, e)
|
||||
found = true
|
||||
buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length]
|
||||
err := handleBlobFn(e.BlobHandle, buf, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("missing blob: %v", blob)
|
||||
return fmt.Errorf("missing blob: %v", h)
|
||||
}
|
||||
}
|
||||
slices.SortFunc(entries, func(a, b restic.PackedBlob) int {
|
||||
return cmp.Compare(a.Offset, b.Offset)
|
||||
})
|
||||
|
||||
for _, e := range entries {
|
||||
buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length]
|
||||
err := handleBlobFn(e.BlobHandle, buf, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -313,7 +321,7 @@ func TestErrorRestoreFiles(t *testing.T) {
|
||||
|
||||
loadError := errors.New("load error")
|
||||
// loader always returns an error
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return loadError
|
||||
}
|
||||
|
||||
@@ -346,9 +354,9 @@ func TestFatalDownloadError(t *testing.T) {
|
||||
repo := newTestRepo(content)
|
||||
|
||||
loader := repo.loader
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs restic.Blobs, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, handles []restic.BlobHandle, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
ctr := 0
|
||||
return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
return loader(ctx, packID, handles, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if ctr < 2 {
|
||||
ctr++
|
||||
return handleBlobFn(blob, buf, err)
|
||||
|
||||
Reference in New Issue
Block a user