diff --git a/go.mod b/go.mod index 00e1171de..9ec797e46 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/pkg/profile v1.7.0 github.com/pkg/sftp v1.13.10 github.com/pkg/xattr v0.4.12 - github.com/restic/chunker v0.4.0 + github.com/restic/chunker v0.5.0 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index aa573603e..299a7743f 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/restic/chunker v0.4.0 h1:YUPYCUn70MYP7VO4yllypp2SjmsRhRJaad3xKu1QFRw= -github.com/restic/chunker v0.4.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw= +github.com/restic/chunker v0.5.0 h1:1y+ut0MBduzxODJ298rhQCtESoEpj8v1hTydZlKaE1Y= +github.com/restic/chunker v0.5.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 25d90a572..1aec70785 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -77,7 +77,7 @@ type archiverRepo interface { restic.WithBlobUploader restic.SaverUnpacked[restic.WriteableFileType] - Config() restic.Config + ChunkerFactory() restic.ChunkerFactory } // Archiver saves a directory structure to the repo. @@ -864,7 +864,7 @@ func (arch *Archiver) loadParentTree(ctx context.Context, sn *data.Snapshot) dat func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync) { arch.fileSaver = newFileSaver(ctx, wg, uploader, - arch.Repo.Config().ChunkerPolynomial, + arch.Repo.ChunkerFactory(), arch.Options.ReadConcurrency) arch.fileSaver.CompleteBlob = arch.CompleteBlob arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 313f1d19b..48b308e00 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -6,7 +6,6 @@ import ( "io" "sync" - "github.com/restic/chunker" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" @@ -15,12 +14,14 @@ import ( "golang.org/x/sync/errgroup" ) +const chunkReadBufSize = 512 * 1024 // matches chunker internal read buffer size + // fileSaver concurrently saves incoming files to the repo. type fileSaver struct { saveFilePool *bufferPool uploader restic.BlobSaverAsync - pol chunker.Pol + chunkerFactory restic.ChunkerFactory ch chan<- saveFileJob @@ -31,15 +32,15 @@ type fileSaver struct { // newFileSaver returns a new file saver. A worker pool with fileWorkers is // started, it is stopped when ctx is cancelled. -func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, pol chunker.Pol, fileWorkers uint) *fileSaver { +func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, chunkerFactory restic.ChunkerFactory, fileWorkers uint) *fileSaver { ch := make(chan saveFileJob) debug.Log("new file saver with %v file workers", fileWorkers) s := &fileSaver{ - uploader: uploader, - saveFilePool: newBufferPool(chunker.MaxSize), - pol: pol, - ch: ch, + uploader: uploader, + saveFilePool: newBufferPool(chunkerFactory.MaxChunkSize()), + chunkerFactory: chunkerFactory, + ch: ch, CompleteBlob: func(uint64) {}, } @@ -100,8 +101,63 @@ type saveFileJob struct { complete fileCompleteFunc } +type fileChunkState struct { + readBuf []byte + bpos uint + bmax uint + closed bool +} + +func (s *fileChunkState) reset() { + s.bpos = 0 + s.bmax = 0 + s.closed = false +} + +// readNextChunk reads from rd and returns the next chunk of data. io.EOF is +// returned when all chunks have been read. +func (s *fileChunkState) readNextChunk(rd io.Reader, chnker restic.Chunker, data []byte) ([]byte, error) { + data = data[:0] + for { + if s.bpos >= s.bmax { + n, err := io.ReadFull(rd, s.readBuf) + + if err == io.ErrUnexpectedEOF { + err = nil + } + + // io.EOF only happens when the end of the file has been reached. + // If this is the case, we need to return the data we have read so far. + if err == io.EOF && !s.closed { + s.closed = true + + if len(data) > 0 { + return data, nil + } + } + + if err != nil { + return nil, err + } + + s.bpos = 0 + s.bmax = uint(n) + } + + split := chnker.NextSplitPoint(s.readBuf[s.bpos:s.bmax]) + if split == -1 { + data = append(data, s.readBuf[s.bpos:s.bmax]...) + s.bpos = s.bmax + } else { + data = append(data, s.readBuf[s.bpos:s.bpos+uint(split)]...) + s.bpos += uint(split) + return data, nil + } + } +} + // saveFile stores the file f in the repo, then closes it. -func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { +func (s *fileSaver) saveFile(ctx context.Context, chnker restic.Chunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { start() fnr := futureNodeResult{ @@ -161,15 +217,15 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - // reuse the chunker - chnker.Reset(f, s.pol) + chnker.Reset() + chunkState.reset() node.Content = []restic.ID{} node.Size = 0 var idx int for { buf := s.saveFilePool.Get() - chunk, err := chnker.Next(buf.Data) + chunkData, err := chunkState.readNextChunk(f, chnker, buf.Data) if err == io.EOF { buf.Release() break @@ -181,8 +237,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - buf.Data = chunk.Data - node.Size += uint64(chunk.Length) + // put result buffer back for later reuse + buf.Data = chunkData + node.Size += uint64(len(chunkData)) // test if the context has been cancelled, return the error if ctx.Err() != nil { @@ -199,7 +256,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat node.Content = append(node.Content, restic.ID{}) lock.Unlock() - s.uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) { + s.uploader.SaveBlobAsync(ctx, restic.DataBlob, chunkData, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) { defer buf.Release() if err != nil { completeError(err) @@ -209,7 +266,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat lock.Lock() if !known { fnr.stats.DataBlobs++ - fnr.stats.DataSize += uint64(len(buf.Data)) + fnr.stats.DataSize += uint64(len(chunkData)) fnr.stats.DataSizeInRepo += uint64(sizeInRepo) } node.Content[pos] = newID @@ -226,7 +283,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - s.CompleteBlob(uint64(len(chunk.Data))) + s.CompleteBlob(uint64(len(chunkData))) } err = f.Close() @@ -246,8 +303,8 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { - // a worker has one chunker which is reused for each file (because it contains a rather large buffer) - chnker := chunker.New(nil, s.pol) + chnker := s.chunkerFactory.NewChunker() + chunkState := &fileChunkState{readBuf: make([]byte, chunkReadBufSize)} for { var job saveFileJob @@ -261,7 +318,7 @@ func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { } } - s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.start, func() { + s.saveFile(ctx, chnker, chunkState, job.snPath, job.target, job.file, job.start, func() { if job.completeReading != nil { job.completeReading() } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 1af322be0..9be2f0d95 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -8,9 +8,9 @@ import ( "runtime" "testing" - "github.com/restic/chunker" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" ) @@ -34,13 +34,10 @@ func startFileSaver(ctx context.Context, t testing.TB, _ fs.FS) (*fileSaver, *mo wg, ctx := errgroup.WithContext(ctx) workers := uint(runtime.NumCPU()) - pol, err := chunker.RandomPolynomial() - if err != nil { - t.Fatal(err) - } + chunkerFactory := repository.TestRepository(t).ChunkerFactory() saver := &mockSaver{saved: make(map[string]int)} - s := newFileSaver(ctx, wg, saver, pol, workers) + s := newFileSaver(ctx, wg, saver, chunkerFactory, workers) s.NodeFromFileInfo = func(snPath, filename string, meta toNoder, ignoreXattrListError bool) (*data.Node, error) { return meta.ToNode(ignoreXattrListError, t.Logf) } diff --git a/internal/repository/chunker.go b/internal/repository/chunker.go new file mode 100644 index 000000000..b13d9c03d --- /dev/null +++ b/internal/repository/chunker.go @@ -0,0 +1,47 @@ +package repository + +import ( + "github.com/restic/chunker" + "github.com/restic/restic/internal/restic" +) + +type baseChunker struct { + bc *chunker.BaseChunker + pol chunker.Pol +} + +func (c *baseChunker) Reset() { + c.bc.Reset(c.pol) +} + +func (c *baseChunker) NextSplitPoint(buf []byte) int { + return c.bc.NextSplitPoint(buf) +} + +type chunkerFactory struct { + pol chunker.Pol + zeroChunk func() restic.ID +} + +func newChunkerFactory(r *Repository) *chunkerFactory { + return &chunkerFactory{ + pol: r.Config().ChunkerPolynomial, + zeroChunk: r.zeroChunk, + } +} + +func (f *chunkerFactory) NewChunker() restic.Chunker { + return &baseChunker{bc: chunker.NewBase(f.pol), pol: f.pol} +} + +func (f *chunkerFactory) MaxChunkSize() int { + return chunker.MaxSize +} + +func (f *chunkerFactory) ZeroChunk() restic.ID { + return f.zeroChunk() +} + +func (r *Repository) ChunkerFactory() restic.ChunkerFactory { + return newChunkerFactory(r) +} diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 889c76049..e52c01f11 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -51,6 +51,9 @@ type Repository struct { allocDec sync.Once enc *zstd.Encoder dec *zstd.Decoder + + zeroChunkOnce sync.Once + zeroChunkID restic.ID } // internalRepository allows using SaveUnpacked and RemoveUnpacked with all FileTypes @@ -1023,7 +1026,7 @@ func (r *Repository) saveBlob(ctx context.Context, t restic.BlobType, buf []byte // useful for sparse files containing large all zero regions. For these we can // process chunks as fast as we can read the from disk. if len(buf) == chunker.MinSize && restic.ZeroPrefixLen(buf) == chunker.MinSize { - newID = ZeroChunk() + newID = r.zeroChunk() } else { newID = restic.Hash(buf) } @@ -1340,13 +1343,9 @@ func (b *packBlobIterator) Next() (packBlobValue, error) { return packBlobValue{entry.BlobHandle, plaintext, err}, nil } -var zeroChunkOnce sync.Once -var zeroChunkID restic.ID - -// ZeroChunk computes and returns (cached) the ID of an all-zero chunk with size chunker.MinSize -func ZeroChunk() restic.ID { - zeroChunkOnce.Do(func() { - zeroChunkID = restic.Hash(make([]byte, chunker.MinSize)) +func (r *Repository) zeroChunk() restic.ID { + r.zeroChunkOnce.Do(func() { + r.zeroChunkID = restic.Hash(make([]byte, chunker.MinSize)) }) - return zeroChunkID + return r.zeroChunkID } diff --git a/internal/restic/chunker.go b/internal/restic/chunker.go new file mode 100644 index 000000000..48094355f --- /dev/null +++ b/internal/restic/chunker.go @@ -0,0 +1,22 @@ +package restic + +// Chunker splits file content into variable-length chunks. +// Implementations are created by ChunkerFactory and reused across files via Reset. +type Chunker interface { + // Reset reinitializes the chunker for a new file. + Reset() + // NextSplitPoint scans buf for a chunk boundary. + // Returns index before which to split buf, or -1 if no boundary found in this buffer. + // This operation is stateful. All buffers passed to it until a split point is found + // then form a single chunk. + NextSplitPoint(buf []byte) int +} + +// ChunkerFactory creates chunkers configured for a specific repository. +type ChunkerFactory interface { + NewChunker() Chunker + // MaxChunkSize is the maximum size of a single chunk (used for output buffer pools). + MaxChunkSize() int + // ZeroChunk returns the ID of an all-zero chunk with minimum chunk size. + ZeroChunk() ID +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 8cd408202..07f6c8a50 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -17,6 +17,7 @@ type Repository interface { Connections() uint Config() Config PackSize() uint + ChunkerFactory() ChunkerFactory LoadIndex(ctx context.Context, p TerminalCounterFactory) error diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 9bbe8fa39..76b2d31c7 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -11,7 +11,6 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/feature" - "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" ) @@ -72,7 +71,8 @@ func newFileRestorer(dst string, sparse bool, allowRecursiveDelete bool, startWarmup startWarmupFn, - progress ProgressReporter) *fileRestorer { + progress ProgressReporter, + zeroChunk restic.ID) *fileRestorer { // as packs are streamed the concurrency is limited by IO workerCount := int(connections) @@ -82,7 +82,7 @@ func newFileRestorer(dst string, blobsLoader: blobsLoader, startWarmup: startWarmup, filesWriter: newFilesWriter(workerCount, allowRecursiveDelete), - zeroChunk: repository.ZeroChunk(), + zeroChunk: zeroChunk, sparse: sparse, progress: progressOrNoop(progress), allowRecursiveDelete: allowRecursiveDelete, diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 5a423ea2a..8526fc608 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -12,6 +12,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/feature" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -208,7 +209,8 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files ma t.Helper() repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, repo.StartWarmup, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, repo.StartWarmup, nil, + repository.TestRepository(t).ChunkerFactory().ZeroChunk()) if files == nil { r.files = repo.files @@ -358,7 +360,8 @@ func TestErrorRestoreFiles(t *testing.T) { return loadError } - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.StartWarmup, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.StartWarmup, nil, + repository.TestRepository(t).ChunkerFactory().ZeroChunk()) r.files = repo.files err := r.restoreFiles(context.TODO()) @@ -399,7 +402,8 @@ func TestFatalDownloadError(t *testing.T) { }) } - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.StartWarmup, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.StartWarmup, nil, + repository.TestRepository(t).ChunkerFactory().ZeroChunk()) r.files = repo.files var errors []string diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 51555a6b2..98b28f915 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -362,7 +362,8 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) (uint64, error) idx := data.NewHardlinkIndex[string]() filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob, - res.repo.Connections(), res.opts.Sparse, res.opts.Delete, res.repo.StartWarmup, res.opts.Progress) + res.repo.Connections(), res.opts.Sparse, res.opts.Delete, res.repo.StartWarmup, res.opts.Progress, + res.repo.ChunkerFactory().ZeroChunk()) filerestorer.Error = res.Error filerestorer.Info = res.Info