mirror of
https://github.com/restic/restic.git
synced 2026-06-28 11:34:18 +00:00
repository: add ChunkerFactory and decouple archiver from chunker
Introduce restic.Chunker and ChunkerFactory interfaces with a repository-backed implementation. The archiver obtains chunkers via ChunkerFactory() from the repository.
This commit is contained in:
@@ -77,7 +77,7 @@ type archiverRepo interface {
|
||||
restic.WithBlobUploader
|
||||
restic.SaverUnpacked[restic.WriteableFileType]
|
||||
|
||||
Config() restic.Config
|
||||
ChunkerFactory() restic.ChunkerFactory
|
||||
}
|
||||
|
||||
// Archiver saves a directory structure to the repo.
|
||||
@@ -864,7 +864,7 @@ func (arch *Archiver) loadParentTree(ctx context.Context, sn *data.Snapshot) dat
|
||||
func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync) {
|
||||
arch.fileSaver = newFileSaver(ctx, wg,
|
||||
uploader,
|
||||
arch.Repo.Config().ChunkerPolynomial,
|
||||
arch.Repo.ChunkerFactory(),
|
||||
arch.Options.ReadConcurrency)
|
||||
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
||||
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/data"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@@ -22,7 +21,7 @@ type fileSaver struct {
|
||||
saveFilePool *bufferPool
|
||||
uploader restic.BlobSaverAsync
|
||||
|
||||
pol chunker.Pol
|
||||
chunkerFactory restic.ChunkerFactory
|
||||
|
||||
ch chan<- saveFileJob
|
||||
|
||||
@@ -33,15 +32,15 @@ type fileSaver struct {
|
||||
|
||||
// newFileSaver returns a new file saver. A worker pool with fileWorkers is
|
||||
// started, it is stopped when ctx is cancelled.
|
||||
func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, pol chunker.Pol, fileWorkers uint) *fileSaver {
|
||||
func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, chunkerFactory restic.ChunkerFactory, fileWorkers uint) *fileSaver {
|
||||
ch := make(chan saveFileJob)
|
||||
debug.Log("new file saver with %v file workers", fileWorkers)
|
||||
|
||||
s := &fileSaver{
|
||||
uploader: uploader,
|
||||
saveFilePool: newBufferPool(chunker.MaxSize),
|
||||
pol: pol,
|
||||
ch: ch,
|
||||
uploader: uploader,
|
||||
saveFilePool: newBufferPool(chunkerFactory.MaxChunkSize()),
|
||||
chunkerFactory: chunkerFactory,
|
||||
ch: ch,
|
||||
|
||||
CompleteBlob: func(uint64) {},
|
||||
}
|
||||
@@ -117,7 +116,7 @@ func (s *fileChunkState) reset() {
|
||||
|
||||
// readNextChunk reads from rd and returns the next chunk of data. io.EOF is
|
||||
// returned when all chunks have been read.
|
||||
func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker, data []byte) ([]byte, error) {
|
||||
func (s *fileChunkState) readNextChunk(rd io.Reader, chnker restic.Chunker, data []byte) ([]byte, error) {
|
||||
data = data[:0]
|
||||
for {
|
||||
if s.bpos >= s.bmax {
|
||||
@@ -158,7 +157,7 @@ func (s *fileChunkState) readNextChunk(rd io.Reader, chnker *chunker.BaseChunker
|
||||
}
|
||||
|
||||
// saveFile stores the file f in the repo, then closes it.
|
||||
func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) {
|
||||
func (s *fileSaver) saveFile(ctx context.Context, chnker restic.Chunker, chunkState *fileChunkState, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) {
|
||||
start()
|
||||
|
||||
fnr := futureNodeResult{
|
||||
@@ -218,7 +217,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, c
|
||||
return
|
||||
}
|
||||
|
||||
chnker.Reset(s.pol)
|
||||
chnker.Reset()
|
||||
chunkState.reset()
|
||||
|
||||
node.Content = []restic.ID{}
|
||||
@@ -304,7 +303,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.BaseChunker, c
|
||||
}
|
||||
|
||||
func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
|
||||
chnker := chunker.NewBase(s.pol)
|
||||
chnker := s.chunkerFactory.NewChunker()
|
||||
chunkState := &fileChunkState{readBuf: make([]byte, chunkReadBufSize)}
|
||||
|
||||
for {
|
||||
|
||||
@@ -8,9 +8,9 @@ import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/data"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/test"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -34,13 +34,10 @@ func startFileSaver(ctx context.Context, t testing.TB, _ fs.FS) (*fileSaver, *mo
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
workers := uint(runtime.NumCPU())
|
||||
pol, err := chunker.RandomPolynomial()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
chunkerFactory := repository.TestRepository(t).ChunkerFactory()
|
||||
|
||||
saver := &mockSaver{saved: make(map[string]int)}
|
||||
s := newFileSaver(ctx, wg, saver, pol, workers)
|
||||
s := newFileSaver(ctx, wg, saver, chunkerFactory, workers)
|
||||
s.NodeFromFileInfo = func(snPath, filename string, meta toNoder, ignoreXattrListError bool) (*data.Node, error) {
|
||||
return meta.ToNode(ignoreXattrListError, t.Logf)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
type baseChunker struct {
|
||||
bc *chunker.BaseChunker
|
||||
pol chunker.Pol
|
||||
}
|
||||
|
||||
func (c *baseChunker) Reset() {
|
||||
c.bc.Reset(c.pol)
|
||||
}
|
||||
|
||||
func (c *baseChunker) NextSplitPoint(buf []byte) int {
|
||||
return c.bc.NextSplitPoint(buf)
|
||||
}
|
||||
|
||||
type chunkerFactory struct {
|
||||
pol chunker.Pol
|
||||
}
|
||||
|
||||
func newChunkerFactory(pol chunker.Pol) *chunkerFactory {
|
||||
return &chunkerFactory{pol: pol}
|
||||
}
|
||||
|
||||
func (f *chunkerFactory) NewChunker() restic.Chunker {
|
||||
return &baseChunker{bc: chunker.NewBase(f.pol), pol: f.pol}
|
||||
}
|
||||
|
||||
func (f *chunkerFactory) MaxChunkSize() int {
|
||||
return chunker.MaxSize
|
||||
}
|
||||
|
||||
func (r *Repository) ChunkerFactory() restic.ChunkerFactory {
|
||||
return newChunkerFactory(r.Config().ChunkerPolynomial)
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package restic
|
||||
|
||||
// Chunker splits file content into variable-length chunks.
|
||||
// Implementations are created by ChunkerFactory and reused across files via Reset.
|
||||
type Chunker interface {
|
||||
// Reset reinitializes the chunker for a new file.
|
||||
Reset()
|
||||
// NextSplitPoint scans buf for a chunk boundary.
|
||||
// Returns index before which to split buf, or -1 if no boundary found in this buffer.
|
||||
// This operation is stateful. All buffers passed to it until a split point is found
|
||||
// then form a single chunk.
|
||||
NextSplitPoint(buf []byte) int
|
||||
}
|
||||
|
||||
// ChunkerFactory creates chunkers configured for a specific repository.
|
||||
type ChunkerFactory interface {
|
||||
NewChunker() Chunker
|
||||
// MaxChunkSize is the maximum size of a single chunk (used for output buffer pools).
|
||||
MaxChunkSize() int
|
||||
}
|
||||
@@ -17,6 +17,7 @@ type Repository interface {
|
||||
Connections() uint
|
||||
Config() Config
|
||||
PackSize() uint
|
||||
ChunkerFactory() ChunkerFactory
|
||||
|
||||
LoadIndex(ctx context.Context, p TerminalCounterFactory) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user