mirror of
https://github.com/restic/restic.git
synced 2026-06-05 08:29:44 +00:00
Merge pull request #4644 from MichaelEischer/refactor-repair-packs
Refactor and test `repair packs`
This commit is contained in:
@@ -12,7 +12,6 @@ import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@@ -25,7 +24,6 @@ import (
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/textfile"
|
||||
"github.com/restic/restic/internal/ui"
|
||||
"github.com/restic/restic/internal/ui/backup"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
@@ -56,31 +54,9 @@ Exit status is 3 if some source data could not be read (incomplete snapshot crea
|
||||
},
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
var wg sync.WaitGroup
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
// shutdown termstatus
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
term.Run(cancelCtx)
|
||||
}()
|
||||
|
||||
// use the terminal for stdout/stderr
|
||||
prevStdout, prevStderr := globalOptions.stdout, globalOptions.stderr
|
||||
defer func() {
|
||||
globalOptions.stdout, globalOptions.stderr = prevStdout, prevStderr
|
||||
}()
|
||||
stdioWrapper := ui.NewStdioWrapper(term)
|
||||
globalOptions.stdout, globalOptions.stderr = stdioWrapper.Stdout(), stdioWrapper.Stderr()
|
||||
|
||||
return runBackup(ctx, backupOptions, globalOptions, term, args)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runBackup(cmd.Context(), backupOptions, globalOptions, term, args)
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
+16
-16
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
@@ -766,7 +767,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
} else if len(plan.ignorePacks) != 0 {
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil)
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, false)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
@@ -778,7 +779,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
|
||||
}
|
||||
|
||||
if opts.unsafeRecovery {
|
||||
_, err = writeIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil)
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, true)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
@@ -788,23 +789,22 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) (restic.IDSet, error) {
|
||||
func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool) error {
|
||||
Verbosef("rebuilding index\n")
|
||||
|
||||
bar := newProgressMax(!gopts.Quiet, 0, "packs processed")
|
||||
obsoleteIndexes, err := repo.Index().Save(ctx, repo, removePacks, extraObsolete, bar)
|
||||
bar.Done()
|
||||
return obsoleteIndexes, err
|
||||
}
|
||||
|
||||
func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) error {
|
||||
obsoleteIndexes, err := writeIndexFiles(ctx, gopts, repo, removePacks, extraObsolete)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Verbosef("deleting obsolete index files\n")
|
||||
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
|
||||
return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
|
||||
SaveProgress: bar,
|
||||
DeleteProgress: func() *progress.Counter {
|
||||
return newProgressMax(!gopts.Quiet, 0, "old indexes deleted")
|
||||
},
|
||||
DeleteReport: func(id restic.ID, err error) {
|
||||
if gopts.verbosity > 2 {
|
||||
Verbosef("removed index %v\n", id.String())
|
||||
}
|
||||
},
|
||||
SkipDeletion: skipDeletion,
|
||||
})
|
||||
}
|
||||
|
||||
func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (usedBlobs restic.CountedBlobSet, err error) {
|
||||
|
||||
@@ -154,7 +154,7 @@ func rebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOpti
|
||||
}
|
||||
}
|
||||
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes)
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var cmdRepairPacks = &cobra.Command{
|
||||
@@ -29,7 +29,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
||||
`,
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return runRepairPacks(cmd.Context(), globalOptions, args)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runRepairPacks(cmd.Context(), globalOptions, term, args)
|
||||
},
|
||||
}
|
||||
|
||||
@@ -37,7 +39,7 @@ func init() {
|
||||
cmdRepair.AddCommand(cmdRepairPacks)
|
||||
}
|
||||
|
||||
func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) error {
|
||||
func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.Terminal, args []string) error {
|
||||
// FIXME discuss and add proper feature flag mechanism
|
||||
flag, _ := os.LookupEnv("RESTIC_FEATURES")
|
||||
if flag != "repair-packs-v1" {
|
||||
@@ -68,21 +70,19 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) err
|
||||
return err
|
||||
}
|
||||
|
||||
return repairPacks(ctx, gopts, repo, ids)
|
||||
}
|
||||
|
||||
func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repository, ids restic.IDSet) error {
|
||||
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
|
||||
err := repo.LoadIndex(ctx, bar)
|
||||
err = repo.LoadIndex(ctx, bar)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
|
||||
Warnf("saving backup copies of pack files in current folder\n")
|
||||
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||
|
||||
printer.P("saving backup copies of pack files to current folder")
|
||||
for id := range ids {
|
||||
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error {
|
||||
@@ -94,66 +94,15 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
wg, wgCtx := errgroup.WithContext(ctx)
|
||||
repo.StartPackUploader(wgCtx, wg)
|
||||
repo.DisableAutoIndexUpdate()
|
||||
|
||||
Warnf("salvaging intact data from specified pack files\n")
|
||||
bar = newProgressMax(!gopts.Quiet, uint64(len(ids)), "pack files")
|
||||
defer bar.Done()
|
||||
|
||||
wg.Go(func() error {
|
||||
// examine all data the indexes have for the pack file
|
||||
for b := range repo.Index().ListPacks(wgCtx, ids) {
|
||||
blobs := b.Blobs
|
||||
if len(blobs) == 0 {
|
||||
Warnf("no blobs found for pack %v\n", b.PackID)
|
||||
bar.Add(1)
|
||||
continue
|
||||
}
|
||||
|
||||
err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
// Fallback path
|
||||
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
||||
if err != nil {
|
||||
Warnf("failed to load blob %v: %v\n", blob.ID, err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true)
|
||||
if !id.Equal(blob.ID) {
|
||||
panic("pack id mismatch during upload")
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bar.Add(1)
|
||||
}
|
||||
return repo.Flush(wgCtx)
|
||||
})
|
||||
|
||||
if err := wg.Wait(); err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
bar.Done()
|
||||
|
||||
// remove salvaged packs from index
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, ids, nil)
|
||||
err = repository.RepairPacks(ctx, repo, ids, printer)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
|
||||
// cleanup
|
||||
Warnf("removing salvaged pack files\n")
|
||||
DeleteFiles(ctx, gopts, repo, ids, restic.PackFile)
|
||||
|
||||
Warnf("\nUse `restic repair snapshots --forget` to remove the corrupted data blobs from all snapshots\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
@@ -38,31 +37,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
||||
`,
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
var wg sync.WaitGroup
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
// shutdown termstatus
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
term.Run(cancelCtx)
|
||||
}()
|
||||
|
||||
// allow usage of warnf / verbosef
|
||||
prevStdout, prevStderr := globalOptions.stdout, globalOptions.stderr
|
||||
defer func() {
|
||||
globalOptions.stdout, globalOptions.stderr = prevStdout, prevStderr
|
||||
}()
|
||||
stdioWrapper := ui.NewStdioWrapper(term)
|
||||
globalOptions.stdout, globalOptions.stderr = stdioWrapper.Stdout(), stdioWrapper.Stderr()
|
||||
|
||||
return runRestore(ctx, restoreOptions, globalOptions, term, args)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runRestore(cmd.Context(), restoreOptions, globalOptions, term, args)
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
+14
-42
@@ -3,9 +3,6 @@ package main
|
||||
import (
|
||||
"context"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
@@ -24,46 +21,21 @@ func DeleteFilesChecked(ctx context.Context, gopts GlobalOptions, repo restic.Re
|
||||
// deleteFiles deletes the given fileList of fileType in parallel
|
||||
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
|
||||
func deleteFiles(ctx context.Context, gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
|
||||
totalCount := len(fileList)
|
||||
fileChan := make(chan restic.ID)
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
wg.Go(func() error {
|
||||
defer close(fileChan)
|
||||
for id := range fileList {
|
||||
select {
|
||||
case fileChan <- id:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, 0, "files deleted")
|
||||
defer bar.Done()
|
||||
|
||||
return restic.ParallelRemove(ctx, repo, fileList, fileType, func(id restic.ID, err error) error {
|
||||
if err != nil {
|
||||
if !gopts.JSON {
|
||||
Warnf("unable to remove %v/%v from the repository\n", fileType, id)
|
||||
}
|
||||
if !ignoreError {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !gopts.JSON && gopts.verbosity > 2 {
|
||||
Verbosef("removed %v/%v\n", fileType, id)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, uint64(totalCount), "files deleted")
|
||||
defer bar.Done()
|
||||
// deleting files is IO-bound
|
||||
workerCount := repo.Connections()
|
||||
for i := 0; i < int(workerCount); i++ {
|
||||
wg.Go(func() error {
|
||||
for id := range fileChan {
|
||||
h := backend.Handle{Type: fileType, Name: id.String()}
|
||||
err := repo.Backend().Remove(ctx, h)
|
||||
if err != nil {
|
||||
if !gopts.JSON {
|
||||
Warnf("unable to remove %v from the repository\n", h)
|
||||
}
|
||||
if !ignoreError {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !gopts.JSON && gopts.verbosity > 2 {
|
||||
Verbosef("removed %v\n", h)
|
||||
}
|
||||
bar.Add(1)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
err := wg.Wait()
|
||||
return err
|
||||
}, bar)
|
||||
}
|
||||
|
||||
+31
-8
@@ -30,7 +30,7 @@ func calculateProgressInterval(show bool, json bool) time.Duration {
|
||||
}
|
||||
|
||||
// newTerminalProgressMax returns a progress.Counter that prints to stdout or terminal if provided.
|
||||
func newGenericProgressMax(show bool, max uint64, description string, print func(status string)) *progress.Counter {
|
||||
func newGenericProgressMax(show bool, max uint64, description string, print func(status string, final bool)) *progress.Counter {
|
||||
if !show {
|
||||
return nil
|
||||
}
|
||||
@@ -46,16 +46,18 @@ func newGenericProgressMax(show bool, max uint64, description string, print func
|
||||
ui.FormatDuration(d), ui.FormatPercent(v, max), v, max, description)
|
||||
}
|
||||
|
||||
print(status)
|
||||
if final {
|
||||
fmt.Print("\n")
|
||||
}
|
||||
print(status, final)
|
||||
})
|
||||
}
|
||||
|
||||
func newTerminalProgressMax(show bool, max uint64, description string, term *termstatus.Terminal) *progress.Counter {
|
||||
return newGenericProgressMax(show, max, description, func(status string) {
|
||||
term.SetStatus([]string{status})
|
||||
return newGenericProgressMax(show, max, description, func(status string, final bool) {
|
||||
if final {
|
||||
term.SetStatus([]string{})
|
||||
term.Print(status)
|
||||
} else {
|
||||
term.SetStatus([]string{status})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -64,7 +66,7 @@ func newProgressMax(show bool, max uint64, description string) *progress.Counter
|
||||
return newGenericProgressMax(show, max, description, printProgress)
|
||||
}
|
||||
|
||||
func printProgress(status string) {
|
||||
func printProgress(status string, final bool) {
|
||||
|
||||
canUpdateStatus := stdoutCanUpdateStatus()
|
||||
|
||||
@@ -95,6 +97,9 @@ func printProgress(status string) {
|
||||
}
|
||||
|
||||
_, _ = os.Stdout.Write([]byte(clear + status + carriageControl))
|
||||
if final {
|
||||
_, _ = os.Stdout.Write([]byte("\n"))
|
||||
}
|
||||
}
|
||||
|
||||
func newIndexProgress(quiet bool, json bool) *progress.Counter {
|
||||
@@ -104,3 +109,21 @@ func newIndexProgress(quiet bool, json bool) *progress.Counter {
|
||||
func newIndexTerminalProgress(quiet bool, json bool, term *termstatus.Terminal) *progress.Counter {
|
||||
return newTerminalProgressMax(!quiet && !json && stdoutIsTerminal(), 0, "index files loaded", term)
|
||||
}
|
||||
|
||||
type terminalProgressPrinter struct {
|
||||
term *termstatus.Terminal
|
||||
ui.Message
|
||||
show bool
|
||||
}
|
||||
|
||||
func (t *terminalProgressPrinter) NewCounter(description string) *progress.Counter {
|
||||
return newTerminalProgressMax(t.show, 0, description, t.term)
|
||||
}
|
||||
|
||||
func newTerminalProgressPrinter(verbosity uint, term *termstatus.Terminal) progress.Printer {
|
||||
return &terminalProgressPrinter{
|
||||
term: term,
|
||||
Message: *ui.NewMessage(term, verbosity),
|
||||
show: verbosity > 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/ui"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
|
||||
// setupTermstatus creates a new termstatus and reroutes globalOptions.{stdout,stderr} to it
|
||||
// The returned function must be called to shut down the termstatus,
|
||||
//
|
||||
// Expected usage:
|
||||
// ```
|
||||
// term, cancel := setupTermstatus()
|
||||
// defer cancel()
|
||||
// // do stuff
|
||||
// ```
|
||||
func setupTermstatus() (*termstatus.Terminal, func()) {
|
||||
var wg sync.WaitGroup
|
||||
// only shutdown once cancel is called to ensure that no output is lost
|
||||
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
term.Run(cancelCtx)
|
||||
}()
|
||||
|
||||
// use the termstatus for stdout/stderr
|
||||
prevStdout, prevStderr := globalOptions.stdout, globalOptions.stderr
|
||||
stdioWrapper := ui.NewStdioWrapper(term)
|
||||
globalOptions.stdout, globalOptions.stderr = stdioWrapper.Stdout(), stdioWrapper.Stderr()
|
||||
|
||||
return term, func() {
|
||||
// shutdown termstatus
|
||||
globalOptions.stdout, globalOptions.stderr = prevStdout, prevStderr
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user