archiver: switch file saver from Chunker to BaseChunker

Update the chunker dependency. Move the chunk memory handling into the
fileSaver. This minimizes the logic required in a chunker
implementation.

readNextChunk is a direct derivative of Chunker.Next.
This commit is contained in:
Michael Eischer
2026-06-07 22:43:42 +02:00
parent 9e1a526611
commit 70a45791b2
3 changed files with 73 additions and 15 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ require (
github.com/pkg/profile v1.7.0
github.com/pkg/sftp v1.13.10
github.com/pkg/xattr v0.4.12
github.com/restic/chunker v0.4.0
github.com/restic/chunker v0.5.0
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
go.uber.org/automaxprocs v1.6.0
+2 -2
View File
@@ -184,8 +184,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/restic/chunker v0.4.0 h1:YUPYCUn70MYP7VO4yllypp2SjmsRhRJaad3xKu1QFRw=
github.com/restic/chunker v0.4.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw=
github.com/restic/chunker v0.5.0 h1:1y+ut0MBduzxODJ298rhQCtESoEpj8v1hTydZlKaE1Y=
github.com/restic/chunker v0.5.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw=
github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+70 -12
View File
@@ -15,6 +15,8 @@ import (
"golang.org/x/sync/errgroup"
)
const chunkReadBufSize = 512 * 1024 // matches chunker internal read buffer size
// fileSaver concurrently saves incoming files to the repo.
type fileSaver struct {
saveFilePool *bufferPool
@@ -100,8 +102,63 @@ type saveFileJob struct {
complete fileCompleteFunc
}
type fileChunkState struct {
readBuf []byte
bpos uint
bmax uint
closed bool
}
func (s *fileChunkState) reset() {
s.bpos = 0
s.bmax = 0
s.closed = false
}
// readNextChunk reads from rd and returns the next chunk of data. io.EOF is
// returned when all chunks have been read.
func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker, data []byte) ([]byte, error) {
data = data[:0]
for {
if s.bpos >= s.bmax {
n, err := io.ReadFull(rd, s.readBuf)
if err == io.ErrUnexpectedEOF {
err = nil
}
// io.EOF only happens when the end of the file has been reached.
// If this is the case, we need to return the data we have read so far.
if err == io.EOF && !s.closed {
s.closed = true
if len(data) > 0 {
return data, nil
}
}
if err != nil {
return nil, err
}
s.bpos = 0
s.bmax = uint(n)
}
split := chnker.NextSplitPoint(s.readBuf[s.bpos:s.bmax])
if split == -1 {
data = append(data, s.readBuf[s.bpos:s.bmax]...)
s.bpos = s.bmax
} else {
data = append(data, s.readBuf[s.bpos:s.bpos+uint(split)]...)
s.bpos += uint(split)
return data, nil
}
}
}
// saveFile stores the file f in the repo, then closes it.
func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) {
func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) {
start()
fnr := futureNodeResult{
@@ -161,15 +218,15 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
return
}
// reuse the chunker
chnker.Reset(f, s.pol)
chnker.Reset(s.pol)
chunkState.reset()
node.Content = []restic.ID{}
node.Size = 0
var idx int
for {
buf := s.saveFilePool.Get()
chunk, err := chnker.Next(buf.Data)
chunkData, err := chunkState.readNextChunk(f, chnker, buf.Data)
if err == io.EOF {
buf.Release()
break
@@ -181,8 +238,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
return
}
buf.Data = chunk.Data
node.Size += uint64(chunk.Length)
// put result buffer back for later reuse
buf.Data = chunkData
node.Size += uint64(len(chunkData))
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
@@ -199,7 +257,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
node.Content = append(node.Content, restic.ID{})
lock.Unlock()
s.uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) {
s.uploader.SaveBlobAsync(ctx, restic.DataBlob, chunkData, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) {
defer buf.Release()
if err != nil {
completeError(err)
@@ -209,7 +267,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
lock.Lock()
if !known {
fnr.stats.DataBlobs++
fnr.stats.DataSize += uint64(len(buf.Data))
fnr.stats.DataSize += uint64(len(chunkData))
fnr.stats.DataSizeInRepo += uint64(sizeInRepo)
}
node.Content[pos] = newID
@@ -226,7 +284,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
return
}
s.CompleteBlob(uint64(len(chunk.Data)))
s.CompleteBlob(uint64(len(chunkData)))
}
err = f.Close()
@@ -246,8 +304,8 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
}
func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
chnker := chunker.New(nil, s.pol)
chnker := chunker.NewBase(s.pol)
chunkState := &fileChunkState{readBuf: make([]byte, chunkReadBufSize)}
for {
var job saveFileJob
@@ -261,7 +319,7 @@ func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
}
}
s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.start, func() {
s.saveFile(ctx, chnker, chunkState, job.snPath, job.target, job.file, job.start, func() {
if job.completeReading != nil {
job.completeReading()
}