diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 25d90a572..1aec70785 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -77,7 +77,7 @@ type archiverRepo interface { restic.WithBlobUploader restic.SaverUnpacked[restic.WriteableFileType] - Config() restic.Config + ChunkerFactory() restic.ChunkerFactory } // Archiver saves a directory structure to the repo. @@ -864,7 +864,7 @@ func (arch *Archiver) loadParentTree(ctx context.Context, sn *data.Snapshot) dat func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync) { arch.fileSaver = newFileSaver(ctx, wg, uploader, - arch.Repo.Config().ChunkerPolynomial, + arch.Repo.ChunkerFactory(), arch.Options.ReadConcurrency) arch.fileSaver.CompleteBlob = arch.CompleteBlob arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index bd33303a4..48b308e00 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -6,7 +6,6 @@ import ( "io" "sync" - "github.com/restic/chunker" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" @@ -22,7 +21,7 @@ type fileSaver struct { saveFilePool *bufferPool uploader restic.BlobSaverAsync - pol chunker.Pol + chunkerFactory restic.ChunkerFactory ch chan<- saveFileJob @@ -33,15 +32,15 @@ type fileSaver struct { // newFileSaver returns a new file saver. A worker pool with fileWorkers is // started, it is stopped when ctx is cancelled. -func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, pol chunker.Pol, fileWorkers uint) *fileSaver { +func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, chunkerFactory restic.ChunkerFactory, fileWorkers uint) *fileSaver { ch := make(chan saveFileJob) debug.Log("new file saver with %v file workers", fileWorkers) s := &fileSaver{ - uploader: uploader, - saveFilePool: newBufferPool(chunker.MaxSize), - pol: pol, - ch: ch, + uploader: uploader, + saveFilePool: newBufferPool(chunkerFactory.MaxChunkSize()), + chunkerFactory: chunkerFactory, + ch: ch, CompleteBlob: func(uint64) {}, } @@ -117,7 +116,7 @@ func (s *fileChunkState) reset() { // readNextChunk reads from rd and returns the next chunk of data. io.EOF is // returned when all chunks have been read. -func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker, data []byte) ([]byte, error) { +func (s *fileChunkState) readNextChunk(rd io.Reader, chnker restic.Chunker, data []byte) ([]byte, error) { data = data[:0] for { if s.bpos >= s.bmax { @@ -158,7 +157,7 @@ func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker } // saveFile stores the file f in the repo, then closes it. -func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { +func (s *fileSaver) saveFile(ctx context.Context, chnker restic.Chunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { start() fnr := futureNodeResult{ @@ -218,7 +217,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, c return } - chnker.Reset(s.pol) + chnker.Reset() chunkState.reset() node.Content = []restic.ID{} @@ -304,7 +303,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, c } func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { - chnker := chunker.NewBase(s.pol) + chnker := s.chunkerFactory.NewChunker() chunkState := &fileChunkState{readBuf: make([]byte, chunkReadBufSize)} for { diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 1af322be0..9be2f0d95 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -8,9 +8,9 @@ import ( "runtime" "testing" - "github.com/restic/chunker" "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" ) @@ -34,13 +34,10 @@ func startFileSaver(ctx context.Context, t testing.TB, _ fs.FS) (*fileSaver, *mo wg, ctx := errgroup.WithContext(ctx) workers := uint(runtime.NumCPU()) - pol, err := chunker.RandomPolynomial() - if err != nil { - t.Fatal(err) - } + chunkerFactory := repository.TestRepository(t).ChunkerFactory() saver := &mockSaver{saved: make(map[string]int)} - s := newFileSaver(ctx, wg, saver, pol, workers) + s := newFileSaver(ctx, wg, saver, chunkerFactory, workers) s.NodeFromFileInfo = func(snPath, filename string, meta toNoder, ignoreXattrListError bool) (*data.Node, error) { return meta.ToNode(ignoreXattrListError, t.Logf) } diff --git a/internal/repository/chunker.go b/internal/repository/chunker.go new file mode 100644 index 000000000..7debff313 --- /dev/null +++ b/internal/repository/chunker.go @@ -0,0 +1,39 @@ +package repository + +import ( + "github.com/restic/chunker" + "github.com/restic/restic/internal/restic" +) + +type baseChunker struct { + bc *chunker.BaseChunker + pol chunker.Pol +} + +func (c *baseChunker) Reset() { + c.bc.Reset(c.pol) +} + +func (c *baseChunker) NextSplitPoint(buf []byte) int { + return c.bc.NextSplitPoint(buf) +} + +type chunkerFactory struct { + pol chunker.Pol +} + +func newChunkerFactory(pol chunker.Pol) *chunkerFactory { + return &chunkerFactory{pol: pol} +} + +func (f *chunkerFactory) NewChunker() restic.Chunker { + return &baseChunker{bc: chunker.NewBase(f.pol), pol: f.pol} +} + +func (f *chunkerFactory) MaxChunkSize() int { + return chunker.MaxSize +} + +func (r *Repository) ChunkerFactory() restic.ChunkerFactory { + return newChunkerFactory(r.Config().ChunkerPolynomial) +} diff --git a/internal/restic/chunker.go b/internal/restic/chunker.go new file mode 100644 index 000000000..e1187f098 --- /dev/null +++ b/internal/restic/chunker.go @@ -0,0 +1,20 @@ +package restic + +// Chunker splits file content into variable-length chunks. +// Implementations are created by ChunkerFactory and reused across files via Reset. +type Chunker interface { + // Reset reinitializes the chunker for a new file. + Reset() + // NextSplitPoint scans buf for a chunk boundary. + // Returns index before which to split buf, or -1 if no boundary found in this buffer. + // This operation is stateful. All buffers passed to it until a split point is found + // then form a single chunk. + NextSplitPoint(buf []byte) int +} + +// ChunkerFactory creates chunkers configured for a specific repository. +type ChunkerFactory interface { + NewChunker() Chunker + // MaxChunkSize is the maximum size of a single chunk (used for output buffer pools). + MaxChunkSize() int +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 8cd408202..07f6c8a50 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -17,6 +17,7 @@ type Repository interface { Connections() uint Config() Config PackSize() uint + ChunkerFactory() ChunkerFactory LoadIndex(ctx context.Context, p TerminalCounterFactory) error