From 70a45791b28007341a5d31506dab9c37e850d033 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jun 2026 22:43:42 +0200 Subject: [PATCH] archiver: switch file saver from Chunker to BaseChunker Update the chunker dependency. Move the chunk memory handling into the fileSaver. This minimizes the logic required in a chunker implementation. readNextChunk is a direct derivative of Chunker.Next. --- go.mod | 2 +- go.sum | 4 +- internal/archiver/file_saver.go | 82 ++++++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 00e1171de..9ec797e46 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/pkg/profile v1.7.0 github.com/pkg/sftp v1.13.10 github.com/pkg/xattr v0.4.12 - github.com/restic/chunker v0.4.0 + github.com/restic/chunker v0.5.0 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index aa573603e..299a7743f 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/restic/chunker v0.4.0 h1:YUPYCUn70MYP7VO4yllypp2SjmsRhRJaad3xKu1QFRw= -github.com/restic/chunker v0.4.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw= +github.com/restic/chunker v0.5.0 h1:1y+ut0MBduzxODJ298rhQCtESoEpj8v1hTydZlKaE1Y= +github.com/restic/chunker v0.5.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 313f1d19b..bd33303a4 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -15,6 +15,8 @@ import ( "golang.org/x/sync/errgroup" ) +const chunkReadBufSize = 512 * 1024 // matches chunker internal read buffer size + // fileSaver concurrently saves incoming files to the repo. type fileSaver struct { saveFilePool *bufferPool @@ -100,8 +102,63 @@ type saveFileJob struct { complete fileCompleteFunc } +type fileChunkState struct { + readBuf []byte + bpos uint + bmax uint + closed bool +} + +func (s *fileChunkState) reset() { + s.bpos = 0 + s.bmax = 0 + s.closed = false +} + +// readNextChunk reads from rd and returns the next chunk of data. io.EOF is +// returned when all chunks have been read. +func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker, data []byte) ([]byte, error) { + data = data[:0] + for { + if s.bpos >= s.bmax { + n, err := io.ReadFull(rd, s.readBuf) + + if err == io.ErrUnexpectedEOF { + err = nil + } + + // io.EOF only happens when the end of the file has been reached. + // If this is the case, we need to return the data we have read so far. + if err == io.EOF && !s.closed { + s.closed = true + + if len(data) > 0 { + return data, nil + } + } + + if err != nil { + return nil, err + } + + s.bpos = 0 + s.bmax = uint(n) + } + + split := chnker.NextSplitPoint(s.readBuf[s.bpos:s.bmax]) + if split == -1 { + data = append(data, s.readBuf[s.bpos:s.bmax]...) + s.bpos = s.bmax + } else { + data = append(data, s.readBuf[s.bpos:s.bpos+uint(split)]...) + s.bpos += uint(split) + return data, nil + } + } +} + // saveFile stores the file f in the repo, then closes it. -func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { +func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) { start() fnr := futureNodeResult{ @@ -161,15 +218,15 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - // reuse the chunker - chnker.Reset(f, s.pol) + chnker.Reset(s.pol) + chunkState.reset() node.Content = []restic.ID{} node.Size = 0 var idx int for { buf := s.saveFilePool.Get() - chunk, err := chnker.Next(buf.Data) + chunkData, err := chunkState.readNextChunk(f, chnker, buf.Data) if err == io.EOF { buf.Release() break @@ -181,8 +238,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - buf.Data = chunk.Data - node.Size += uint64(chunk.Length) + // put result buffer back for later reuse + buf.Data = chunkData + node.Size += uint64(len(chunkData)) // test if the context has been cancelled, return the error if ctx.Err() != nil { @@ -199,7 +257,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat node.Content = append(node.Content, restic.ID{}) lock.Unlock() - s.uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) { + s.uploader.SaveBlobAsync(ctx, restic.DataBlob, chunkData, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) { defer buf.Release() if err != nil { completeError(err) @@ -209,7 +267,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat lock.Lock() if !known { fnr.stats.DataBlobs++ - fnr.stats.DataSize += uint64(len(buf.Data)) + fnr.stats.DataSize += uint64(len(chunkData)) fnr.stats.DataSizeInRepo += uint64(sizeInRepo) } node.Content[pos] = newID @@ -226,7 +284,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return } - s.CompleteBlob(uint64(len(chunk.Data))) + s.CompleteBlob(uint64(len(chunkData))) } err = f.Close() @@ -246,8 +304,8 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { - // a worker has one chunker which is reused for each file (because it contains a rather large buffer) - chnker := chunker.New(nil, s.pol) + chnker := chunker.NewBase(s.pol) + chunkState := &fileChunkState{readBuf: make([]byte, chunkReadBufSize)} for { var job saveFileJob @@ -261,7 +319,7 @@ func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { } } - s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.start, func() { + s.saveFile(ctx, chnker, chunkState, job.snPath, job.target, job.file, job.start, func() { if job.completeReading != nil { job.completeReading() }