Mirror of https://github.com/restic/restic.git (synced 2026-02-22 16:56:24 +00:00)

Compare commits (42 commits):
272ccec7e1, 68bf1509bd, cfccd67600, bc461d32e0, ee4bfdf954, 3037894f62,
89075bdf6d, c323f73bf9, aef5e03731, fc1f74d32d, 7d59df1ab8, 2866f3f31c,
dc1154c8ad, 35a816e8ab, 93210614f4, dfd37afee2, 08a5281bd4, cdb48a8970,
4fd5f0b8a9, 92ad6bf74f, 2c7dd3edf4, 19e7803ac6, 9f0605766c, 1a5d7a9965,
296769355d, 07d080830e, c99eabfb37, 842fe43590, be02008025, 29da86b473,
bad7215696, 881ff5e554, 86b7fd0335, 70209d7d1d, f07552161c, 856f3a9135,
49e9bcadb7, 1b8823ef2e, b5062959c8, ab040d8811, d58ae43317, 99d88ad297
.github/ISSUE_TEMPLATE.md (vendored) | 34 changes
@@ -1,14 +1,28 @@
 <!--
-NOTE: Not filling out the issue template needs a good reason, otherwise it may
-take a lot longer to find the problem! Please take the time to help us
-debugging the problem by collecting information, even if it seems irrelevant to
-you. Thanks!
-
-If you have a question, the forum at https://forum.restic.net is a better place.
-Please do not create issues for usage or documentation questions! We're using
-the GitHub issue tracker mainly for tracking bugs and feature requests.
+Welcome! - We kindly ask that you:
+
+1. Fill out the issue template below - not doing so needs a good reason.
+2. Use the forum if you have a question rather than a bug or feature request.
+
+The forum is at: https://forum.restic.net
+
+NOTE: Not filling out the issue template needs a good reason, as otherwise it
+may take a lot longer to find the problem, not to mention it can take up a lot
+more time which can otherwise be spent on development. Please also take the
+time to help us debug the issue by collecting relevant information, even if
+it doesn't seem to be relevant to you. Thanks!
+
+The forum is a better place for questions about restic or general suggestions
+and topics, e.g. usage or documentation questions! This issue tracker is mainly
+for tracking bugs and feature requests directly relating to the development of
+the software itself, rather than the project.
+
+Thanks for understanding, and for contributing to the project!
 -->


 ## Output of `restic version`


@@ -24,10 +38,10 @@ This section should include at least:
 information to diagnose the problem!
 -->


 ## What backend/server/service did you use to store the repository?



 ## Expected behavior

 <!--
@@ -48,12 +62,14 @@ The more time you spend describing an easy way to reproduce the behavior (if
 this is possible), the easier it is for the project developers to fix it!
 -->


 ## Do you have any idea what may have caused this?



 ## Do you have an idea how to solve the issue?



 ## Did restic help you or make you happy in any way?

 <!--
CHANGELOG.md | 86 changes
@@ -1,3 +1,89 @@
Changelog for restic 0.8.3 (2018-02-26)
=======================================

The following sections list the changes in restic 0.8.3 relevant to
restic users. The changes are ordered by importance.

Summary
-------

 * Fix #1633: Fixed unexpected 'pack file cannot be listed' error
 * Fix #1641: Ignore files with invalid names in the repo
 * Fix #1638: Handle errors listing files in the backend
 * Enh #1497: Add --read-data-subset flag to check command
 * Enh #1560: Retry all repository file download errors
 * Enh #1623: Don't check for presence of files in the backend before writing
 * Enh #1634: Upgrade B2 client library, reduce HTTP requests

Details
-------

 * Bugfix #1633: Fixed unexpected 'pack file cannot be listed' error

   Due to a regression introduced in 0.8.2, the `rebuild-index` and `prune`
   commands failed to read pack files with a size of 587, 588, 589 or 590 bytes.

   https://github.com/restic/restic/issues/1633
   https://github.com/restic/restic/pull/1635

 * Bugfix #1641: Ignore files with invalid names in the repo

   The 0.8.2 release introduced a bug: when restic encountered files in the
   repo which did not have a valid name, it tried to load a file with a name
   consisting of lots of zeroes instead of ignoring it. This is now resolved;
   invalid file names are simply ignored.

   https://github.com/restic/restic/issues/1641
   https://github.com/restic/restic/pull/1643

 * Bugfix #1638: Handle errors listing files in the backend

   A user reported in the forum that restic completed a backup although a
   concurrent `prune` operation was running. A few error messages were
   printed, but the backup was attempted and completed successfully, and no
   error code was returned.

   This should not happen: the repository is exclusively locked during
   `prune`, so when `restic backup` is run in parallel, it should abort and
   return an error code instead.

   The bug was in recently introduced code that retries a List() operation on
   the backend when it fails. It is now corrected.

   https://github.com/restic/restic/pull/1638

 * Enhancement #1497: Add --read-data-subset flag to check command

   This change introduces the ability to check the integrity of a subset of
   the repository data packs. This can be used to spread the integrity check
   of larger repositories over a period of time.

   https://github.com/restic/restic/issues/1497
   https://github.com/restic/restic/pull/1556

 * Enhancement #1560: Retry all repository file download errors

   Restic will now retry failed downloads, similar to other operations.

   https://github.com/restic/restic/pull/1560

 * Enhancement #1623: Don't check for presence of files in the backend before writing

   Before, all backend implementations were required to return an error if
   the file to be written already existed in the backend. For most backends,
   that meant making a request (e.g. via HTTP) and returning an error when
   the file already existed.

   Such a check is inherently racy: the file could be created between the
   HTTP request testing for it and the start of the write, so we've relaxed
   this requirement, which saves one HTTP request per newly added file.

   https://github.com/restic/restic/pull/1623
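To make the removed pattern concrete: a presence check before a write can never rule out a concurrent writer. Below is a minimal, self-contained sketch of the race; the map-backed `exists`/`write` helpers are illustrative stand-ins for backend requests such as an HTTP HEAD and PUT, and nothing here is restic API.

```go
package main

import (
	"errors"
	"fmt"
)

// store stands in for a remote object store.
var store = map[string][]byte{}

func exists(name string) bool { _, ok := store[name]; return ok }

func write(name string, data []byte) { store[name] = data }

// saveWithCheck is the check-then-write pattern removed in #1623: the
// existence check costs an extra round trip and proves nothing, because a
// concurrent client can create `name` between the check and the write.
func saveWithCheck(name string, data []byte) error {
	if exists(name) {
		return errors.New("key already exists")
	}
	// window: a concurrent client may create `name` right here, unnoticed
	write(name, data)
	return nil
}

func main() {
	fmt.Println(saveWithCheck("data/abc", []byte("pack"))) // <nil>
	fmt.Println(saveWithCheck("data/abc", []byte("pack"))) // key already exists
}
```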

 * Enhancement #1634: Upgrade B2 client library, reduce HTTP requests

   We've upgraded the B2 client library restic uses to access Backblaze B2.
   This reduces the number of HTTP requests needed to upload a new file from
   two to one, which should improve throughput to B2.

   https://github.com/restic/restic/pull/1634


Changelog for restic 0.8.2 (2018-02-17)
=======================================
Gopkg.lock (generated) | 4 changes
@@ -88,8 +88,8 @@
 [[projects]]
   name = "github.com/kurin/blazer"
   packages = ["b2","base","internal/b2types","internal/blog"]
-  revision = "e269a1a17bb6aec278c06a57cb7e8f8d0d333e04"
-  version = "v0.2.1"
+  revision = "cd0304efa98725679cf68422cefa328d3d96f2f4"
+  version = "v0.3.0"

 [[projects]]
   name = "github.com/marstr/guid"
changelog/0.8.3_2018-02-26/issue-1497 | 8 lines (new file)
@@ -0,0 +1,8 @@
Enhancement: Add --read-data-subset flag to check command

This change introduces the ability to check the integrity of a subset of the
repository data packs. This can be used to spread the integrity check of
larger repositories over a period of time.

https://github.com/restic/restic/issues/1497
https://github.com/restic/restic/pull/1556
changelog/0.8.3_2018-02-26/issue-1633 | 7 lines (new file)
@@ -0,0 +1,7 @@
Bugfix: Fixed unexpected 'pack file cannot be listed' error

Due to a regression introduced in 0.8.2, the `rebuild-index` and `prune`
commands failed to read pack files with a size of 587, 588, 589 or 590 bytes.

https://github.com/restic/restic/issues/1633
https://github.com/restic/restic/pull/1635
changelog/0.8.3_2018-02-26/issue-1641 | 10 lines (new file)
@@ -0,0 +1,10 @@
Bugfix: Ignore files with invalid names in the repo

The 0.8.2 release introduced a bug: when restic encountered files in the repo
which did not have a valid name, it tried to load a file with a name
consisting of lots of zeroes instead of ignoring it. This is now resolved;
invalid file names are simply ignored.

https://github.com/restic/restic/issues/1641
https://github.com/restic/restic/pull/1643
https://forum.restic.net/t/help-fixing-repo-no-such-file/485/3
changelog/0.8.3_2018-02-26/pull-1560 | 5 lines (new file)
@@ -0,0 +1,5 @@
Enhancement: Retry all repository file download errors

Restic will now retry failed downloads, similar to other operations.

https://github.com/restic/restic/pull/1560
changelog/0.8.3_2018-02-26/pull-1623 | 12 lines (new file)
@@ -0,0 +1,12 @@
Enhancement: Don't check for presence of files in the backend before writing

Before, all backend implementations were required to return an error if the
file to be written already existed in the backend. For most backends, that
meant making a request (e.g. via HTTP) and returning an error when the file
already existed.

Such a check is inherently racy: the file could be created between the HTTP
request testing for it and the start of the write, so we've relaxed this
requirement, which saves one HTTP request per newly added file.

https://github.com/restic/restic/pull/1623
changelog/0.8.3_2018-02-26/pull-1634 | 7 lines (new file)
@@ -0,0 +1,7 @@
Enhancement: Upgrade B2 client library, reduce HTTP requests

We've upgraded the B2 client library restic uses to access Backblaze B2. This
reduces the number of HTTP requests needed to upload a new file from two to
one, which should improve throughput to B2.

https://github.com/restic/restic/pull/1634
changelog/0.8.3_2018-02-26/pull-1638 | 16 lines (new file)
@@ -0,0 +1,16 @@
Bugfix: Handle errors listing files in the backend

A user reported in the forum that restic completed a backup although a
concurrent `prune` operation was running. A few error messages were printed,
but the backup was attempted and completed successfully, and no error code
was returned.

This should not happen: the repository is exclusively locked during `prune`,
so when `restic backup` is run in parallel, it should abort and return an
error code instead.

The bug was in recently introduced code that retries a List() operation on
the backend when it fails. It is now corrected.

https://github.com/restic/restic/pull/1638
https://forum.restic.net/t/restic-backup-returns-0-exit-code-when-already-locked/484
@@ -3,6 +3,8 @@ package main
 import (
 	"fmt"
 	"os"
+	"strconv"
+	"strings"
 	"time"

 	"github.com/spf13/cobra"
@@ -26,13 +28,17 @@ repository and not use a local cache.
 	RunE: func(cmd *cobra.Command, args []string) error {
 		return runCheck(checkOptions, globalOptions, args)
 	},
+	PreRunE: func(cmd *cobra.Command, args []string) error {
+		return checkFlags(checkOptions)
+	},
 }

 // CheckOptions bundles all options for the 'check' command.
 type CheckOptions struct {
-	ReadData    bool
-	CheckUnused bool
-	WithCache   bool
+	ReadData       bool
+	ReadDataSubset string
+	CheckUnused    bool
+	WithCache      bool
 }

 var checkOptions CheckOptions
@@ -42,10 +48,45 @@ func init() {

 	f := cmdCheck.Flags()
 	f.BoolVar(&checkOptions.ReadData, "read-data", false, "read all data blobs")
+	f.StringVar(&checkOptions.ReadDataSubset, "read-data-subset", "", "read subset of data packs")
 	f.BoolVar(&checkOptions.CheckUnused, "check-unused", false, "find unused blobs")
 	f.BoolVar(&checkOptions.WithCache, "with-cache", false, "use the cache")
 }

+func checkFlags(opts CheckOptions) error {
+	if opts.ReadData && opts.ReadDataSubset != "" {
+		return errors.Fatalf("check flags --read-data and --read-data-subset cannot be used together")
+	}
+	if opts.ReadDataSubset != "" {
+		dataSubset, err := stringToIntSlice(opts.ReadDataSubset)
+		if err != nil || len(dataSubset) != 2 {
+			return errors.Fatalf("check flag --read-data-subset must have two positive integer values, e.g. --read-data-subset=1/2")
+		}
+		if dataSubset[0] == 0 || dataSubset[1] == 0 || dataSubset[0] > dataSubset[1] {
+			return errors.Fatalf("check flag --read-data-subset=n/t values must be positive integers, and n <= t, e.g. --read-data-subset=1/2")
+		}
+	}
+
+	return nil
+}
+
+// stringToIntSlice converts string to []uint, using '/' as element separator
+func stringToIntSlice(param string) (split []uint, err error) {
+	if param == "" {
+		return nil, nil
+	}
+	parts := strings.Split(param, "/")
+	result := make([]uint, len(parts))
+	for idx, part := range parts {
+		uintval, err := strconv.ParseUint(part, 10, 0)
+		if err != nil {
+			return nil, err
+		}
+		result[idx] = uint(uintval)
+	}
+	return result, nil
+}
+
 func newReadProgress(gopts GlobalOptions, todo restic.Stat) *restic.Progress {
 	if gopts.Quiet {
 		return nil
@@ -158,13 +199,25 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
 		}
 	}

-	if opts.ReadData {
-		Verbosef("read all data\n")
+	doReadData := func(bucket, totalBuckets uint) {
+		packs := restic.IDSet{}
+		for pack := range chkr.GetPacks() {
+			if (uint(pack[0]) % totalBuckets) == (bucket - 1) {
+				packs.Insert(pack)
+			}
+		}
+		packCount := uint64(len(packs))

-		p := newReadProgress(gopts, restic.Stat{Blobs: chkr.CountPacks()})
+		if packCount < chkr.CountPacks() {
+			Verbosef(fmt.Sprintf("read group #%d of %d data packs (out of total %d packs in %d groups)\n", bucket, packCount, chkr.CountPacks(), totalBuckets))
+		} else {
+			Verbosef("read all data\n")
+		}
+
+		p := newReadProgress(gopts, restic.Stat{Blobs: packCount})
 		errChan := make(chan error)

-		go chkr.ReadData(gopts.ctx, p, errChan)
+		go chkr.ReadPacks(gopts.ctx, packs, p, errChan)

 		for err := range errChan {
 			errorsFound = true
@@ -172,6 +225,14 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
 		}
 	}

+	switch {
+	case opts.ReadData:
+		doReadData(1, 1)
+	case opts.ReadDataSubset != "":
+		dataSubset, _ := stringToIntSlice(opts.ReadDataSubset)
+		doReadData(dataSubset[0], dataSubset[1])
+	}
+
 	if errorsFound {
 		return errors.Fatal("repository contains errors")
 	}
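The grouping rule inside `doReadData` above deserves a note: a pack is assigned to group `n` of `t` by taking the first byte of its ID modulo `t`. Pack IDs are SHA-256 hashes, so that byte is close to uniformly distributed and the groups come out roughly equal in size. A minimal, self-contained sketch of the idea (the synthetic IDs below are illustrative, not real pack IDs):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	const totalBuckets = 5
	counts := make([]int, totalBuckets)
	for i := 0; i < 10000; i++ {
		// stand-in for a pack ID, which is the SHA-256 hash of the pack contents
		id := sha256.Sum256([]byte(fmt.Sprintf("pack-%d", i)))
		counts[uint(id[0])%totalBuckets]++
	}
	fmt.Println(counts) // five roughly equal group sizes
}
```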
@@ -91,19 +91,23 @@ func unlockRepo(lock *restic.Lock) error {
 	globalLocks.Lock()
 	defer globalLocks.Unlock()

-	debug.Log("unlocking repository with lock %p", lock)
-	if err := lock.Unlock(); err != nil {
-		debug.Log("error while unlocking: %v", err)
-		return err
-	}
-
 	for i := 0; i < len(globalLocks.locks); i++ {
 		if lock == globalLocks.locks[i] {
 			// remove the lock from the repo
+			debug.Log("unlocking repository with lock %v", lock)
+			if err := lock.Unlock(); err != nil {
+				debug.Log("error while unlocking: %v", err)
+				return err
+			}
+
+			// remove the lock from the list of locks
 			globalLocks.locks = append(globalLocks.locks[:i], globalLocks.locks[i+1:]...)
 			return nil
 		}
 	}

+	debug.Log("unable to find lock %v in the global list of locks, ignoring", lock)
+
 	return nil
 }

@@ -119,6 +123,7 @@ func unlockAll() error {
 		}
 		debug.Log("successfully removed lock")
 	}
+	globalLocks.locks = globalLocks.locks[:0]

 	return nil
 }
@@ -87,3 +87,29 @@ yield the same error:

    Load indexes
    ciphertext verification failed

By default, the ``check`` command does not verify that the repository data
files are unmodified. Use the ``--read-data`` parameter to check all
repository data files:

.. code-block:: console

    $ restic -r /tmp/backup check --read-data
    load indexes
    check all packs
    check snapshots, trees and blobs
    read all data

Use the ``--read-data-subset=n/t`` parameter to check a subset of the
repository data files. The parameter takes two values, ``n`` and ``t``. All
repository data files are logically divided into ``t`` roughly equal groups,
and only the files in group number ``n`` are checked. For example, the
following commands check all repository data files over 5 separate
invocations:

.. code-block:: console

    $ restic -r /tmp/backup check --read-data-subset=1/5
    $ restic -r /tmp/backup check --read-data-subset=2/5
    $ restic -r /tmp/backup check --read-data-subset=3/5
    $ restic -r /tmp/backup check --read-data-subset=4/5
    $ restic -r /tmp/backup check --read-data-subset=5/5
@@ -349,6 +349,8 @@ _restic_check()
     local_nonpersistent_flags+=("--help")
     flags+=("--read-data")
     local_nonpersistent_flags+=("--read-data")
+    flags+=("--read-data-subset=")
+    local_nonpersistent_flags+=("--read-data-subset=")
     flags+=("--with-cache")
     local_nonpersistent_flags+=("--with-cache")
     flags+=("--cacert=")
@@ -36,6 +36,10 @@ repository and not use a local cache.
 \fB\-\-read\-data\fP[=false]
 	read all data blobs

+.PP
+\fB\-\-read\-data\-subset\fP=""
+	read subset of data packs
+
 .PP
 \fB\-\-with\-cache\fP[=false]
 	use the cache
@@ -38,13 +38,13 @@ func randomID() restic.ID {

 // forgetfulBackend returns a backend that forgets everything.
 func forgetfulBackend() restic.Backend {
-	be := &mock.Backend{}
+	be := mock.NewBackend()

 	be.TestFn = func(ctx context.Context, h restic.Handle) (bool, error) {
 		return false, nil
 	}

-	be.LoadFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+	be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 		return nil, errors.New("not found")
 	}
@@ -135,16 +135,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err

 	debug.Log("Save %v at %v", h, objName)

-	// Check key does not already exist
-	found, err := be.container.GetBlobReference(objName).Exists()
-	if err != nil {
-		return errors.Wrap(err, "GetBlobReference().Exists()")
-	}
-	if found {
-		debug.Log("%v already exists", h)
-		return errors.New("key already exists")
-	}
-
 	be.sem.GetToken()

 	// wrap the reader so that net/http client cannot close the reader, return
@@ -178,10 +168,13 @@ func (wr wrapReader) Close() error {
 	return err
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -142,9 +142,13 @@ func (be *b2Backend) IsNotExist(err error) bool {
 	return b2.IsNotExist(errors.Cause(err))
 }

-// Load returns the data stored in the backend for h at the given offset
-// and saves it in p. Load has the same semantics as io.ReaderAt.
-func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -196,12 +200,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) er
 	debug.Log("Save %v, name %v", h, name)
 	obj := be.bucket.Object(name)

-	_, err := obj.Attrs(ctx)
-	if err == nil {
-		debug.Log(" %v already exists", h)
-		return errors.New("key already exists")
-	}
-
 	w := obj.NewWriter(ctx)
 	n, err := io.Copy(w, rd)
 	debug.Log(" saved %d bytes, err %v", n, err)

@@ -66,12 +66,12 @@ func (be *ErrorBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
 // given offset. If length is larger than zero, only a portion of the file
 // is returned. rd must be closed after use. If an error is returned, the
 // ReadCloser must be nil.
-func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
 	if be.fail(be.FailLoad) {
-		return nil, errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset)
+		return errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset)
 	}

-	return be.Backend.Load(ctx, h, length, offset)
+	return be.Backend.Load(ctx, h, length, offset, consumer)
 }

 // Stat returns information about the File identified by h.

@@ -88,15 +88,11 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
 // given offset. If length is larger than zero, only a portion of the file
 // is returned. rd must be closed after use. If an error is returned, the
 // ReadCloser must be nil.
-func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (rd io.ReadCloser, err error) {
-	err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
+func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
+	return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
 		func() error {
-			var innerError error
-			rd, innerError = be.Backend.Load(ctx, h, length, offset)
-
-			return innerError
+			return be.Backend.Load(ctx, h, length, offset, consumer)
 		})
-	return rd, err
 }

 // Stat returns information about the File identified by h.
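This signature change is the heart of the retry enhancement (#1560): because the consumer now runs inside `Load`, a failure while reading the stream surfaces to `RetryBackend`, which can re-run the whole open-and-read sequence as one unit. A minimal caller-side sketch under the new contract (the function name is illustrative; `restic.Backend` and `restic.Handle` are the interfaces from this repository, and the body mirrors the rewritten `LoadAll` further below):

```go
package example

import (
	"context"
	"io"
	"io/ioutil"

	"github.com/restic/restic/internal/restic"
)

// loadAllViaCallback reads a whole file through the callback-style Load. If
// the backend is wrapped in a RetryBackend, an error from ioutil.ReadAll is
// returned into Load and the open+read sequence is retried as a unit; under
// the old io.ReadCloser-returning API, read errors happened outside Load and
// were invisible to the retry logic.
func loadAllViaCallback(ctx context.Context, be restic.Backend, h restic.Handle) ([]byte, error) {
	var buf []byte
	err := be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
		buf, ierr = ioutil.ReadAll(rd)
		return ierr
	})
	return buf, err
}
```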
@@ -129,16 +125,39 @@ func (be *RetryBackend) Test(ctx context.Context, h restic.Handle) (exists bool,
 	return exists, err
 }

-// List runs fn for each file in the backend which has the type t.
+// List runs fn for each file in the backend which has the type t. When an
+// error is returned by the underlying backend, the request is retried. When fn
+// returns an error, the operation is aborted and the error is returned to the
+// caller.
 func (be *RetryBackend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
-	listed := make(map[string]struct{})
-	return be.retry(ctx, fmt.Sprintf("List(%v)", t), func() error {
+	// create a new context that we can cancel when fn returns an error, so
+	// that listing is aborted
+	listCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	listed := make(map[string]struct{}) // remember for which files we already ran fn
+	var innerErr error                  // remember when fn returned an error, so we can return that to the caller
+
+	err := be.retry(listCtx, fmt.Sprintf("List(%v)", t), func() error {
 		return be.Backend.List(ctx, t, func(fi restic.FileInfo) error {
 			if _, ok := listed[fi.Name]; ok {
 				return nil
 			}
 			listed[fi.Name] = struct{}{}
-			return fn(fi)
+
+			innerErr = fn(fi)
+			if innerErr != nil {
+				// if fn returned an error, listing is aborted, so we cancel the context
+				cancel()
+			}
+			return innerErr
 		})
 	})
+
+	// the error fn returned takes precedence
+	if innerErr != nil {
+		return innerErr
+	}
+
+	return err
 }
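The net effect is a contract the tests below pin down: across any number of retries, `fn` runs at most once per file name, and an error from `fn` aborts the listing and takes precedence over the backend error. A small hypothetical caller relying on that guarantee (not part of this change set):

```go
package backend

import (
	"context"

	"github.com/restic/restic/internal/restic"
)

// countDataFiles counts data files without double-counting: even if the
// underlying backend's List fails partway through and is retried, the
// deduplication map inside RetryBackend.List ensures the callback sees each
// name exactly once.
func countDataFiles(ctx context.Context, be *RetryBackend) (int, error) {
	n := 0
	err := be.List(ctx, restic.DataFile, func(fi restic.FileInfo) error {
		n++ // safe: no duplicates across retries
		return nil
	})
	return n, err
}
```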
@@ -123,3 +123,164 @@ func TestBackendListRetry(t *testing.T) {
	test.Equals(t, 2, retry)                   // assert retried once
	test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files
}

func TestBackendListRetryErrorFn(t *testing.T) {
	var names = []string{"id1", "id2", "foo", "bar"}

	be := &mock.Backend{
		ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
			t.Logf("List called for %v", tpe)
			for _, name := range names {
				err := fn(restic.FileInfo{Name: name})
				if err != nil {
					return err
				}
			}

			return nil
		},
	}

	retryBackend := RetryBackend{
		Backend: be,
	}

	var ErrTest = errors.New("test error")

	var listed []string
	run := 0
	err := retryBackend.List(context.TODO(), restic.DataFile, func(fi restic.FileInfo) error {
		t.Logf("fn called for %v", fi.Name)
		run++
		// return an error for the third item in the list
		if run == 3 {
			t.Log("returning an error")
			return ErrTest
		}
		listed = append(listed, fi.Name)
		return nil
	})

	if err != ErrTest {
		t.Fatalf("wrong error returned, want %v, got %v", ErrTest, err)
	}

	// processing should stop after the error was returned, so run should be 3
	if run != 3 {
		t.Fatalf("function was called %d times, wanted %v", run, 3)
	}

	test.Equals(t, []string{"id1", "id2"}, listed)
}

func TestBackendListRetryErrorBackend(t *testing.T) {
	var names = []string{"id1", "id2", "foo", "bar"}

	var ErrBackendTest = errors.New("test error")

	retries := 0
	be := &mock.Backend{
		ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
			t.Logf("List called for %v, retries %v", tpe, retries)
			retries++
			for i, name := range names {
				if i == 2 {
					return ErrBackendTest
				}

				err := fn(restic.FileInfo{Name: name})
				if err != nil {
					return err
				}
			}

			return nil
		},
	}

	const maxRetries = 2
	retryBackend := RetryBackend{
		MaxTries: maxRetries,
		Backend:  be,
	}

	var listed []string
	err := retryBackend.List(context.TODO(), restic.DataFile, func(fi restic.FileInfo) error {
		t.Logf("fn called for %v", fi.Name)
		listed = append(listed, fi.Name)
		return nil
	})

	if err != ErrBackendTest {
		t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err)
	}

	if retries != maxRetries+1 {
		t.Fatalf("List was called %d times, wanted %v", retries, maxRetries+1)
	}

	test.Equals(t, names[:2], listed)
}

// failingReader returns an error after reading limit number of bytes
type failingReader struct {
	data  []byte
	pos   int
	limit int
}

func (r failingReader) Read(p []byte) (n int, err error) {
	i := 0
	for ; i < len(p) && i+r.pos < r.limit; i++ {
		p[i] = r.data[r.pos+i]
	}
	r.pos += i
	if r.pos >= r.limit {
		return i, errors.Errorf("reader reached limit of %d", r.limit)
	}
	return i, nil
}
func (r failingReader) Close() error {
	return nil
}

// closingReader adapts io.Reader to io.ReadCloser interface
type closingReader struct {
	rd io.Reader
}

func (r closingReader) Read(p []byte) (n int, err error) {
	return r.rd.Read(p)
}
func (r closingReader) Close() error {
	return nil
}

func TestBackendLoadRetry(t *testing.T) {
	data := test.Random(23, 1024)
	limit := 100
	attempt := 0

	be := mock.NewBackend()
	be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
		// returns failing reader on first invocation, good reader on subsequent invocations
		attempt++
		if attempt > 1 {
			return closingReader{rd: bytes.NewReader(data)}, nil
		}
		return failingReader{data: data, limit: limit}, nil
	}

	retryBackend := RetryBackend{
		Backend: be,
	}

	var buf []byte
	err := retryBackend.Load(context.TODO(), restic.Handle{}, 0, 0, func(rd io.Reader) (err error) {
		buf, err = ioutil.ReadAll(rd)
		return err
	})
	test.OK(t, err)
	test.Equals(t, data, buf)
	test.Equals(t, 2, attempt)
}
@@ -218,13 +218,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err

 	be.sem.GetToken()

-	// Check key does not already exist
-	if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil {
-		debug.Log("%v already exists", h)
-		be.sem.ReleaseToken()
-		return errors.New("key already exists")
-	}
-
 	debug.Log("InsertObject(%v, %v)", be.bucketName, objName)

 	// Set chunk size to zero to disable resumable uploads.
@@ -282,10 +275,13 @@ func (wr wrapReader) Close() error {
 	return err
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -146,10 +146,13 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
 	return setNewFileMode(filename, backend.Modes.File)
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
+}
+
+func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v", h, length, offset)
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"sync"

+	"github.com/restic/restic/internal/backend"
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"

@@ -85,10 +86,13 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader
 	return nil
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	if err := h.Valid(); err != nil {
 		return nil, err
 	}
@@ -166,10 +166,13 @@ func (b *restBackend) IsNotExist(err error) bool {
 	return ok
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
+}
+
+func (b *restBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v", h, length, offset)
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -235,13 +235,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
 	be.sem.GetToken()
 	defer be.sem.ReleaseToken()

-	// Check key does not already exist
-	_, err = be.client.StatObject(be.cfg.Bucket, objName, minio.StatObjectOptions{})
-	if err == nil {
-		debug.Log("%v already exists", h)
-		return errors.New("key already exists")
-	}
-
 	var size int64 = -1

 	type lenner interface {
@@ -281,10 +274,13 @@ func (wr wrapReader) Close() error {
 	return err
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -327,10 +327,13 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err err
 	return errors.Wrap(r.c.Chmod(filename, backend.Modes.File), "Chmod")
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
+}
+
+func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v", h, length, offset)
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -109,10 +109,13 @@ func (be *beSwift) Location() string {
 	return be.container
 }

-// Load returns a reader that yields the contents of the file at h at the
-// given offset. If length is nonzero, only a portion of the file is
-// returned. rd must be closed after use.
-func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+// Load runs fn with a reader that yields the contents of the file at h at the
+// given offset.
+func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
+}
+
+func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v", h, length, offset)
 	if err := h.Valid(); err != nil {
 		return nil, err
@@ -165,19 +168,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
 	be.sem.GetToken()
 	defer be.sem.ReleaseToken()

-	// Check key does not already exist
-	switch _, _, err = be.conn.Object(be.container, objName); err {
-	case nil:
-		debug.Log("%v already exists", h)
-		return errors.New("key already exists")
-
-	case swift.ObjectNotFound:
-		// Ok, that's what we want
-
-	default:
-		return errors.Wrap(err, "conn.Object")
-	}
-
 	encoding := "binary/octet-stream"

 	debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)
@@ -42,20 +42,15 @@ func (s *Suite) BenchmarkLoadFile(t *testing.B) {
 	t.ResetTimer()

 	for i := 0; i < t.N; i++ {
-		rd, err := be.Load(context.TODO(), handle, 0, 0)
+		var n int
+		err := be.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) (ierr error) {
+			n, ierr = io.ReadFull(rd, buf)
+			return ierr
+		})
 		if err != nil {
 			t.Fatal(err)
 		}

-		n, err := io.ReadFull(rd, buf)
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		if err = rd.Close(); err != nil {
-			t.Fatalf("Close() returned error: %v", err)
-		}
-
 		if n != length {
 			t.Fatalf("wrong number of bytes read: want %v, got %v", length, n)
 		}
@@ -84,20 +79,15 @@ func (s *Suite) BenchmarkLoadPartialFile(t *testing.B) {
 	t.ResetTimer()

 	for i := 0; i < t.N; i++ {
-		rd, err := be.Load(context.TODO(), handle, testLength, 0)
+		var n int
+		err := be.Load(context.TODO(), handle, testLength, 0, func(rd io.Reader) (ierr error) {
+			n, ierr = io.ReadFull(rd, buf)
+			return ierr
+		})
 		if err != nil {
 			t.Fatal(err)
 		}

-		n, err := io.ReadFull(rd, buf)
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		if err = rd.Close(); err != nil {
-			t.Fatalf("Close() returned error: %v", err)
-		}
-
 		if n != testLength {
 			t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
 		}
@@ -128,20 +118,15 @@ func (s *Suite) BenchmarkLoadPartialFileOffset(t *testing.B) {
 	t.ResetTimer()

 	for i := 0; i < t.N; i++ {
-		rd, err := be.Load(context.TODO(), handle, testLength, int64(testOffset))
+		var n int
+		err := be.Load(context.TODO(), handle, testLength, int64(testOffset), func(rd io.Reader) (ierr error) {
+			n, ierr = io.ReadFull(rd, buf)
+			return ierr
+		})
 		if err != nil {
 			t.Fatal(err)
 		}

-		n, err := io.ReadFull(rd, buf)
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		if err = rd.Close(); err != nil {
-			t.Fatalf("Close() returned error: %v", err)
-		}
-
 		if n != testLength {
 			t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
 		}
@@ -115,13 +115,14 @@ func (s *Suite) TestLoad(t *testing.T) {
 	b := s.open(t)
 	defer s.close(t, b)

-	rd, err := b.Load(context.TODO(), restic.Handle{}, 0, 0)
+	noop := func(rd io.Reader) error {
+		return nil
+	}
+
+	err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop)
 	if err == nil {
 		t.Fatalf("Load() did not return an error for invalid handle")
 	}
-	if rd != nil {
-		_ = rd.Close()
-	}

 	err = testLoad(b, restic.Handle{Type: restic.DataFile, Name: "foobar"}, 0, 0)
 	if err == nil {
@@ -141,13 +142,19 @@ func (s *Suite) TestLoad(t *testing.T) {

 	t.Logf("saved %d bytes as %v", length, handle)

-	rd, err = b.Load(context.TODO(), handle, 100, -1)
+	err = b.Load(context.TODO(), handle, 100, -1, noop)
 	if err == nil {
 		t.Fatalf("Load() returned no error for negative offset!")
 	}

-	if rd != nil {
-		t.Fatalf("Load() returned a non-nil reader for negative offset!")
+	err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error {
+		return errors.Errorf("deliberate error")
+	})
+	if err == nil {
+		t.Fatalf("Load() did not propagate consumer error!")
+	}
+	if err.Error() != "deliberate error" {
+		t.Fatalf("Load() did not correctly propagate consumer error!")
 	}

 	loadTests := 50
@@ -176,63 +183,38 @@ func (s *Suite) TestLoad(t *testing.T) {
 			d = d[:l]
 		}

-		rd, err := b.Load(context.TODO(), handle, getlen, int64(o))
+		var buf []byte
+		err := b.Load(context.TODO(), handle, getlen, int64(o), func(rd io.Reader) (ierr error) {
+			buf, ierr = ioutil.ReadAll(rd)
+			return ierr
+		})
 		if err != nil {
 			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
 			t.Errorf("Load(%d, %d) returned unexpected error: %+v", l, o, err)
 			continue
 		}

-		buf, err := ioutil.ReadAll(rd)
-		if err != nil {
-			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
-			t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %+v", l, o, err)
-			if err = rd.Close(); err != nil {
-				t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
-			}
-			continue
-		}
-
 		if l == 0 && len(buf) != len(d) {
 			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
 			t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, len(d), len(buf))
-			if err = rd.Close(); err != nil {
-				t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
-			}
 			continue
 		}

 		if l > 0 && l <= len(d) && len(buf) != l {
 			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
 			t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf))
-			if err = rd.Close(); err != nil {
-				t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
-			}
 			continue
 		}

 		if l > len(d) && len(buf) != len(d) {
 			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
 			t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf))
-			if err = rd.Close(); err != nil {
-				t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
-			}
 			continue
 		}

 		if !bytes.Equal(buf, d) {
 			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
 			t.Errorf("Load(%d, %d) returned wrong bytes", l, o)
-			if err = rd.Close(); err != nil {
-				t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
-			}
 			continue
 		}
-
-		err = rd.Close()
-		if err != nil {
-			t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
-			t.Errorf("Load(%d, %d) rd.Close() returned unexpected error: %+v", l, o, err)
-			continue
-		}
 	}
@@ -647,17 +629,10 @@ func store(t testing.TB, b restic.Backend, tpe restic.FileType, data []byte) res

 // testLoad loads a blob (but discards its contents).
 func testLoad(b restic.Backend, h restic.Handle, length int, offset int64) error {
-	rd, err := b.Load(context.TODO(), h, 0, 0)
-	if err != nil {
-		return err
-	}
-
-	_, err = io.Copy(ioutil.Discard, rd)
-	cerr := rd.Close()
-	if err == nil {
-		err = cerr
-	}
-	return err
+	return b.Load(context.TODO(), h, 0, 0, func(rd io.Reader) (ierr error) {
+		_, ierr = io.Copy(ioutil.Discard, rd)
+		return ierr
+	})
 }

 func (s *Suite) delayedRemove(t testing.TB, be restic.Backend, handles ...restic.Handle) error {
@@ -776,31 +751,23 @@ func (s *Suite) TestBackend(t *testing.T) {
 		length := end - start

 		buf2 := make([]byte, length)
-		rd, err := b.Load(context.TODO(), h, len(buf2), int64(start))
+		var n int
+		err = b.Load(context.TODO(), h, len(buf2), int64(start), func(rd io.Reader) (ierr error) {
+			n, ierr = io.ReadFull(rd, buf2)
+			return ierr
+		})
 		test.OK(t, err)
-		n, err := io.ReadFull(rd, buf2)
-		test.OK(t, err)
 		test.Equals(t, len(buf2), n)

-		remaining, err := io.Copy(ioutil.Discard, rd)
-		test.OK(t, err)
-		test.Equals(t, int64(0), remaining)
-
-		test.OK(t, rd.Close())
-
 		test.Equals(t, ts.data[start:end], string(buf2))
 	}

 	// test adding the first file again
 	ts := testStrings[0]

 	// create blob
 	h := restic.Handle{Type: tpe, Name: ts.id}
-	err := b.Save(context.TODO(), h, strings.NewReader(ts.data))
-	test.Assert(t, err != nil, "backend has allowed overwrite of existing blob: expected error for %v, got %v", h, err)
-
-	// remove and recreate
-	err = s.delayedRemove(t, b, h)
+	err := s.delayedRemove(t, b, h)
 	test.OK(t, err)

 	// test that the blob is gone
@@ -10,24 +10,11 @@ import (

 // LoadAll reads all data stored in the backend for the handle.
 func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) {
-	rd, err := be.Load(ctx, h, 0, 0)
-	if err != nil {
-		return nil, err
-	}
-
-	defer func() {
-		_, e := io.Copy(ioutil.Discard, rd)
-		if err == nil {
-			err = e
-		}
-
-		e = rd.Close()
-		if err == nil {
-			err = e
-		}
-	}()
-
-	return ioutil.ReadAll(rd)
+	err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
+		buf, ierr = ioutil.ReadAll(rd)
+		return ierr
+	})
+	return buf, err
 }

 // LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
@@ -46,3 +33,19 @@ func (l *LimitedReadCloser) Read(p []byte) (int, error) {
 func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
 	return &LimitedReadCloser{ReadCloser: r, Reader: io.LimitReader(r, n)}
 }

// DefaultLoad implements Backend.Load using lower-level openReader func
func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
	openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error),
	fn func(rd io.Reader) error) error {
	rd, err := openReader(ctx, h, length, offset)
	if err != nil {
		return err
	}
	err = fn(rd)
	if err != nil {
		rd.Close() // ignore secondary errors closing the reader
		return err
	}
	return rd.Close()
}
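`DefaultLoad` is the single place where the open/consume/close choreography now lives; each backend only supplies an `openReader`. A sketch of the shape every backend in this change set follows (`myBackend` is hypothetical; compare the local, sftp, or s3 hunks above):

```go
package example

import (
	"context"
	"io"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"
)

// myBackend is a hypothetical backend used only for this sketch.
type myBackend struct{}

// Load satisfies the new contract by delegating the reader lifecycle to
// backend.DefaultLoad; only openReader is backend-specific.
func (be *myBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
	return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}

// openReader returns a ReadCloser for h; DefaultLoad guarantees it is closed
// exactly once, whether fn succeeds or fails.
func (be *myBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	// the backend-specific request goes here (e.g. an HTTP GET with a Range header)
	return nil, errors.New("not implemented in this sketch")
}
```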
@@ -3,11 +3,13 @@ package backend_test
 import (
 	"bytes"
 	"context"
+	"io"
 	"math/rand"
 	"testing"

+	"github.com/restic/restic/internal/backend"
 	"github.com/restic/restic/internal/backend/mem"
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
 	rtest "github.com/restic/restic/internal/test"
 )
@@ -89,3 +91,54 @@ func TestLoadLargeBuffer(t *testing.T) {
 		}
 	}
 }

type mockReader struct {
	closed bool
}

func (rd *mockReader) Read(p []byte) (n int, err error) {
	return 0, nil
}
func (rd *mockReader) Close() error {
	rd.closed = true
	return nil
}

func TestDefaultLoad(t *testing.T) {

	h := restic.Handle{Name: "id", Type: restic.DataFile}
	rd := &mockReader{}

	// happy case, assert correct parameters are passed around and content stream is closed
	err := backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
		rtest.Equals(t, h, ih)
		rtest.Equals(t, int(10), length)
		rtest.Equals(t, int64(11), offset)

		return rd, nil
	}, func(ird io.Reader) error {
		rtest.Equals(t, rd, ird)
		return nil
	})
	rtest.OK(t, err)
	rtest.Equals(t, true, rd.closed)

	// unhappy case, assert producer errors are handled correctly
	err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
		return nil, errors.Errorf("producer error")
	}, func(ird io.Reader) error {
		t.Fatalf("unexpected consumer invocation")
		return nil
	})
	rtest.Equals(t, "producer error", err.Error())

	// unhappy case, assert consumer errors are handled correctly
	rd = &mockReader{}
	err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
		return rd, nil
	}, func(ird io.Reader) error {
		return errors.Errorf("consumer error")
	})
	rtest.Equals(t, true, rd.closed)
	rtest.Equals(t, "consumer error", err.Error())
}
internal/cache/backend.go (vendored) | 54 changes
@@ -121,17 +121,10 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
 		return nil
 	}

-	rd, err := b.Backend.Load(ctx, h, 0, 0)
+	err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
+		return b.Cache.Save(h, rd)
+	})
 	if err != nil {
-		return err
-	}
-
-	if err = b.Cache.Save(h, rd); err != nil {
-		_ = rd.Close()
-		return err
-	}
-
-	if err = rd.Close(); err != nil {
 		// try to remove from the cache, ignore errors
 		_ = b.Cache.Remove(h)
 		return err
@@ -142,17 +135,22 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {

 // loadFromCacheOrDelegate will try to load the file from the cache, and fall
 // back to the backend if that fails.
-func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
 	rd, err := b.Cache.Load(h, length, offset)
-	if err == nil {
-		return rd, nil
+	if err != nil {
+		return b.Backend.Load(ctx, h, length, offset, consumer)
 	}

-	return b.Backend.Load(ctx, h, length, offset)
+	err = consumer(rd)
+	if err != nil {
+		rd.Close() // ignore secondary errors
+		return err
+	}
+	return rd.Close()
 }

 // Load loads a file from the cache or the backend.
-func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
+func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
 	b.inProgressMutex.Lock()
 	waitForFinish, inProgress := b.inProgress[h]
 	b.inProgressMutex.Unlock()
@@ -167,7 +165,12 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
 		debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
 		rd, err := b.Cache.Load(h, length, offset)
 		if err == nil {
-			return rd, nil
+			err = consumer(rd)
+			if err != nil {
+				rd.Close() // ignore secondary errors
+				return err
+			}
+			return rd.Close()
 		}
 		debug.Log("error loading %v from cache: %v", h, err)
 	}
@@ -179,20 +182,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset

 		err := b.cacheFile(ctx, h)
 		if err == nil {
-			return b.loadFromCacheOrDelegate(ctx, h, length, offset)
+			return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer)
 		}

 		debug.Log("error caching %v: %v", h, err)
 	}

 	debug.Log("Load(%v, %v, %v): partial file requested, delegating to backend", h, length, offset)
-	return b.Backend.Load(ctx, h, length, offset)
+	return b.Backend.Load(ctx, h, length, offset, consumer)
 }

 // if we don't automatically cache this file type, fall back to the backend
 if _, ok := autoCacheFiles[h.Type]; !ok {
 	debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
-	return b.Backend.Load(ctx, h, length, offset)
+	return b.Backend.Load(ctx, h, length, offset, consumer)
 }

 debug.Log("auto-store %v in the cache", h)
@@ -200,11 +203,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset

 	if err == nil {
 		// load the cached version
-		return b.Cache.Load(h, 0, 0)
+		rd, err := b.Cache.Load(h, 0, 0)
+		if err != nil {
+			return err
+		}
+		err = consumer(rd)
+		if err != nil {
+			rd.Close() // ignore secondary errors
+			return err
+		}
+		return rd.Close()
 	}

 	debug.Log("error caching %v: %v, falling back to backend", h, err)
-	return b.Backend.Load(ctx, h, length, offset)
+	return b.Backend.Load(ctx, h, length, offset, consumer)
 }

 // Stat tests whether the backend has a file. If it does not exist but still
@@ -2,15 +2,12 @@ package checker

import (
	"context"
	"crypto/sha256"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/fs"
	"github.com/restic/restic/internal/hashing"
	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"

@@ -625,20 +622,19 @@ func (c *Checker) CountPacks() uint64 {
	return uint64(len(c.packs))
}

// GetPacks returns the IDSet of packs in the repository
func (c *Checker) GetPacks() restic.IDSet {
	return c.packs
}

// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
	debug.Log("checking pack %v", id)
	h := restic.Handle{Type: restic.DataFile, Name: id.String()}

	rd, err := r.Backend().Load(ctx, h, 0, 0)
	packfile, hash, size, err := repository.DownloadAndHash(ctx, r, h)
	if err != nil {
		return err
	}

	packfile, err := fs.TempFile("", "restic-temp-check-")
	if err != nil {
		_ = rd.Close()
		return errors.Wrap(err, "TempFile")
		return errors.Wrap(err, "checkPack")
	}

	defer func() {
@@ -646,18 +642,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
		_ = os.Remove(packfile.Name())
	}()

	hrd := hashing.NewReader(rd, sha256.New())
	size, err := io.Copy(packfile, hrd)
	if err != nil {
		_ = rd.Close()
		return errors.Wrap(err, "Copy")
	}

	if err = rd.Close(); err != nil {
		return err
	}

	hash := restic.IDFromHash(hrd.Sum(nil))
	debug.Log("hash for pack %v is %v", id, hash)

	if !hash.Equal(id) {
@@ -718,6 +702,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {

// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan<- error) {
	c.ReadPacks(ctx, c.packs, p, errChan)
}

// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *restic.Progress, errChan chan<- error) {
	defer close(errChan)

	p.Start()
@@ -726,18 +715,6 @@ func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan
	g, ctx := errgroup.WithContext(ctx)
	ch := make(chan restic.ID)

	// start producer for channel ch
	g.Go(func() error {
		defer close(ch)
		return c.repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error {
			select {
			case <-ctx.Done():
			case ch <- id:
			}
			return nil
		})
	})

	// run workers
	for i := 0; i < defaultParallelism; i++ {
		g.Go(func() error {
@@ -769,6 +746,15 @@ func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan
		})
	}

	// push packs to ch
	for pack := range packs {
		select {
		case ch <- pack:
		case <-ctx.Done():
		}
	}
	close(ch)

	err := g.Wait()
	if err != nil {
		select {
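The new ReadPacks entry point lets callers verify an arbitrary subset of packs, while ReadData keeps its old behavior by delegating with c.packs. A hypothetical caller (chkr, packs, and p are assumed to exist in the surrounding code; ReadPacks closes errChan when it is done):

// Sketch only: drain errors from a subset-check in the background.
errChan := make(chan error)
go chkr.ReadPacks(ctx, packs, p, errChan)
for err := range errChan {
	fmt.Fprintf(os.Stderr, "pack error: %v\n", err)
}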
@@ -195,20 +195,17 @@ func TestModifiedIndex(t *testing.T) {
		Type: restic.IndexFile,
		Name: "90f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
	}
	f, err := repo.Backend().Load(context.TODO(), h, 0, 0)
	err := repo.Backend().Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
		// save the index again with a modified name so that the hash doesn't match
		// the content any more
		h2 := restic.Handle{
			Type: restic.IndexFile,
			Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
		}
		return repo.Backend().Save(context.TODO(), h2, rd)
	})
	test.OK(t, err)

	// save the index again with a modified name so that the hash doesn't match
	// the content any more
	h2 := restic.Handle{
		Type: restic.IndexFile,
		Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
	}
	err = repo.Backend().Save(context.TODO(), h2, f)
	test.OK(t, err)

	test.OK(t, f.Close())

	chkr := checker.New(repo)
	hints, errs := chkr.LoadIndex(context.TODO())
	if len(errs) == 0 {
@@ -262,35 +259,27 @@ type errorBackend struct {
	ProduceErrors bool
}

func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	rd, err := b.Backend.Load(ctx, h, length, offset)
	if err != nil {
		return rd, err
	}

	if b.ProduceErrors {
		return errorReadCloser{rd}, err
	}

	return rd, nil
func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
	return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
		if b.ProduceErrors {
			return consumer(errorReadCloser{rd})
		}
		return consumer(rd)
	})
}

type errorReadCloser struct {
	io.ReadCloser
	io.Reader
}

func (erd errorReadCloser) Read(p []byte) (int, error) {
	n, err := erd.ReadCloser.Read(p)
	n, err := erd.Reader.Read(p)
	if n > 0 {
		induceError(p[:n])
	}
	return n, err
}

func (erd errorReadCloser) Close() error {
	return erd.ReadCloser.Close()
}

// induceError flips a bit in the slice.
func induceError(data []byte) {
	if rand.Float32() < 0.2 {
@@ -25,16 +25,13 @@ func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Rea
	return r.Backend.Save(ctx, h, r.limiter.Upstream(rd))
}

func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	rc, err := r.Backend.Load(ctx, h, length, offset)
	if err != nil {
		return nil, err
	}

	return limitedReadCloser{
		original: rc,
		limited:  r.limiter.Downstream(rc),
	}, nil
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
	return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
		lrd := limitedReadCloser{
			limited: r.limiter.Downstream(rd),
		}
		return consumer(lrd)
	})
}

type limitedReadCloser struct {
@@ -47,6 +44,9 @@ func (l limitedReadCloser) Read(b []byte) (n int, err error) {
}

func (l limitedReadCloser) Close() error {
	if l.original == nil {
		return nil
	}
	return l.original.Close()
}
@@ -13,7 +13,7 @@ type Backend struct {
	CloseFn      func() error
	IsNotExistFn func(err error) bool
	SaveFn       func(ctx context.Context, h restic.Handle, rd io.Reader) error
	LoadFn       func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
	OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
	StatFn       func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
	ListFn       func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
	RemoveFn     func(ctx context.Context, h restic.Handle) error
@@ -22,6 +22,12 @@ type Backend struct {
	LocationFn func() string
}

// NewBackend returns a new mock Backend instance
func NewBackend() *Backend {
	be := &Backend{}
	return be
}

// Close the backend.
func (m *Backend) Close() error {
	if m.CloseFn == nil {
@@ -58,13 +64,27 @@ func (m *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error
	return m.SaveFn(ctx, h, rd)
}

// Load loads data from the backend.
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	if m.LoadFn == nil {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
	rd, err := m.openReader(ctx, h, length, offset)
	if err != nil {
		return err
	}
	err = fn(rd)
	if err != nil {
		rd.Close() // ignore secondary errors closing the reader
		return err
	}
	return rd.Close()
}

func (m *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	if m.OpenReaderFn == nil {
		return nil, errors.New("not implemented")
	}

	return m.LoadFn(ctx, h, length, offset)
	return m.OpenReaderFn(ctx, h, length, offset)
}

// Stat an object in the backend.
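With LoadFn renamed to OpenReaderFn, a test only supplies the reader; the mock's Load takes care of invoking the consumer and closing. A hypothetical test usage (handle name and contents are illustrative; assumes the mock package and Go's ioutil/strings from the standard library of that era):

// Sketch only: exercising the mock backend's new callback-style Load.
be := mock.NewBackend()
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	return ioutil.NopCloser(strings.NewReader("pack data")), nil
}
err := be.Load(context.TODO(), restic.Handle{Type: restic.DataFile, Name: "id"}, 0, 0, func(rd io.Reader) error {
	_, err := ioutil.ReadAll(rd) // consume the reader; Load closes it afterwards
	return err
})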
@@ -170,23 +170,73 @@ func (p *Packer) String() string {
	return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
}

const maxHeaderSize = 16 * 1024 * 1024
var (
	// size of the header-length field at the end of the file
	headerLengthSize = binary.Size(uint32(0))
	// we require at least one entry in the header, and one blob for a pack file
	minFileSize = entrySize + crypto.Extension + uint(headerLengthSize)
)

// we require at least one entry in the header, and one blob for a pack file
var minFileSize = entrySize + crypto.Extension
const (
	maxHeaderSize = 16 * 1024 * 1024
	// number of header entries to download as part of header-length request
	eagerEntries = 15
)

// number of header entries to download as part of header-length request
var eagerEntries = uint(15)
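Putting the constants together: a pack file ends with the encrypted header (entries plus crypto.Extension of cipher overhead) followed by a four-byte little-endian length field. readRecords below sizes a single tail read so that, in the common case, the length field, the MAC, and up to eagerEntries records all arrive at once:

// Illustrative only: how readRecords sizes its single tail read for the
// eager case (max = eagerEntries = 15); entrySize, crypto.Extension, and
// headerLengthSize are the package-level values defined above.
bufsize := eagerEntries*int(entrySize) + crypto.Extension + headerLengthSize
if bufsize > int(size) {
	bufsize = int(size) // never read before the start of the file
}
// one ReadAt of bufsize bytes at offset size-bufsize covers the length
// field, the MAC, and up to eagerEntries header records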
// readRecords reads up to max records from the underlying ReaderAt, returning
// the raw header, the total number of records in the header, and any error.
// If the header contains fewer than max entries, the header is truncated to
// the appropriate size.
func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
	var bufsize int
	bufsize += max * int(entrySize)
	bufsize += crypto.Extension
	bufsize += headerLengthSize

	if bufsize > int(size) {
		bufsize = int(size)
	}

	b := make([]byte, bufsize)
	off := size - int64(bufsize)
	if _, err := rd.ReadAt(b, off); err != nil {
		return nil, 0, err
	}

	hlen := binary.LittleEndian.Uint32(b[len(b)-headerLengthSize:])
	b = b[:len(b)-headerLengthSize]
	debug.Log("header length: %v", hlen)

	var err error
	switch {
	case hlen == 0:
		err = InvalidFileError{Message: "header length is zero"}
	case hlen < crypto.Extension:
		err = InvalidFileError{Message: "header length is too small"}
	case (hlen-crypto.Extension)%uint32(entrySize) != 0:
		err = InvalidFileError{Message: "header length is invalid"}
	case int64(hlen) > size-int64(headerLengthSize):
		err = InvalidFileError{Message: "header is larger than file"}
	case int64(hlen) > maxHeaderSize:
		err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
	}
	if err != nil {
		return nil, 0, errors.Wrap(err, "readHeader")
	}

	total := (int(hlen) - crypto.Extension) / int(entrySize)
	if total < max {
		// truncate to the beginning of the pack header
		b = b[len(b)-int(hlen):]
	}

	return b, total, nil
}

// readHeader reads the header at the end of rd. size is the length of the
// whole data accessible in rd.
func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
	debug.Log("size: %v", size)
	if size == 0 {
		err := InvalidFileError{Message: "file is empty"}
		return nil, errors.Wrap(err, "readHeader")
	}

	if size < int64(minFileSize) {
		err := InvalidFileError{Message: "file is too small"}
		return nil, errors.Wrap(err, "readHeader")
@@ -196,69 +246,19 @@ func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
	// eagerly download eagerEntries header entries as part of header-length request.
	// only make second request if actual number of entries is greater than eagerEntries

	eagerHl := uint32((eagerEntries * entrySize) + crypto.Extension)
	if int64(eagerHl) > size {
		eagerHl = uint32(size) - uint32(binary.Size(uint32(0)))
	}
	eagerBuf := make([]byte, eagerHl+uint32(binary.Size(uint32(0))))

	n, err := rd.ReadAt(eagerBuf, size-int64(len(eagerBuf)))
	b, c, err := readRecords(rd, size, eagerEntries)
	if err != nil {
		return nil, err
	}
	if n != len(eagerBuf) {
		return nil, errors.New("not enough bytes read")
	if c <= eagerEntries {
		// eager read sufficed, return what we got
		return b, nil
	}

	hl := binary.LittleEndian.Uint32(eagerBuf[eagerHl:])
	debug.Log("header length: %v", size)

	if hl == 0 {
		err := InvalidFileError{Message: "header length is zero"}
		return nil, errors.Wrap(err, "readHeader")
	b, _, err = readRecords(rd, size, c)
	if err != nil {
		return nil, err
	}

	if hl < crypto.Extension {
		err := InvalidFileError{Message: "header length is too small"}
		return nil, errors.Wrap(err, "readHeader")
	}

	if (hl-crypto.Extension)%uint32(entrySize) != 0 {
		err := InvalidFileError{Message: "header length is invalid"}
		return nil, errors.Wrap(err, "readHeader")
	}

	if int64(hl) > size-int64(binary.Size(hl)) {
		err := InvalidFileError{Message: "header is larger than file"}
		return nil, errors.Wrap(err, "readHeader")
	}

	if int64(hl) > maxHeaderSize {
		err := InvalidFileError{Message: "header is larger than maxHeaderSize"}
		return nil, errors.Wrap(err, "readHeader")
	}

	eagerBuf = eagerBuf[:eagerHl]

	var buf []byte
	if hl <= eagerHl {
		// already have all header bytes. yay.
		buf = eagerBuf[eagerHl-hl:]
	} else {
		// need more header bytes
		buf = make([]byte, hl)
		missingHl := hl - eagerHl
		n, err := rd.ReadAt(buf[:missingHl], size-int64(hl)-int64(binary.Size(hl)))
		if err != nil {
			return nil, errors.Wrap(err, "ReadAt")
		}
		if uint32(n) != missingHl {
			return nil, errors.New("not enough bytes read")
		}
		copy(buf[hl-eagerHl:], eagerBuf)
	}

	return buf, nil
	return b, nil
}

// InvalidFileError is returned when a file is found that is not a pack file.
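The rewrite caps readHeader at two ReadAt calls regardless of header size. Spelled out (this just restates the control flow of readHeader above):

// First read: length field plus up to eagerEntries records in one request.
b, total, err := readRecords(rd, size, eagerEntries)
// Second read only if the header turns out to be larger than the eager window.
if err == nil && total > eagerEntries {
	b, _, err = readRecords(rd, size, total) // now sized for the full header
}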
@@ -22,11 +22,11 @@ func (rd *countingReaderAt) ReadAt(p []byte, off int64) (n int, err error) {

func TestReadHeaderEagerLoad(t *testing.T) {

	testReadHeader := func(entryCount uint, expectedReadInvocationCount int) {
		expectedHeader := rtest.Random(0, int(entryCount*entrySize)+crypto.Extension)
	testReadHeader := func(dataSize, entryCount, expectedReadInvocationCount int) {
		expectedHeader := rtest.Random(0, entryCount*int(entrySize)+crypto.Extension)

		buf := &bytes.Buffer{}
		buf.Write(rtest.Random(0, 100))      // pack blobs data
		buf.Write(rtest.Random(0, dataSize)) // pack blobs data
		buf.Write(expectedHeader)            // pack header
		binary.Write(buf, binary.LittleEndian, uint32(len(expectedHeader))) // pack header length

@@ -39,8 +39,72 @@ func TestReadHeaderEagerLoad(t *testing.T) {
		rtest.Equals(t, expectedReadInvocationCount, rd.invocationCount)
	}

	testReadHeader(1, 1)
	testReadHeader(eagerEntries-1, 1)
	testReadHeader(eagerEntries, 1)
	testReadHeader(eagerEntries+1, 2)
	// basic
	testReadHeader(100, 1, 1)

	// header entries == eager entries
	testReadHeader(100, eagerEntries-1, 1)
	testReadHeader(100, eagerEntries, 1)
	testReadHeader(100, eagerEntries+1, 2)

	// file size == eager header load size
	eagerLoadSize := int((eagerEntries * entrySize) + crypto.Extension)
	headerSize := int(1*entrySize) + crypto.Extension
	dataSize := eagerLoadSize - headerSize - binary.Size(uint32(0))
	testReadHeader(dataSize-1, 1, 1)
	testReadHeader(dataSize, 1, 1)
	testReadHeader(dataSize+1, 1, 1)
	testReadHeader(dataSize+2, 1, 1)
	testReadHeader(dataSize+3, 1, 1)
	testReadHeader(dataSize+4, 1, 1)
}

func TestReadRecords(t *testing.T) {
	testReadRecords := func(dataSize, entryCount, totalRecords int) {
		totalHeader := rtest.Random(0, totalRecords*int(entrySize)+crypto.Extension)
		off := len(totalHeader) - (entryCount*int(entrySize) + crypto.Extension)
		if off < 0 {
			off = 0
		}
		expectedHeader := totalHeader[off:]

		buf := &bytes.Buffer{}
		buf.Write(rtest.Random(0, dataSize)) // pack blobs data
		buf.Write(totalHeader)               // pack header
		binary.Write(buf, binary.LittleEndian, uint32(len(totalHeader))) // pack header length

		rd := bytes.NewReader(buf.Bytes())

		header, count, err := readRecords(rd, int64(rd.Len()), entryCount)
		rtest.OK(t, err)
		rtest.Equals(t, expectedHeader, header)
		rtest.Equals(t, totalRecords, count)
	}

	// basic
	testReadRecords(100, 1, 1)
	testReadRecords(100, 0, 1)
	testReadRecords(100, 1, 0)

	// header entries ~ eager entries
	testReadRecords(100, eagerEntries, eagerEntries-1)
	testReadRecords(100, eagerEntries, eagerEntries)
	testReadRecords(100, eagerEntries, eagerEntries+1)

	// file size == eager header load size
	eagerLoadSize := int((eagerEntries * entrySize) + crypto.Extension)
	headerSize := int(1*entrySize) + crypto.Extension
	dataSize := eagerLoadSize - headerSize - binary.Size(uint32(0))
	testReadRecords(dataSize-1, 1, 1)
	testReadRecords(dataSize, 1, 1)
	testReadRecords(dataSize+1, 1, 1)
	testReadRecords(dataSize+2, 1, 1)
	testReadRecords(dataSize+3, 1, 1)
	testReadRecords(dataSize+4, 1, 1)

	for i := 0; i < 2; i++ {
		for j := 0; j < 2; j++ {
			testReadRecords(dataSize, i, j)
		}
	}
}
@@ -2,14 +2,11 @@ package repository

import (
	"context"
	"crypto/sha256"
	"fmt"
	"io"
	"os"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/fs"
	"github.com/restic/restic/internal/hashing"
	"github.com/restic/restic/internal/pack"
	"github.com/restic/restic/internal/restic"

@@ -27,28 +24,11 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
		// load the complete pack into a temp file
		h := restic.Handle{Type: restic.DataFile, Name: packID.String()}

		tempfile, err := fs.TempFile("", "restic-temp-repack-")
		tempfile, hash, packLength, err := DownloadAndHash(ctx, repo, h)
		if err != nil {
			return nil, errors.Wrap(err, "TempFile")
			return nil, errors.Wrap(err, "Repack")
		}

		beRd, err := repo.Backend().Load(ctx, h, 0, 0)
		if err != nil {
			return nil, err
		}

		hrd := hashing.NewReader(beRd, sha256.New())
		packLength, err := io.Copy(tempfile, hrd)
		if err != nil {
			_ = beRd.Close()
			return nil, errors.Wrap(err, "Copy")
		}

		if err = beRd.Close(); err != nil {
			return nil, errors.Wrap(err, "Close")
		}

		hash := restic.IDFromHash(hrd.Sum(nil))
		debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)

		if !packID.Equal(hash) {
@@ -3,12 +3,16 @@ package repository
import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"os"

	"github.com/restic/restic/internal/cache"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/fs"
	"github.com/restic/restic/internal/hashing"
	"github.com/restic/restic/internal/restic"

	"github.com/restic/restic/internal/backend"
@@ -542,6 +546,7 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic
		id, err := restic.ParseID(fi.Name)
		if err != nil {
			debug.Log("unable to parse %v as an ID", fi.Name)
			return nil
		}
		return fn(id, fi.Size)
	})
@@ -654,3 +659,36 @@ func (r *Repository) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, e
	_, err = r.SaveBlob(ctx, restic.TreeBlob, buf, id)
	return id, err
}

// DownloadAndHash is an all-in-one helper that downloads the content of the file at h to a
// temporary filesystem location and calculates the ID of the contents. The returned (temporary)
// file is positioned at the beginning of the file; it is the responsibility of the caller to
// close and delete the file.
func DownloadAndHash(ctx context.Context, repo restic.Repository, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
	tmpfile, err = fs.TempFile("", "restic-temp-")
	if err != nil {
		return nil, restic.ID{}, -1, errors.Wrap(err, "TempFile")
	}

	err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
		_, ierr = tmpfile.Seek(0, io.SeekStart)
		if ierr == nil {
			ierr = tmpfile.Truncate(0)
		}
		if ierr != nil {
			return ierr
		}
		hrd := hashing.NewReader(rd, sha256.New())
		size, ierr = io.Copy(tmpfile, hrd)
		hash = restic.IDFromHash(hrd.Sum(nil))
		return ierr
	})

	_, err = tmpfile.Seek(0, io.SeekStart)
	if err != nil {
		tmpfile.Close()
		os.Remove(tmpfile.Name())
		return nil, restic.ID{}, -1, errors.Wrap(err, "Seek")
	}

	return tmpfile, hash, size, err
}
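Note how the consumer rewinds and truncates the temp file before copying: if the backend retries and calls it again, the file is rewritten from scratch rather than appended to. A minimal usage sketch, modeled on the checkPack and Repack call sites above (error handling abbreviated; repo and packID assumed from surrounding code):

// Sketch only: download a pack, then clean up as the contract requires.
h := restic.Handle{Type: restic.DataFile, Name: packID.String()}
tmpfile, hash, size, err := repository.DownloadAndHash(ctx, repo, h)
if err != nil {
	return err
}
defer func() {
	_ = tmpfile.Close()
	_ = os.Remove(tmpfile.Name()) // the caller must close and delete the file
}()
debug.Log("downloaded %v: %d bytes, hash %v", h, size, hash)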
@@ -23,11 +23,15 @@ type Backend interface {
	// Save stores the data in the backend under the given handle.
	Save(ctx context.Context, h Handle, rd io.Reader) error

	// Load returns a reader that yields the contents of the file at h at the
	// Load runs fn with a reader that yields the contents of the file at h at the
	// given offset. If length is larger than zero, only a portion of the file
	// is returned. rd must be closed after use. If an error is returned, the
	// ReadCloser must be nil.
	Load(ctx context.Context, h Handle, length int, offset int64) (io.ReadCloser, error)
	// is read.
	//
	// The function fn may be called multiple times during the same Load invocation
	// and therefore must be idempotent.
	//
	// Implementations are encouraged to use backend.DefaultLoad
	Load(ctx context.Context, h Handle, length int, offset int64, fn func(rd io.Reader) error) error

	// Stat returns information about the File identified by h.
	Stat(ctx context.Context, h Handle) (FileInfo, error)
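The idempotency requirement exists because an implementation may retry a failed download and invoke fn again with a fresh reader. A caller therefore has to reset any state it accumulates at the start of fn, as in this sketch (be and h assumed from surrounding code):

// Sketch only: an idempotent consumer under the new Load contract.
var buf bytes.Buffer
err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
	buf.Reset() // start over if the implementation retries
	_, err := io.Copy(&buf, rd)
	return err
})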
@@ -44,7 +44,11 @@ type ErrAlreadyLocked struct {
}

func (e ErrAlreadyLocked) Error() string {
	return fmt.Sprintf("repository is already locked by %v", e.otherLock)
	s := ""
	if e.otherLock.Exclusive {
		s = "exclusively "
	}
	return fmt.Sprintf("repository is already locked %sby %v", s, e.otherLock)
}

// IsAlreadyLocked returns true iff err is an instance of ErrAlreadyLocked.
@@ -209,8 +209,11 @@ func TestNodeRestoreAt(t *testing.T) {
	rtest.Assert(t, test.GID == n2.GID,
		"%v: GID doesn't match (%v != %v)", test.Type, test.GID, n2.GID)
	if test.Type != "symlink" {
		rtest.Assert(t, test.Mode == n2.Mode,
			"%v: mode doesn't match (0%o != 0%o)", test.Type, test.Mode, n2.Mode)
		// On OpenBSD only root can set sticky bit (see sticky(8)).
		if runtime.GOOS != "openbsd" && test.Name == "testSticky" {
			rtest.Assert(t, test.Mode == n2.Mode,
				"%v: mode doesn't match (0%o != 0%o)", test.Type, test.Mode, n2.Mode)
		}
	}
}
@@ -25,17 +25,16 @@ func ReaderAt(be Backend, h Handle) io.ReaderAt {
// ReadAt reads from the backend handle h at the given position.
func ReadAt(ctx context.Context, be Backend, h Handle, offset int64, p []byte) (n int, err error) {
	debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p))
	rd, err := be.Load(ctx, h, len(p), offset)

	err = be.Load(ctx, h, len(p), offset, func(rd io.Reader) (ierr error) {
		n, ierr = io.ReadFull(rd, p)

		return ierr
	})
	if err != nil {
		return 0, err
	}

	n, err = io.ReadFull(rd, p)
	e := rd.Close()
	if err == nil {
		err = e
	}

	debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n)

	return n, errors.Wrapf(err, "ReadFull(%v)", h)
123  vendor/github.com/kurin/blazer/b2/b2.go
@@ -45,6 +45,7 @@ type Client struct {
	slock    sync.Mutex
	sWriters map[string]*Writer
	sReaders map[string]*Reader
	sMethods map[string]int
}

// NewClient creates and returns a new Client with valid B2 service account
@@ -54,7 +55,9 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
		backend: &beRoot{
			b2i: &b2Root{},
		},
		sMethods: make(map[string]int),
	}
	opts = append(opts, client(c))
	if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
		return nil, err
	}
@@ -62,6 +65,7 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
}

type clientOptions struct {
	client          *Client
	transport       http.RoundTripper
	failSomeUploads bool
	expireTokens    bool
@@ -115,12 +119,38 @@ func ForceCapExceeded() ClientOption {
	}
}

func client(cl *Client) ClientOption {
	return func(c *clientOptions) {
		c.client = cl
	}
}

type clientTransport struct {
	client *Client
	rt     http.RoundTripper
}

func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	method := r.Header.Get("X-Blazer-Method")
	if method != "" && ct.client != nil {
		ct.client.slock.Lock()
		ct.client.sMethods[method]++
		ct.client.slock.Unlock()
	}
	t := ct.rt
	if t == nil {
		t = http.DefaultTransport
	}
	return t.RoundTrip(r)
}
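clientTransport counts every outgoing B2 API call by the X-Blazer-Method header before delegating to the real RoundTripper, so counters accumulate with no changes to call sites. They surface through Client.Status (see the monitor.go hunk below), as in this sketch:

// Sketch only: reading the per-method counters after some traffic.
si := client.Status()
fmt.Printf("b2_get_upload_url: %d calls\n", si.MethodCalls["b2_get_upload_url"])
fmt.Printf("b2_upload_file:    %d calls\n", si.MethodCalls["b2_upload_file"])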
// Bucket is a reference to a B2 bucket.
type Bucket struct {
	b beBucketInterface
	r beRootInterface

	c *Client
	c       *Client
	urlPool *urlPool
}

type BucketType string
@@ -188,6 +218,36 @@ func IsNotExist(err error) bool {
	return berr.notFoundErr
}

const uploadURLPoolSize = 100

type urlPool struct {
	ch chan beURLInterface
}

func newURLPool() *urlPool {
	return &urlPool{ch: make(chan beURLInterface, uploadURLPoolSize)}
}

func (p *urlPool) get() beURLInterface {
	select {
	case ue := <-p.ch:
		// if the channel has an upload URL available, use that
		return ue
	default:
		// otherwise return nil, a new upload URL needs to be generated
		return nil
	}
}

func (p *urlPool) put(u beURLInterface) {
	select {
	case p.ch <- u:
		// put the URL back if possible
	default:
		// if the channel is full, throw it away
	}
}
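The buffered channel with non-blocking send and receive is a common lock-free pool shape in Go: a miss on get simply means the caller mints a fresh upload URL, and put silently drops the URL when the pool is full. The flow, as Writer.getUploadURL and simpleWriteFile use it further down (error handling abbreviated):

// Sketch only: reuse an upload URL from the pool when one is available.
ue := b.urlPool.get()
if ue == nil {
	ue, err = b.b.getUploadURL(ctx) // pool miss: request a fresh URL
	if err != nil {
		return err
	}
}
// ... upload using ue ...
b.urlPool.put(ue) // return it for reuse; dropped if the pool is full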
// Bucket returns a bucket if it exists.
func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
	buckets, err := c.backend.listBuckets(ctx)
@@ -197,9 +257,10 @@ func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
	for _, bucket := range buckets {
		if bucket.name() == name {
			return &Bucket{
				b: bucket,
				r: c.backend,
				c: c,
				b:       bucket,
				r:       c.backend,
				c:       c,
				urlPool: newURLPool(),
			}, nil
		}
	}
@@ -220,9 +281,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
	for _, bucket := range buckets {
		if bucket.name() == name {
			return &Bucket{
				b: bucket,
				r: c.backend,
				c: c,
				b:       bucket,
				r:       c.backend,
				c:       c,
				urlPool: newURLPool(),
			}, nil
		}
	}
@@ -234,13 +296,14 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
		return nil, err
	}
	return &Bucket{
		b: b,
		r: c.backend,
		c: c,
		b:       b,
		r:       c.backend,
		c:       c,
		urlPool: newURLPool(),
	}, err
}

// ListBucket returns all the available buckets.
// ListBuckets returns all the available buckets.
func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
	bs, err := c.backend.listBuckets(ctx)
	if err != nil {
@@ -249,9 +312,10 @@ func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
	var buckets []*Bucket
	for _, b := range bs {
		buckets = append(buckets, &Bucket{
			b: b,
			r: c.backend,
			c: c,
			b:       b,
			r:       c.backend,
			c:       c,
			urlPool: newURLPool(),
		})
	}
	return buckets, nil
@@ -565,6 +629,37 @@ func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) (
	return objects, next, rtnErr
}

// ListUnfinishedLargeFiles lists any objects that correspond to large file uploads that haven't been completed.
// This can happen for example when an upload is interrupted.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
	if c == nil {
		c = &Cursor{}
	}
	fs, name, err := b.b.listUnfinishedLargeFiles(ctx, count, c.name)
	if err != nil {
		return nil, nil, err
	}
	var next *Cursor
	if name != "" {
		next = &Cursor{
			name: name,
		}
	}
	var objects []*Object
	for _, f := range fs {
		objects = append(objects, &Object{
			name: f.name(),
			f:    f,
			b:    b,
		})
	}
	var rtnErr error
	if len(objects) == 0 || next == nil {
		rtnErr = io.EOF
	}
	return objects, next, rtnErr
}

// Hide hides the object from name-based listing.
func (o *Object) Hide(ctx context.Context) error {
	if err := o.ensure(ctx); err != nil {
4  vendor/github.com/kurin/blazer/b2/b2_test.go
@@ -198,6 +198,10 @@ func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d
	return x, y, "", z
}

func (t *testBucket) listUnfinishedLargeFiles(ctx context.Context, count int, cont string) ([]b2FileInterface, string, error) {
	return nil, "", fmt.Errorf("testBucket.listUnfinishedLargeFiles(ctx, %d, %q): not implemented", count, cont)
}

func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
	gmux.Lock()
	defer gmux.Unlock()
27  vendor/github.com/kurin/blazer/b2/backend.go
@@ -49,6 +49,7 @@ type beBucketInterface interface {
	startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (beLargeFileInterface, error)
	listFileNames(context.Context, int, string, string, string) ([]beFileInterface, string, error)
	listFileVersions(context.Context, int, string, string, string, string) ([]beFileInterface, string, string, error)
	listUnfinishedLargeFiles(context.Context, int, string) ([]beFileInterface, string, error)
	downloadFileByName(context.Context, string, int64, int64) (beFileReaderInterface, error)
	hideFile(context.Context, string) (beFileInterface, error)
	getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
@@ -339,6 +340,32 @@ func (b *beBucket) listFileVersions(ctx context.Context, count int, nextName, ne
	return files, name, id, nil
}

func (b *beBucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]beFileInterface, string, error) {
	var cont string
	var files []beFileInterface
	f := func() error {
		g := func() error {
			fs, c, err := b.b2bucket.listUnfinishedLargeFiles(ctx, count, continuation)
			if err != nil {
				return err
			}
			cont = c
			for _, f := range fs {
				files = append(files, &beFile{
					b2file: f,
					ri:     b.ri,
				})
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, "", err
	}
	return files, cont, nil
}

func (b *beBucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (beFileReaderInterface, error) {
	var reader beFileReaderInterface
	f := func() error {
17  vendor/github.com/kurin/blazer/b2/baseline.go
@@ -46,6 +46,7 @@ type b2BucketInterface interface {
	startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (b2LargeFileInterface, error)
	listFileNames(context.Context, int, string, string, string) ([]b2FileInterface, string, error)
	listFileVersions(context.Context, int, string, string, string, string) ([]b2FileInterface, string, string, error)
	listUnfinishedLargeFiles(context.Context, int, string) ([]b2FileInterface, string, error)
	downloadFileByName(context.Context, string, int64, int64) (b2FileReaderInterface, error)
	hideFile(context.Context, string) (b2FileInterface, error)
	getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
@@ -137,9 +138,11 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
		f(c)
	}
	var aopts []base.AuthOption
	ct := &clientTransport{client: c.client}
	if c.transport != nil {
		aopts = append(aopts, base.Transport(c.transport))
		ct.rt = c.transport
	}
	aopts = append(aopts, base.Transport(ct))
	if c.failSomeUploads {
		aopts = append(aopts, base.FailSomeUploads())
	}
@@ -314,6 +317,18 @@ func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, ne
	return files, name, id, nil
}

func (b *b2Bucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]b2FileInterface, string, error) {
	fs, cont, err := b.b.ListUnfinishedLargeFiles(ctx, count, continuation)
	if err != nil {
		return nil, "", err
	}
	var files []b2FileInterface
	for _, f := range fs {
		files = append(files, &b2File{f})
	}
	return files, cont, nil
}

func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
	fr, err := b.b.DownloadFileByName(ctx, name, offset, size)
	if err != nil {
90  vendor/github.com/kurin/blazer/b2/integration_test.go
@@ -17,7 +17,9 @@ package b2
import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
@@ -786,6 +788,62 @@ func TestAttrsNoRoundtrip(t *testing.T) {
	}
}

/*func TestAttrsFewRoundtrips(t *testing.T) {
	rt := &rtCounter{rt: defaultTransport}
	defaultTransport = rt
	defer func() {
		defaultTransport = rt.rt
	}()

	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	bucket, done := startLiveTest(ctx, t)
	defer done()

	_, _, err := writeFile(ctx, bucket, smallFileName, 42, 1e8)
	if err != nil {
		t.Fatal(err)
	}

	o := bucket.Object(smallFileName)
	trips := rt.trips
	attrs, err := o.Attrs(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if attrs.Name != smallFileName {
		t.Errorf("got the wrong object: got %q, want %q", attrs.Name, smallFileName)
	}

	if trips != rt.trips {
		t.Errorf("Attrs(): too many round trips, got %d, want 1", rt.trips-trips)
	}
}*/

func TestSmallUploadsFewRoundtrips(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	bucket, done := startLiveTest(ctx, t)
	defer done()

	for i := 0; i < 10; i++ {
		_, _, err := writeFile(ctx, bucket, fmt.Sprintf("%s.%d", smallFileName, i), 42, 1e8)
		if err != nil {
			t.Fatal(err)
		}
	}
	si := bucket.c.Status()
	getURL := si.MethodCalls["b2_get_upload_url"]
	uploadFile := si.MethodCalls["b2_upload_file"]
	if getURL >= uploadFile {
		t.Errorf("too many calls to b2_get_upload_url")
	}
}

func TestDeleteWithoutName(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
@@ -804,6 +862,26 @@ func TestDeleteWithoutName(t *testing.T) {
	}
}

func TestListUnfinishedLargeFiles(t *testing.T) {
	ctx := context.Background()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	w := bucket.Object(largeFileName).NewWriter(ctx)
	w.ChunkSize = 1e5
	if _, err := io.Copy(w, io.LimitReader(zReader{}, 1e6)); err != nil {
		t.Fatal(err)
	}
	// Don't close the writer.
	fs, _, err := bucket.ListUnfinishedLargeFiles(ctx, 10, nil)
	if err != io.EOF && err != nil {
		t.Fatal(err)
	}
	if len(fs) != 1 {
		t.Errorf("ListUnfinishedLargeFiles: got %d, want 1", len(fs))
	}
}

type object struct {
	o   *Object
	err error
@@ -914,6 +992,16 @@ func (cc *ccRC) Close() error {
	return cc.ReadCloser.Close()
}

var uniq string

func init() {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	uniq = hex.EncodeToString(b)
}

func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
	id := os.Getenv(apiID)
	key := os.Getenv(apiKey)
@@ -929,7 +1017,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
		t.Fatal(err)
		return nil, nil
	}
	bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil)
	bucket, err := client.NewBucket(ctx, fmt.Sprintf("%s-%s-%s", id, bucketName, uniq), nil)
	if err != nil {
		t.Fatal(err)
		return nil, nil
14  vendor/github.com/kurin/blazer/b2/monitor.go
@@ -18,8 +18,9 @@ import "fmt"

// StatusInfo reports information about a client.
type StatusInfo struct {
	Writers map[string]*WriterStatus
	Readers map[string]*ReaderStatus
	Writers     map[string]*WriterStatus
	Readers     map[string]*ReaderStatus
	MethodCalls map[string]int
}

// WriterStatus reports the status for each writer.
@@ -42,8 +43,9 @@ func (c *Client) Status() *StatusInfo {
	defer c.slock.Unlock()

	si := &StatusInfo{
		Writers: make(map[string]*WriterStatus),
		Readers: make(map[string]*ReaderStatus),
		Writers:     make(map[string]*WriterStatus),
		Readers:     make(map[string]*ReaderStatus),
		MethodCalls: make(map[string]int),
	}

	for name, w := range c.sWriters {
@@ -54,6 +56,10 @@ func (c *Client) Status() *StatusInfo {
		si.Readers[name] = r.status()
	}

	for name, n := range c.sMethods {
		si.MethodCalls[name] = n
	}

	return si
}
2  vendor/github.com/kurin/blazer/b2/readerat.go
@@ -33,7 +33,7 @@ func (r *readerAt) ReadAt(p []byte, off int64) (int, error) {
	if err != nil {
		return 0, err
	}
	defer func() { r.rs.Seek(cur, io.SeekStart) }()
	defer r.rs.Seek(cur, io.SeekStart)

	if _, err := r.rs.Seek(off, io.SeekStart); err != nil {
		return 0, err
16  vendor/github.com/kurin/blazer/b2/writer.go
@@ -144,7 +144,7 @@ func (w *Writer) thread() {
	}
	if sha, ok := w.seen[chunk.id]; ok {
		if sha != chunk.buf.Hash() {
			w.setErr(errors.New("resumable upload was requested, but chunks don't match!"))
			w.setErr(errors.New("resumable upload was requested, but chunks don't match"))
			return
		}
		chunk.buf.Close()
@@ -245,11 +245,23 @@ func (w *Writer) Write(p []byte) (int, error) {
	return i + k, err
}

func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) {
	u := w.o.b.urlPool.get()
	if u == nil {
		return w.o.b.b.getUploadURL(w.ctx)
	}

	return u, nil
}

func (w *Writer) simpleWriteFile() error {
	ue, err := w.o.b.b.getUploadURL(w.ctx)
	ue, err := w.getUploadURL(w.ctx)
	if err != nil {
		return err
	}
	// This defer needs to be in a func() so that we put whatever the value of ue
	// is at function exit.
	defer func() { w.o.b.urlPool.put(ue) }()
	sha1 := w.w.Hash()
	ctype := w.contentType
	if ctype == "" {
59  vendor/github.com/kurin/blazer/base/base.go
@@ -18,7 +18,6 @@
// It currently lacks support for the following APIs:
//
// b2_download_file_by_id
// b2_list_unfinished_large_files
package base

import (
@@ -43,7 +42,7 @@ import (

const (
	APIBase          = "https://api.backblazeb2.com"
	DefaultUserAgent = "blazer/0.2.1"
	DefaultUserAgent = "blazer/0.3.0"
)

type b2err struct {
@@ -69,17 +68,15 @@ func Action(err error) ErrAction {
	if e.retry > 0 {
		return Retry
	}
	if e.code >= 500 && e.code < 600 {
		if e.method == "b2_upload_file" || e.method == "b2_upload_part" {
			return AttemptNewUpload
		}
	if e.code >= 500 && e.code < 600 && (e.method == "b2_upload_file" || e.method == "b2_upload_part") {
		return AttemptNewUpload
	}
	switch e.code {
	case 401:
		if e.method == "b2_authorize_account" {
		switch e.method {
		case "b2_authorize_account":
			return Punt
		}
		if e.method == "b2_upload_file" || e.method == "b2_upload_part" {
		case "b2_upload_file", "b2_upload_part":
			return AttemptNewUpload
		}
		return ReAuthenticate
@@ -698,9 +695,9 @@ func (b *Bucket) File(id, name string) *File {
}

// UploadFile wraps b2_upload_file.
func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) {
func (url *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) {
	headers := map[string]string{
		"Authorization":  u.token,
		"Authorization":  url.token,
		"X-Bz-File-Name": name,
		"Content-Type":   contentType,
		"Content-Length": fmt.Sprintf("%d", size),
@@ -710,7 +707,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte
		headers[fmt.Sprintf("X-Bz-Info-%s", k)] = v
	}
	b2resp := &b2types.UploadFileResponse{}
	if err := u.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", u.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil {
	if err := url.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", url.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil {
		return nil, err
	}
	return &File{
@@ -719,7 +716,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte
		Timestamp: millitime(b2resp.Timestamp),
		Status:    b2resp.Action,
		id:        b2resp.FileID,
		b2:        u.b2,
		b2:        url.b2,
	}, nil
}

@@ -906,6 +903,9 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
	}
	b2resp := &b2types.FinishLargeFileResponse{}
	for k, v := range l.hashes {
		if len(b2req.Hashes) < k {
			return nil, fmt.Errorf("b2_finish_large_file: invalid index %d", k)
		}
		b2req.Hashes[k-1] = v
	}
	headers := map[string]string{
@@ -924,6 +924,39 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
	}, nil
}

// ListUnfinishedLargeFiles wraps b2_list_unfinished_large_files.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]*File, string, error) {
	b2req := &b2types.ListUnfinishedLargeFilesRequest{
		BucketID:     b.id,
		Continuation: continuation,
		Count:        count,
	}
	b2resp := &b2types.ListUnfinishedLargeFilesResponse{}
	headers := map[string]string{
		"Authorization": b.b2.authToken,
	}
	if err := b.b2.opts.makeRequest(ctx, "b2_list_unfinished_large_files", "POST", b.b2.apiURI+b2types.V1api+"b2_list_unfinished_large_files", b2req, b2resp, headers, nil); err != nil {
		return nil, "", err
	}
	cont := b2resp.Continuation
	var files []*File
	for _, f := range b2resp.Files {
		files = append(files, &File{
			Name:      f.Name,
			Timestamp: millitime(f.Timestamp),
			b2:        b.b2,
			id:        f.FileID,
			Info: &FileInfo{
				Name:        f.Name,
				ContentType: f.ContentType,
				Info:        f.Info,
				Timestamp:   millitime(f.Timestamp),
			},
		})
	}
	return files, cont, nil
}

// ListFileNames wraps b2_list_file_names.
func (b *Bucket) ListFileNames(ctx context.Context, count int, continuation, prefix, delimiter string) ([]*File, string, error) {
	b2req := &b2types.ListFileNamesRequest{
4  vendor/github.com/kurin/blazer/base/integration_test.go
@@ -156,8 +156,8 @@ func TestStorage(t *testing.T) {

	// b2_start_large_file
	largeInfoMap := map[string]string{
		"one_BILLION":  "1e9",
		"two_TRILLION": "2eSomething, I guess 2e12",
		"one_billion":  "1e9",
		"two_trillion": "2eSomething, I guess 2e12",
	}
	lf, err := bucket.StartLargeFile(ctx, largeFileName, "application/octet-stream", largeInfoMap)
	if err != nil {
11  vendor/github.com/kurin/blazer/internal/b2types/b2types.go
@@ -227,3 +227,14 @@ type GetDownloadAuthorizationResponse struct {
	Prefix string `json:"fileNamePrefix"`
	Token  string `json:"authorizationToken"`
}

type ListUnfinishedLargeFilesRequest struct {
	BucketID     string `json:"bucketId"`
	Continuation string `json:"startFileId,omitempty"`
	Count        int    `json:"maxFileCount,omitempty"`
}

type ListUnfinishedLargeFilesResponse struct {
	Files        []GetFileInfoResponse `json:"files"`
	Continuation string                `json:"nextFileId"`
}
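Per the JSON tags above, Continuation and Count are omitted when empty, so a minimal request carries only the bucket ID. A small sketch of the wire format the request struct produces (the bucket ID is a placeholder):

// Sketch only: what the request struct above serializes to.
req := b2types.ListUnfinishedLargeFilesRequest{BucketID: "bucketId123", Count: 10}
buf, _ := json.Marshal(req)
fmt.Println(string(buf)) // {"bucketId":"bucketId123","maxFileCount":10}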