Compare commits

...

42 Commits

Author SHA1 Message Date
Alexander Neumann
272ccec7e1 Add VERSION for 0.8.3 2018-02-26 21:32:16 +01:00
Alexander Neumann
68bf1509bd Update manpages and auto-completion 2018-02-26 21:32:16 +01:00
Alexander Neumann
cfccd67600 Generate CHANGELOG.md for 0.8.3 2018-02-26 21:32:07 +01:00
Alexander Neumann
bc461d32e0 Add release date for 0.8.3 2018-02-26 21:31:50 +01:00
Alexander Neumann
ee4bfdf954 changelog: Fix spelling 2018-02-26 21:27:32 +01:00
Alexander Neumann
3037894f62 Add entry to changelog 2018-02-26 21:20:41 +01:00
Alexander Neumann
89075bdf6d Merge pull request #1643 from restic/fix-1641
Ignore files in the repo with invalid names
2018-02-26 21:18:05 +01:00
Alexander Neumann
c323f73bf9 Ignore files in the repo with invalid names
Closes #1641
2018-02-26 20:53:38 +01:00
Alexander Neumann
aef5e03731 Merge pull request #1638 from restic/fix-list-retry
backend/retry: return worker function error and abort
2018-02-25 21:20:08 +01:00
Alexander Neumann
fc1f74d32d Merge pull request #1640 from restic/fix-1637
mount: Ignore non-existing locks
2018-02-25 14:00:04 +01:00
Alexander Neumann
7d59df1ab8 mount: Ignore non-existing locks
Closes #1637
2018-02-25 13:11:03 +01:00
Alexander Neumann
2866f3f31c Add pull request to changelog entry 2018-02-24 14:53:46 +01:00
Alexander Neumann
dc1154c8ad Merge pull request #1556 from ifedorenko/check-subset
Add --read-data-subset flag to check command
2018-02-24 14:53:20 +01:00
Alexander Neumann
35a816e8ab Add entry to changelog 2018-02-24 13:34:42 +01:00
Alexander Neumann
93210614f4 backend/retry: return worker function error and abort
This is a bug fix: Before, when the worker function fn in List() of the
RetryBackend returned an error, the operation is retried with the next
file. This is not consistent with the documentation, the intention was
that when fn returns an error, this is passed on to the caller and the
List() operation is aborted. Only errors happening on the underlying
backend are retried.

The error leads to restic ignoring exclusive locks that are present in
the repo, so it may happen that a new backup is written which references
data that is going to be removed by a concurrently running `prune`
operation.

The bug was reported by a user here:
https://forum.restic.net/t/restic-backup-returns-0-exit-code-when-already-locked/484
2018-02-24 13:26:13 +01:00
Alexander Neumann
dfd37afee2 Merge pull request #1636 from kurin/pack-header
Refactor the eager-header reads for readability.
2018-02-23 17:36:45 +01:00
Toby Burress
08a5281bd4 Incorporate PR review comments. 2018-02-22 17:37:10 +00:00
Toby Burress
cdb48a8970 Add tests for the eager-header refactor. 2018-02-22 01:14:04 +00:00
Toby Burress
4fd5f0b8a9 Refactor the eager-header reads for readability.
This pulls the header reads into a function that works in terms of the
number of records requested.  This preserves the existing logic of
initially reading 15 records and then falling back if that fails.

In the event of a header with more than 15 records, it will read all
records, including the already-seen final 15 records.
2018-02-22 00:45:40 +00:00
Alexander Neumann
92ad6bf74f Add pull request to changelog 2018-02-21 19:52:16 +01:00
Alexander Neumann
2c7dd3edf4 Merge pull request #1635 from ifedorenko/1633-negative-load-offset
Fixed unexpected 'pack file cannot be listed' error
2018-02-21 19:51:38 +01:00
Igor Fedorenko
19e7803ac6 Fixed unexpected 'pack file cannot be listed' error
Fixes #1633

Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
2018-02-20 21:28:57 -05:00
Alexander Neumann
9f0605766c Add entry to changelog 2018-02-20 22:10:52 +01:00
Alexander Neumann
1a5d7a9965 Merge pull request #1634 from restic/update-blazer
Update github.com/kurin/blazer to 0.3.0
2018-02-20 22:01:30 +01:00
Alexander Neumann
296769355d Update github.com/kurin/blazer to 0.3.0
This commit will reduce the number of HTTP requests per file uploaded
from two to one.
2018-02-20 21:01:21 +01:00
Igor Fedorenko
07d080830e Add --read-data-subset flag to check command
Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
2018-02-18 23:31:27 -05:00
Alexander Neumann
c99eabfb37 Merge pull request #1625 from restic/update-blazer
Update B2 client library (github.com/kurin/blazer)
2018-02-18 19:18:52 +01:00
Alexander Neumann
842fe43590 Update github.com/kurin/blazer to 0.2.2 2018-02-18 14:53:23 +01:00
Alexander Neumann
be02008025 Merge pull request #1611 from qbit/master
On OpenBSD only root can set sticky bit
2018-02-18 12:57:54 +01:00
Alexander Neumann
29da86b473 Merge pull request #1623 from restic/backend-relax-restrictions
backend: Relax requirement for new files
2018-02-18 12:56:52 +01:00
Alexander Neumann
bad7215696 Add entry to CHANGELOG 2018-02-18 12:04:44 +01:00
Alexander Neumann
881ff5e554 Move changelog file, improve text 2018-02-18 11:51:45 +01:00
Alexander Neumann
86b7fd0335 Merge pull request #1624 from rawtaz/patch-1
Attempt to make issue template a bit clearer
2018-02-18 11:50:11 +01:00
rawtaz
70209d7d1d End both list items with a dot.
Changed my mind after checking other lists in the project's files, ending with a dot seems to be preferred here, and I like that better too.
2018-02-17 23:47:40 +01:00
rawtaz
f07552161c Remove potentially excessive dot
Some people like list items to not end with a comma or dot, some like it when they do. To keep things like and coherent I removed them in this case.
2018-02-17 23:40:30 +01:00
rawtaz
856f3a9135 Add forum URL back 2018-02-17 23:36:47 +01:00
Alexander Neumann
49e9bcadb7 Merge pull request #1560 from ifedorenko/1559-load-error-handling
Retry all repository file download errors
2018-02-17 23:25:28 +01:00
rawtaz
1b8823ef2e Attempt to make issue template a bit clearer 2018-02-17 23:19:58 +01:00
Alexander Neumann
b5062959c8 backend: Relax requirement for new files
Before, all backend implementations were required to return an error if
the file that is to be written already exists in the backend. For most
backends, that means making a request (e.g. via HTTP) and returning an
error when the file already exists.

This is not accurate, the file could have been created between the HTTP
request testing for it, and when writing starts. In addition, apart from
the `config` file in the repo, all other file names have pseudo-random
names with a very very low probability of a collision. And even if a
file name is written again, the way the restic repo is structured this
just means that the same content is placed there again. Which is not a
problem, just not very efficient.

So, this commit relaxes the requirement to return an error when the file
in the backend already exists, which allows reducing the number of API
requests and thereby the latency for remote backends.
2018-02-17 22:39:18 +01:00
Igor Fedorenko
ab040d8811 Introduced repository.DownloadAndHash helper
Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
2018-02-16 21:13:11 -05:00
Igor Fedorenko
d58ae43317 Reworked Backend.Load API to retry errors during ongoing download
Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
2018-02-16 21:12:14 -05:00
Aaron Bieber
99d88ad297 Disable the 'testSticky' test on OpenBSD. Only root can set sticky. 2018-02-11 07:46:31 -07:00
57 changed files with 1307 additions and 478 deletions

View File

@@ -1,14 +1,28 @@
<!--
NOTE: Not filling out the issue template needs a good reason, otherwise it may
take a lot longer to find the problem! Please take the time to help us
debugging the problem by collecting information, even if it seems irrelevant to
you. Thanks!
If you have a question, the forum at https://forum.restic.net is a better place.
Please do not create issues for usage or documentation questions! We're using
the GitHub issue tracker mainly for tracking bugs and feature requests.
Welcome! - We kindly ask that you:
1. Fill out the issue template below - not doing so needs a good reason.
2. Use the forum if you have a question rather than a bug or feature request.
The forum is at: https://forum.restic.net
NOTE: Not filling out the issue template needs a good reason, as otherwise it
may take a lot longer to find the problem, not to mention it can take up a lot
more time which can otherwise be spent on development. Please also take the
time to help us debug the issue by collecting relevant information, even if
it doesn't seem to be relevant to you. Thanks!
The forum is a better place for questions about restic or general suggestions
and topics, e.g. usage or documentation questions! This issue tracker is mainly
for tracking bugs and feature requests directly relating to the development of
the software itself, rather than the project.
Thanks for understanding, and for contributing to the project!
-->
## Output of `restic version`
@@ -24,10 +38,10 @@ This section should include at least:
information to diagnose the problem!
-->
## What backend/server/service did you use to store the repository?
## Expected behavior
<!--
@@ -48,12 +62,14 @@ The more time you spend describing an easy way to reproduce the behavior (if
this is possible), the easier it is for the project developers to fix it!
-->
## Do you have any idea what may have caused this?
## Do you have an idea how to solve the issue?
## Did restic help you or made you happy in any way?
<!--

View File

@@ -1,3 +1,89 @@
Changelog for restic 0.8.3 (2018-02-26)
=======================================
The following sections list the changes in restic 0.8.3 relevant to
restic users. The changes are ordered by importance.
Summary
-------
* Fix #1633: Fixed unexpected 'pack file cannot be listed' error
* Fix #1641: Ignore files with invalid names in the repo
* Fix #1638: Handle errors listing files in the backend
* Enh #1497: Add --read-data-subset flag to check command
* Enh #1560: Retry all repository file download errors
* Enh #1623: Don't check for presence of files in the backend before writing
* Enh #1634: Upgrade B2 client library, reduce HTTP requests
Details
-------
* Bugfix #1633: Fixed unexpected 'pack file cannot be listed' error
Due to a regression introduced in 0.8.2, the `rebuild-index` and `prune` commands failed to
read pack files with size of 587, 588, 589 or 590 bytes.
https://github.com/restic/restic/issues/1633
https://github.com/restic/restic/pull/1635
* Bugfix #1641: Ignore files with invalid names in the repo
The release 0.8.2 introduced a bug: when restic encounters files in the repo which do not have a
valid name, it tries to load a file with a name of lots of zeroes instead of ignoring it. This is now
resolved, invalid file names are just ignored.
https://github.com/restic/restic/issues/1641
https://github.com/restic/restic/pull/1643
* Bugfix #1638: Handle errors listing files in the backend
A user reported in the forum that restic completes a backup although a concurrent `prune`
operation was running. A few error messages were printed, but the backup was attempted and
completed successfully. No error code was returned.
This should not happen: The repository is exclusively locked during `prune`, so when `restic
backup` is run in parallel, it should abort and return an error code instead.
It was found that the bug was in the code introduced only recently, which retries a List()
operation on the backend should that fail. It is now corrected.
https://github.com/restic/restic/pull/1638
* Enhancement #1497: Add --read-data-subset flag to check command
This change introduces ability to check integrity of a subset of repository data packs. This
can be used to spread integrity check of larger repositories over a period of time.
https://github.com/restic/restic/issues/1497
https://github.com/restic/restic/pull/1556
* Enhancement #1560: Retry all repository file download errors
Restic will now retry failed downloads, similar to other operations.
https://github.com/restic/restic/pull/1560
* Enhancement #1623: Don't check for presence of files in the backend before writing
Before, all backend implementations were required to return an error if the file that is to be
written already exists in the backend. For most backends, that means making a request (e.g. via
HTTP) and returning an error when the file already exists.
This is not accurate, the file could have been created between the HTTP request testing for it,
and when writing starts, so we've relaxed this requeriment, which saves one additional HTTP
request per newly added file.
https://github.com/restic/restic/pull/1623
* Enhancement #1634: Upgrade B2 client library, reduce HTTP requests
We've upgraded the B2 client library restic uses to access BackBlaze B2. This reduces the
number of HTTP requests needed to upload a new file from two to one, which should improve
throughput to B2.
https://github.com/restic/restic/pull/1634
Changelog for restic 0.8.2 (2018-02-17)
=======================================

4
Gopkg.lock generated
View File

@@ -88,8 +88,8 @@
[[projects]]
name = "github.com/kurin/blazer"
packages = ["b2","base","internal/b2types","internal/blog"]
revision = "e269a1a17bb6aec278c06a57cb7e8f8d0d333e04"
version = "v0.2.1"
revision = "cd0304efa98725679cf68422cefa328d3d96f2f4"
version = "v0.3.0"
[[projects]]
name = "github.com/marstr/guid"

View File

@@ -1 +1 @@
0.8.2
0.8.3

View File

@@ -0,0 +1,8 @@
Enhancement: Add --read-data-subset flag to check command
This change introduces ability to check integrity of a subset of repository
data packs. This can be used to spread integrity check of larger repositories
over a period of time.
https://github.com/restic/restic/issues/1497
https://github.com/restic/restic/pull/1556

View File

@@ -0,0 +1,7 @@
Bugfix: Fixed unexpected 'pack file cannot be listed' error
Due to a regression introduced in 0.8.2, the `rebuild-index` and `prune`
commands failed to read pack files with size of 587, 588, 589 or 590 bytes.
https://github.com/restic/restic/issues/1633
https://github.com/restic/restic/pull/1635

View File

@@ -0,0 +1,10 @@
Bugfix: Ignore files with invalid names in the repo
The release 0.8.2 introduced a bug: when restic encounters files in the repo
which do not have a valid name, it tries to load a file with a name of lots of
zeroes instead of ignoring it. This is now resolved, invalid file names are
just ignored.
https://github.com/restic/restic/issues/1641
https://github.com/restic/restic/pull/1643
https://forum.restic.net/t/help-fixing-repo-no-such-file/485/3

View File

@@ -0,0 +1,5 @@
Enhancement: Retry all repository file download errors
Restic will now retry failed downloads, similar to other operations.
https://github.com/restic/restic/pull/1560

View File

@@ -0,0 +1,12 @@
Enhancement: Don't check for presence of files in the backend before writing
Before, all backend implementations were required to return an error if the
file that is to be written already exists in the backend. For most backends,
that means making a request (e.g. via HTTP) and returning an error when the
file already exists.
This is not accurate, the file could have been created between the HTTP request
testing for it, and when writing starts, so we've relaxed this requeriment,
which saves one additional HTTP request per newly added file.
https://github.com/restic/restic/pull/1623

View File

@@ -0,0 +1,7 @@
Enhancement: Upgrade B2 client library, reduce HTTP requests
We've upgraded the B2 client library restic uses to access BackBlaze B2. This
reduces the number of HTTP requests needed to upload a new file from two to
one, which should improve throughput to B2.
https://github.com/restic/restic/pull/1634

View File

@@ -0,0 +1,16 @@
Bugfix: Handle errors listing files in the backend
A user reported in the forum that restic completes a backup although a
concurrent `prune` operation was running. A few error messages were printed,
but the backup was attempted and completed successfully. No error code was
returned.
This should not happen: The repository is exclusively locked during `prune`, so
when `restic backup` is run in parallel, it should abort and return an error
code instead.
It was found that the bug was in the code introduced only recently, which
retries a List() operation on the backend should that fail. It is now corrected.
https://github.com/restic/restic/pull/1638
https://forum.restic.net/t/restic-backup-returns-0-exit-code-when-already-locked/484

View File

@@ -3,6 +3,8 @@ package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
@@ -26,13 +28,17 @@ repository and not use a local cache.
RunE: func(cmd *cobra.Command, args []string) error {
return runCheck(checkOptions, globalOptions, args)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return checkFlags(checkOptions)
},
}
// CheckOptions bundles all options for the 'check' command.
type CheckOptions struct {
ReadData bool
CheckUnused bool
WithCache bool
ReadData bool
ReadDataSubset string
CheckUnused bool
WithCache bool
}
var checkOptions CheckOptions
@@ -42,10 +48,45 @@ func init() {
f := cmdCheck.Flags()
f.BoolVar(&checkOptions.ReadData, "read-data", false, "read all data blobs")
f.StringVar(&checkOptions.ReadDataSubset, "read-data-subset", "", "read subset of data packs")
f.BoolVar(&checkOptions.CheckUnused, "check-unused", false, "find unused blobs")
f.BoolVar(&checkOptions.WithCache, "with-cache", false, "use the cache")
}
func checkFlags(opts CheckOptions) error {
if opts.ReadData && opts.ReadDataSubset != "" {
return errors.Fatalf("check flags --read-data and --read-data-subset cannot be used together")
}
if opts.ReadDataSubset != "" {
dataSubset, err := stringToIntSlice(opts.ReadDataSubset)
if err != nil || len(dataSubset) != 2 {
return errors.Fatalf("check flag --read-data-subset must have two positive integer values, e.g. --read-data-subset=1/2")
}
if dataSubset[0] == 0 || dataSubset[1] == 0 || dataSubset[0] > dataSubset[1] {
return errors.Fatalf("check flag --read-data-subset=n/t values must be positive integers, and n <= t, e.g. --read-data-subset=1/2")
}
}
return nil
}
// stringToIntSlice converts string to []uint, using '/' as element separator
func stringToIntSlice(param string) (split []uint, err error) {
if param == "" {
return nil, nil
}
parts := strings.Split(param, "/")
result := make([]uint, len(parts))
for idx, part := range parts {
uintval, err := strconv.ParseUint(part, 10, 0)
if err != nil {
return nil, err
}
result[idx] = uint(uintval)
}
return result, nil
}
func newReadProgress(gopts GlobalOptions, todo restic.Stat) *restic.Progress {
if gopts.Quiet {
return nil
@@ -158,13 +199,25 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
}
}
if opts.ReadData {
Verbosef("read all data\n")
doReadData := func(bucket, totalBuckets uint) {
packs := restic.IDSet{}
for pack := range chkr.GetPacks() {
if (uint(pack[0]) % totalBuckets) == (bucket - 1) {
packs.Insert(pack)
}
}
packCount := uint64(len(packs))
p := newReadProgress(gopts, restic.Stat{Blobs: chkr.CountPacks()})
if packCount < chkr.CountPacks() {
Verbosef(fmt.Sprintf("read group #%d of %d data packs (out of total %d packs in %d groups)\n", bucket, packCount, chkr.CountPacks(), totalBuckets))
} else {
Verbosef("read all data\n")
}
p := newReadProgress(gopts, restic.Stat{Blobs: packCount})
errChan := make(chan error)
go chkr.ReadData(gopts.ctx, p, errChan)
go chkr.ReadPacks(gopts.ctx, packs, p, errChan)
for err := range errChan {
errorsFound = true
@@ -172,6 +225,14 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
}
}
switch {
case opts.ReadData:
doReadData(1, 1)
case opts.ReadDataSubset != "":
dataSubset, _ := stringToIntSlice(opts.ReadDataSubset)
doReadData(dataSubset[0], dataSubset[1])
}
if errorsFound {
return errors.Fatal("repository contains errors")
}

View File

@@ -91,19 +91,23 @@ func unlockRepo(lock *restic.Lock) error {
globalLocks.Lock()
defer globalLocks.Unlock()
debug.Log("unlocking repository with lock %p", lock)
if err := lock.Unlock(); err != nil {
debug.Log("error while unlocking: %v", err)
return err
}
for i := 0; i < len(globalLocks.locks); i++ {
if lock == globalLocks.locks[i] {
// remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(); err != nil {
debug.Log("error while unlocking: %v", err)
return err
}
// remove the lock from the list of locks
globalLocks.locks = append(globalLocks.locks[:i], globalLocks.locks[i+1:]...)
return nil
}
}
debug.Log("unable to find lock %v in the global list of locks, ignoring", lock)
return nil
}
@@ -119,6 +123,7 @@ func unlockAll() error {
}
debug.Log("successfully removed lock")
}
globalLocks.locks = globalLocks.locks[:0]
return nil
}

View File

@@ -87,3 +87,29 @@ yield the same error:
Load indexes
ciphertext verification failed
By default, ``check`` command does not check that repository data files
are unmodified. Use ``--read-data`` parameter to check all repository
data files:
.. code-block:: console
$ restic -r /tmp/backup check --read-data
load indexes
check all packs
check snapshots, trees and blobs
read all data
Use ``--read-data-subset=n/t`` parameter to check subset of repository data
files. The parameter takes two values, ``n`` and ``t``. All repository data
files are logically devided in ``t`` roughly equal groups and only files that
belong to the group number ``n`` are checked. For example, the following
commands check all repository data files over 5 separate invocations:
.. code-block:: console
$ restic -r /tmp/backup check --read-data-subset=1/5
$ restic -r /tmp/backup check --read-data-subset=2/5
$ restic -r /tmp/backup check --read-data-subset=3/5
$ restic -r /tmp/backup check --read-data-subset=4/5
$ restic -r /tmp/backup check --read-data-subset=5/5

View File

@@ -349,6 +349,8 @@ _restic_check()
local_nonpersistent_flags+=("--help")
flags+=("--read-data")
local_nonpersistent_flags+=("--read-data")
flags+=("--read-data-subset=")
local_nonpersistent_flags+=("--read-data-subset=")
flags+=("--with-cache")
local_nonpersistent_flags+=("--with-cache")
flags+=("--cacert=")

View File

@@ -36,6 +36,10 @@ repository and not use a local cache.
\fB\-\-read\-data\fP[=false]
read all data blobs
.PP
\fB\-\-read\-data\-subset\fP=""
read subset of data packs
.PP
\fB\-\-with\-cache\fP[=false]
use the cache

View File

@@ -38,13 +38,13 @@ func randomID() restic.ID {
// forgetfulBackend returns a backend that forgets everything.
func forgetfulBackend() restic.Backend {
be := &mock.Backend{}
be := mock.NewBackend()
be.TestFn = func(ctx context.Context, h restic.Handle) (bool, error) {
return false, nil
}
be.LoadFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return nil, errors.New("not found")
}

View File

@@ -135,16 +135,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
debug.Log("Save %v at %v", h, objName)
// Check key does not already exist
found, err := be.container.GetBlobReference(objName).Exists()
if err != nil {
return errors.Wrap(err, "GetBlobReference().Exists()")
}
if found {
debug.Log("%v already exists", h)
return errors.New("key already exists")
}
be.sem.GetToken()
// wrap the reader so that net/http client cannot close the reader, return
@@ -178,10 +168,13 @@ func (wr wrapReader) Close() error {
return err
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -142,9 +142,13 @@ func (be *b2Backend) IsNotExist(err error) bool {
return b2.IsNotExist(errors.Cause(err))
}
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err
@@ -196,12 +200,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) er
debug.Log("Save %v, name %v", h, name)
obj := be.bucket.Object(name)
_, err := obj.Attrs(ctx)
if err == nil {
debug.Log(" %v already exists", h)
return errors.New("key already exists")
}
w := obj.NewWriter(ctx)
n, err := io.Copy(w, rd)
debug.Log(" saved %d bytes, err %v", n, err)

View File

@@ -66,12 +66,12 @@ func (be *ErrorBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
// given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil.
func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
if be.fail(be.FailLoad) {
return nil, errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset)
return errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset)
}
return be.Backend.Load(ctx, h, length, offset)
return be.Backend.Load(ctx, h, length, offset, consumer)
}
// Stat returns information about the File identified by h.

View File

@@ -88,15 +88,11 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
// given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil.
func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (rd io.ReadCloser, err error) {
err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
func() error {
var innerError error
rd, innerError = be.Backend.Load(ctx, h, length, offset)
return innerError
return be.Backend.Load(ctx, h, length, offset, consumer)
})
return rd, err
}
// Stat returns information about the File identified by h.
@@ -129,16 +125,39 @@ func (be *RetryBackend) Test(ctx context.Context, h restic.Handle) (exists bool,
return exists, err
}
// List runs fn for each file in the backend which has the type t.
// List runs fn for each file in the backend which has the type t. When an
// error is returned by the underlying backend, the request is retried. When fn
// returns an error, the operation is aborted and the error is returned to the
// caller.
func (be *RetryBackend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
listed := make(map[string]struct{})
return be.retry(ctx, fmt.Sprintf("List(%v)", t), func() error {
// create a new context that we can cancel when fn returns an error, so
// that listing is aborted
listCtx, cancel := context.WithCancel(ctx)
defer cancel()
listed := make(map[string]struct{}) // remember for which files we already ran fn
var innerErr error // remember when fn returned an error, so we can return that to the caller
err := be.retry(listCtx, fmt.Sprintf("List(%v)", t), func() error {
return be.Backend.List(ctx, t, func(fi restic.FileInfo) error {
if _, ok := listed[fi.Name]; ok {
return nil
}
listed[fi.Name] = struct{}{}
return fn(fi)
innerErr = fn(fi)
if innerErr != nil {
// if fn returned an error, listing is aborted, so we cancel the context
cancel()
}
return innerErr
})
})
// the error fn returned takes precedence
if innerErr != nil {
return innerErr
}
return err
}

View File

@@ -123,3 +123,164 @@ func TestBackendListRetry(t *testing.T) {
test.Equals(t, 2, retry) // assert retried once
test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files
}
func TestBackendListRetryErrorFn(t *testing.T) {
var names = []string{"id1", "id2", "foo", "bar"}
be := &mock.Backend{
ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
t.Logf("List called for %v", tpe)
for _, name := range names {
err := fn(restic.FileInfo{Name: name})
if err != nil {
return err
}
}
return nil
},
}
retryBackend := RetryBackend{
Backend: be,
}
var ErrTest = errors.New("test error")
var listed []string
run := 0
err := retryBackend.List(context.TODO(), restic.DataFile, func(fi restic.FileInfo) error {
t.Logf("fn called for %v", fi.Name)
run++
// return an error for the third item in the list
if run == 3 {
t.Log("returning an error")
return ErrTest
}
listed = append(listed, fi.Name)
return nil
})
if err != ErrTest {
t.Fatalf("wrong error returned, want %v, got %v", ErrTest, err)
}
// processing should stop after the error was returned, so run should be 3
if run != 3 {
t.Fatalf("function was called %d times, wanted %v", run, 3)
}
test.Equals(t, []string{"id1", "id2"}, listed)
}
func TestBackendListRetryErrorBackend(t *testing.T) {
var names = []string{"id1", "id2", "foo", "bar"}
var ErrBackendTest = errors.New("test error")
retries := 0
be := &mock.Backend{
ListFn: func(ctx context.Context, tpe restic.FileType, fn func(restic.FileInfo) error) error {
t.Logf("List called for %v, retries %v", tpe, retries)
retries++
for i, name := range names {
if i == 2 {
return ErrBackendTest
}
err := fn(restic.FileInfo{Name: name})
if err != nil {
return err
}
}
return nil
},
}
const maxRetries = 2
retryBackend := RetryBackend{
MaxTries: maxRetries,
Backend: be,
}
var listed []string
err := retryBackend.List(context.TODO(), restic.DataFile, func(fi restic.FileInfo) error {
t.Logf("fn called for %v", fi.Name)
listed = append(listed, fi.Name)
return nil
})
if err != ErrBackendTest {
t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err)
}
if retries != maxRetries+1 {
t.Fatalf("List was called %d times, wanted %v", retries, maxRetries+1)
}
test.Equals(t, names[:2], listed)
}
// failingReader returns an error after reading limit number of bytes
type failingReader struct {
data []byte
pos int
limit int
}
func (r failingReader) Read(p []byte) (n int, err error) {
i := 0
for ; i < len(p) && i+r.pos < r.limit; i++ {
p[i] = r.data[r.pos+i]
}
r.pos += i
if r.pos >= r.limit {
return i, errors.Errorf("reader reached limit of %d", r.limit)
}
return i, nil
}
func (r failingReader) Close() error {
return nil
}
// closingReader adapts io.Reader to io.ReadCloser interface
type closingReader struct {
rd io.Reader
}
func (r closingReader) Read(p []byte) (n int, err error) {
return r.rd.Read(p)
}
func (r closingReader) Close() error {
return nil
}
func TestBackendLoadRetry(t *testing.T) {
data := test.Random(23, 1024)
limit := 100
attempt := 0
be := mock.NewBackend()
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// returns failing reader on first invocation, good reader on subsequent invocations
attempt++
if attempt > 1 {
return closingReader{rd: bytes.NewReader(data)}, nil
}
return failingReader{data: data, limit: limit}, nil
}
retryBackend := RetryBackend{
Backend: be,
}
var buf []byte
err := retryBackend.Load(context.TODO(), restic.Handle{}, 0, 0, func(rd io.Reader) (err error) {
buf, err = ioutil.ReadAll(rd)
return err
})
test.OK(t, err)
test.Equals(t, data, buf)
test.Equals(t, 2, attempt)
}

View File

@@ -218,13 +218,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
be.sem.GetToken()
// Check key does not already exist
if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil {
debug.Log("%v already exists", h)
be.sem.ReleaseToken()
return errors.New("key already exists")
}
debug.Log("InsertObject(%v, %v)", be.bucketName, objName)
// Set chunk size to zero to disable resumable uploads.
@@ -282,10 +275,13 @@ func (wr wrapReader) Close() error {
return err
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -146,10 +146,13 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
return setNewFileMode(filename, backend.Modes.File)
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"sync"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
@@ -85,10 +86,13 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader
return nil
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if err := h.Valid(); err != nil {
return nil, err
}

View File

@@ -166,10 +166,13 @@ func (b *restBackend) IsNotExist(err error) bool {
return ok
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
func (b *restBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -235,13 +235,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
be.sem.GetToken()
defer be.sem.ReleaseToken()
// Check key does not already exist
_, err = be.client.StatObject(be.cfg.Bucket, objName, minio.StatObjectOptions{})
if err == nil {
debug.Log("%v already exists", h)
return errors.New("key already exists")
}
var size int64 = -1
type lenner interface {
@@ -281,10 +274,13 @@ func (wr wrapReader) Close() error {
return err
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -327,10 +327,13 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err err
return errors.Wrap(r.c.Chmod(filename, backend.Modes.File), "Chmod")
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
}
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil {
return nil, err

View File

@@ -109,10 +109,13 @@ func (be *beSwift) Location() string {
return be.container
}
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use.
func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil {
return nil, err
@@ -165,19 +168,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
be.sem.GetToken()
defer be.sem.ReleaseToken()
// Check key does not already exist
switch _, _, err = be.conn.Object(be.container, objName); err {
case nil:
debug.Log("%v already exists", h)
return errors.New("key already exists")
case swift.ObjectNotFound:
// Ok, that's what we want
default:
return errors.Wrap(err, "conn.Object")
}
encoding := "binary/octet-stream"
debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)

View File

@@ -42,20 +42,15 @@ func (s *Suite) BenchmarkLoadFile(t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, 0, 0)
var n int
err := be.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil {
t.Fatal(err)
}
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != length {
t.Fatalf("wrong number of bytes read: want %v, got %v", length, n)
}
@@ -84,20 +79,15 @@ func (s *Suite) BenchmarkLoadPartialFile(t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, testLength, 0)
var n int
err := be.Load(context.TODO(), handle, testLength, 0, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil {
t.Fatal(err)
}
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != testLength {
t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
}
@@ -128,20 +118,15 @@ func (s *Suite) BenchmarkLoadPartialFileOffset(t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, testLength, int64(testOffset))
var n int
err := be.Load(context.TODO(), handle, testLength, int64(testOffset), func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil {
t.Fatal(err)
}
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != testLength {
t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
}

View File

@@ -115,13 +115,14 @@ func (s *Suite) TestLoad(t *testing.T) {
b := s.open(t)
defer s.close(t, b)
rd, err := b.Load(context.TODO(), restic.Handle{}, 0, 0)
noop := func(rd io.Reader) error {
return nil
}
err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop)
if err == nil {
t.Fatalf("Load() did not return an error for invalid handle")
}
if rd != nil {
_ = rd.Close()
}
err = testLoad(b, restic.Handle{Type: restic.DataFile, Name: "foobar"}, 0, 0)
if err == nil {
@@ -141,13 +142,19 @@ func (s *Suite) TestLoad(t *testing.T) {
t.Logf("saved %d bytes as %v", length, handle)
rd, err = b.Load(context.TODO(), handle, 100, -1)
err = b.Load(context.TODO(), handle, 100, -1, noop)
if err == nil {
t.Fatalf("Load() returned no error for negative offset!")
}
if rd != nil {
t.Fatalf("Load() returned a non-nil reader for negative offset!")
err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error {
return errors.Errorf("deliberate error")
})
if err == nil {
t.Fatalf("Load() did not propagate consumer error!")
}
if err.Error() != "deliberate error" {
t.Fatalf("Load() did not correctly propagate consumer error!")
}
loadTests := 50
@@ -176,63 +183,38 @@ func (s *Suite) TestLoad(t *testing.T) {
d = d[:l]
}
rd, err := b.Load(context.TODO(), handle, getlen, int64(o))
var buf []byte
err := b.Load(context.TODO(), handle, getlen, int64(o), func(rd io.Reader) (ierr error) {
buf, ierr = ioutil.ReadAll(rd)
return ierr
})
if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) returned unexpected error: %+v", l, o, err)
continue
}
buf, err := ioutil.ReadAll(rd)
if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %+v", l, o, err)
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
if l == 0 && len(buf) != len(d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, len(d), len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
if l > 0 && l <= len(d) && len(buf) != l {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
if l > len(d) && len(buf) != len(d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
if !bytes.Equal(buf, d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) returned wrong bytes", l, o)
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
err = rd.Close()
if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) rd.Close() returned unexpected error: %+v", l, o, err)
continue
}
}
@@ -647,17 +629,10 @@ func store(t testing.TB, b restic.Backend, tpe restic.FileType, data []byte) res
// testLoad loads a blob (but discards its contents).
func testLoad(b restic.Backend, h restic.Handle, length int, offset int64) error {
rd, err := b.Load(context.TODO(), h, 0, 0)
if err != nil {
return err
}
_, err = io.Copy(ioutil.Discard, rd)
cerr := rd.Close()
if err == nil {
err = cerr
}
return err
return b.Load(context.TODO(), h, 0, 0, func(rd io.Reader) (ierr error) {
_, ierr = io.Copy(ioutil.Discard, rd)
return ierr
})
}
func (s *Suite) delayedRemove(t testing.TB, be restic.Backend, handles ...restic.Handle) error {
@@ -776,31 +751,23 @@ func (s *Suite) TestBackend(t *testing.T) {
length := end - start
buf2 := make([]byte, length)
rd, err := b.Load(context.TODO(), h, len(buf2), int64(start))
var n int
err = b.Load(context.TODO(), h, len(buf2), int64(start), func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf2)
return ierr
})
test.OK(t, err)
n, err := io.ReadFull(rd, buf2)
test.OK(t, err)
test.Equals(t, len(buf2), n)
remaining, err := io.Copy(ioutil.Discard, rd)
test.OK(t, err)
test.Equals(t, int64(0), remaining)
test.OK(t, rd.Close())
test.Equals(t, ts.data[start:end], string(buf2))
}
// test adding the first file again
ts := testStrings[0]
// create blob
h := restic.Handle{Type: tpe, Name: ts.id}
err := b.Save(context.TODO(), h, strings.NewReader(ts.data))
test.Assert(t, err != nil, "backend has allowed overwrite of existing blob: expected error for %v, got %v", h, err)
// remove and recreate
err = s.delayedRemove(t, b, h)
err := s.delayedRemove(t, b, h)
test.OK(t, err)
// test that the blob is gone

View File

@@ -10,24 +10,11 @@ import (
// LoadAll reads all data stored in the backend for the handle.
func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) {
rd, err := be.Load(ctx, h, 0, 0)
if err != nil {
return nil, err
}
defer func() {
_, e := io.Copy(ioutil.Discard, rd)
if err == nil {
err = e
}
e = rd.Close()
if err == nil {
err = e
}
}()
return ioutil.ReadAll(rd)
err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
buf, ierr = ioutil.ReadAll(rd)
return ierr
})
return buf, err
}
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
@@ -46,3 +33,19 @@ func (l *LimitedReadCloser) Read(p []byte) (int, error) {
func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
return &LimitedReadCloser{ReadCloser: r, Reader: io.LimitReader(r, n)}
}
// DefaultLoad implements Backend.Load using lower-level openReader func
func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error),
fn func(rd io.Reader) error) error {
rd, err := openReader(ctx, h, length, offset)
if err != nil {
return err
}
err = fn(rd)
if err != nil {
rd.Close() // ignore secondary errors closing the reader
return err
}
return rd.Close()
}

View File

@@ -3,11 +3,13 @@ package backend_test
import (
"bytes"
"context"
"io"
"math/rand"
"testing"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
@@ -89,3 +91,54 @@ func TestLoadLargeBuffer(t *testing.T) {
}
}
}
type mockReader struct {
closed bool
}
func (rd *mockReader) Read(p []byte) (n int, err error) {
return 0, nil
}
func (rd *mockReader) Close() error {
rd.closed = true
return nil
}
func TestDefaultLoad(t *testing.T) {
h := restic.Handle{Name: "id", Type: restic.DataFile}
rd := &mockReader{}
// happy case, assert correct parameters are passed around and content stream is closed
err := backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
rtest.Equals(t, h, ih)
rtest.Equals(t, int(10), length)
rtest.Equals(t, int64(11), offset)
return rd, nil
}, func(ird io.Reader) error {
rtest.Equals(t, rd, ird)
return nil
})
rtest.OK(t, err)
rtest.Equals(t, true, rd.closed)
// unhappy case, assert producer errors are handled correctly
err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return nil, errors.Errorf("producer error")
}, func(ird io.Reader) error {
t.Fatalf("unexpected consumer invocation")
return nil
})
rtest.Equals(t, "producer error", err.Error())
// unhappy case, assert consumer errors are handled correctly
rd = &mockReader{}
err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return rd, nil
}, func(ird io.Reader) error {
return errors.Errorf("consumer error")
})
rtest.Equals(t, true, rd.closed)
rtest.Equals(t, "consumer error", err.Error())
}

View File

@@ -121,17 +121,10 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
return nil
}
rd, err := b.Backend.Load(ctx, h, 0, 0)
err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
return b.Cache.Save(h, rd)
})
if err != nil {
return err
}
if err = b.Cache.Save(h, rd); err != nil {
_ = rd.Close()
return err
}
if err = rd.Close(); err != nil {
// try to remove from the cache, ignore errors
_ = b.Cache.Remove(h)
return err
@@ -142,17 +135,22 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
// loadFromCacheOrDelegate will try to load the file from the cache, and fall
// back to the backend if that fails.
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
rd, err := b.Cache.Load(h, length, offset)
if err == nil {
return rd, nil
if err != nil {
return b.Backend.Load(ctx, h, length, offset, consumer)
}
return b.Backend.Load(ctx, h, length, offset)
err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
}
// Load loads a file from the cache or the backend.
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
b.inProgressMutex.Lock()
waitForFinish, inProgress := b.inProgress[h]
b.inProgressMutex.Unlock()
@@ -167,7 +165,12 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
rd, err := b.Cache.Load(h, length, offset)
if err == nil {
return rd, nil
err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
}
debug.Log("error loading %v from cache: %v", h, err)
}
@@ -179,20 +182,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
err := b.cacheFile(ctx, h)
if err == nil {
return b.loadFromCacheOrDelegate(ctx, h, length, offset)
return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer)
}
debug.Log("error caching %v: %v", h, err)
}
debug.Log("Load(%v, %v, %v): partial file requested, delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
// if we don't automatically cache this file type, fall back to the backend
if _, ok := autoCacheFiles[h.Type]; !ok {
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
debug.Log("auto-store %v in the cache", h)
@@ -200,11 +203,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
if err == nil {
// load the cached version
return b.Cache.Load(h, 0, 0)
rd, err := b.Cache.Load(h, 0, 0)
if err != nil {
return err
}
err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
}
debug.Log("error caching %v: %v, falling back to backend", h, err)
return b.Backend.Load(ctx, h, length, offset)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
// Stat tests whether the backend has a file. If it does not exist but still

View File

@@ -2,15 +2,12 @@ package checker
import (
"context"
"crypto/sha256"
"fmt"
"io"
"os"
"sync"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
@@ -625,20 +622,19 @@ func (c *Checker) CountPacks() uint64 {
return uint64(len(c.packs))
}
// GetPacks returns IDSet of packs in the repository
func (c *Checker) GetPacks() restic.IDSet {
return c.packs
}
// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
debug.Log("checking pack %v", id)
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
rd, err := r.Backend().Load(ctx, h, 0, 0)
packfile, hash, size, err := repository.DownloadAndHash(ctx, r, h)
if err != nil {
return err
}
packfile, err := fs.TempFile("", "restic-temp-check-")
if err != nil {
_ = rd.Close()
return errors.Wrap(err, "TempFile")
return errors.Wrap(err, "checkPack")
}
defer func() {
@@ -646,18 +642,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
_ = os.Remove(packfile.Name())
}()
hrd := hashing.NewReader(rd, sha256.New())
size, err := io.Copy(packfile, hrd)
if err != nil {
_ = rd.Close()
return errors.Wrap(err, "Copy")
}
if err = rd.Close(); err != nil {
return err
}
hash := restic.IDFromHash(hrd.Sum(nil))
debug.Log("hash for pack %v is %v", id, hash)
if !hash.Equal(id) {
@@ -718,6 +702,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, p, errChan)
}
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *restic.Progress, errChan chan<- error) {
defer close(errChan)
p.Start()
@@ -726,18 +715,6 @@ func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan
g, ctx := errgroup.WithContext(ctx)
ch := make(chan restic.ID)
// start producer for channel ch
g.Go(func() error {
defer close(ch)
return c.repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
case ch <- id:
}
return nil
})
})
// run workers
for i := 0; i < defaultParallelism; i++ {
g.Go(func() error {
@@ -769,6 +746,15 @@ func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan
})
}
// push packs to ch
for pack := range packs {
select {
case ch <- pack:
case <-ctx.Done():
}
}
close(ch)
err := g.Wait()
if err != nil {
select {

View File

@@ -195,20 +195,17 @@ func TestModifiedIndex(t *testing.T) {
Type: restic.IndexFile,
Name: "90f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
}
f, err := repo.Backend().Load(context.TODO(), h, 0, 0)
err := repo.Backend().Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
// save the index again with a modified name so that the hash doesn't match
// the content any more
h2 := restic.Handle{
Type: restic.IndexFile,
Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
}
return repo.Backend().Save(context.TODO(), h2, rd)
})
test.OK(t, err)
// save the index again with a modified name so that the hash doesn't match
// the content any more
h2 := restic.Handle{
Type: restic.IndexFile,
Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
}
err = repo.Backend().Save(context.TODO(), h2, f)
test.OK(t, err)
test.OK(t, f.Close())
chkr := checker.New(repo)
hints, errs := chkr.LoadIndex(context.TODO())
if len(errs) == 0 {
@@ -262,35 +259,27 @@ type errorBackend struct {
ProduceErrors bool
}
func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
rd, err := b.Backend.Load(ctx, h, length, offset)
if err != nil {
return rd, err
}
if b.ProduceErrors {
return errorReadCloser{rd}, err
}
return rd, nil
func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
if b.ProduceErrors {
return consumer(errorReadCloser{rd})
}
return consumer(rd)
})
}
type errorReadCloser struct {
io.ReadCloser
io.Reader
}
func (erd errorReadCloser) Read(p []byte) (int, error) {
n, err := erd.ReadCloser.Read(p)
n, err := erd.Reader.Read(p)
if n > 0 {
induceError(p[:n])
}
return n, err
}
func (erd errorReadCloser) Close() error {
return erd.ReadCloser.Close()
}
// induceError flips a bit in the slice.
func induceError(data []byte) {
if rand.Float32() < 0.2 {

View File

@@ -25,16 +25,13 @@ func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Rea
return r.Backend.Save(ctx, h, r.limiter.Upstream(rd))
}
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
rc, err := r.Backend.Load(ctx, h, length, offset)
if err != nil {
return nil, err
}
return limitedReadCloser{
original: rc,
limited: r.limiter.Downstream(rc),
}, nil
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
lrd := limitedReadCloser{
limited: r.limiter.Downstream(rd),
}
return consumer(lrd)
})
}
type limitedReadCloser struct {
@@ -47,6 +44,9 @@ func (l limitedReadCloser) Read(b []byte) (n int, err error) {
}
func (l limitedReadCloser) Close() error {
if l.original == nil {
return nil
}
return l.original.Close()
}

View File

@@ -13,7 +13,7 @@ type Backend struct {
CloseFn func() error
IsNotExistFn func(err error) bool
SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error
LoadFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
RemoveFn func(ctx context.Context, h restic.Handle) error
@@ -22,6 +22,12 @@ type Backend struct {
LocationFn func() string
}
// NewBackend returns new mock Backend instance
func NewBackend() *Backend {
be := &Backend{}
return be
}
// Close the backend.
func (m *Backend) Close() error {
if m.CloseFn == nil {
@@ -58,13 +64,27 @@ func (m *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error
return m.SaveFn(ctx, h, rd)
}
// Load loads data from the backend.
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if m.LoadFn == nil {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
rd, err := m.openReader(ctx, h, length, offset)
if err != nil {
return err
}
err = fn(rd)
if err != nil {
rd.Close() // ignore secondary errors closing the reader
return err
}
return rd.Close()
}
func (m *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if m.OpenReaderFn == nil {
return nil, errors.New("not implemented")
}
return m.LoadFn(ctx, h, length, offset)
return m.OpenReaderFn(ctx, h, length, offset)
}
// Stat an object in the backend.

View File

@@ -170,23 +170,73 @@ func (p *Packer) String() string {
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
}
const maxHeaderSize = 16 * 1024 * 1024
var (
// size of the header-length field at the end of the file
headerLengthSize = binary.Size(uint32(0))
// we require at least one entry in the header, and one blob for a pack file
minFileSize = entrySize + crypto.Extension + uint(headerLengthSize)
)
// we require at least one entry in the header, and one blob for a pack file
var minFileSize = entrySize + crypto.Extension
const (
maxHeaderSize = 16 * 1024 * 1024
// number of header enries to download as part of header-length request
eagerEntries = 15
)
// number of header enries to download as part of header-length request
var eagerEntries = uint(15)
// readRecords reads up to max records from the underlying ReaderAt, returning
// the raw header, the total number of records in the header, and any error.
// If the header contains fewer than max entries, the header is truncated to
// the appropriate size.
func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
var bufsize int
bufsize += max * int(entrySize)
bufsize += crypto.Extension
bufsize += headerLengthSize
if bufsize > int(size) {
bufsize = int(size)
}
b := make([]byte, bufsize)
off := size - int64(bufsize)
if _, err := rd.ReadAt(b, off); err != nil {
return nil, 0, err
}
hlen := binary.LittleEndian.Uint32(b[len(b)-headerLengthSize:])
b = b[:len(b)-headerLengthSize]
debug.Log("header length: %v", hlen)
var err error
switch {
case hlen == 0:
err = InvalidFileError{Message: "header length is zero"}
case hlen < crypto.Extension:
err = InvalidFileError{Message: "header length is too small"}
case (hlen-crypto.Extension)%uint32(entrySize) != 0:
err = InvalidFileError{Message: "header length is invalid"}
case int64(hlen) > size-int64(headerLengthSize):
err = InvalidFileError{Message: "header is larger than file"}
case int64(hlen) > maxHeaderSize:
err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
}
if err != nil {
return nil, 0, errors.Wrap(err, "readHeader")
}
total := (int(hlen) - crypto.Extension) / int(entrySize)
if total < max {
// truncate to the beginning of the pack header
b = b[len(b)-int(hlen):]
}
return b, total, nil
}
// readHeader reads the header at the end of rd. size is the length of the
// whole data accessible in rd.
func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
debug.Log("size: %v", size)
if size == 0 {
err := InvalidFileError{Message: "file is empty"}
return nil, errors.Wrap(err, "readHeader")
}
if size < int64(minFileSize) {
err := InvalidFileError{Message: "file is too small"}
return nil, errors.Wrap(err, "readHeader")
@@ -196,69 +246,19 @@ func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
// eagerly download eagerEntries header entries as part of header-length request.
// only make second request if actual number of entries is greater than eagerEntries
eagerHl := uint32((eagerEntries * entrySize) + crypto.Extension)
if int64(eagerHl) > size {
eagerHl = uint32(size) - uint32(binary.Size(uint32(0)))
}
eagerBuf := make([]byte, eagerHl+uint32(binary.Size(uint32(0))))
n, err := rd.ReadAt(eagerBuf, size-int64(len(eagerBuf)))
b, c, err := readRecords(rd, size, eagerEntries)
if err != nil {
return nil, err
}
if n != len(eagerBuf) {
return nil, errors.New("not enough bytes read")
if c <= eagerEntries {
// eager read sufficed, return what we got
return b, nil
}
hl := binary.LittleEndian.Uint32(eagerBuf[eagerHl:])
debug.Log("header length: %v", size)
if hl == 0 {
err := InvalidFileError{Message: "header length is zero"}
return nil, errors.Wrap(err, "readHeader")
b, _, err = readRecords(rd, size, c)
if err != nil {
return nil, err
}
if hl < crypto.Extension {
err := InvalidFileError{Message: "header length is too small"}
return nil, errors.Wrap(err, "readHeader")
}
if (hl-crypto.Extension)%uint32(entrySize) != 0 {
err := InvalidFileError{Message: "header length is invalid"}
return nil, errors.Wrap(err, "readHeader")
}
if int64(hl) > size-int64(binary.Size(hl)) {
err := InvalidFileError{Message: "header is larger than file"}
return nil, errors.Wrap(err, "readHeader")
}
if int64(hl) > maxHeaderSize {
err := InvalidFileError{Message: "header is larger than maxHeaderSize"}
return nil, errors.Wrap(err, "readHeader")
}
eagerBuf = eagerBuf[:eagerHl]
var buf []byte
if hl <= eagerHl {
// already have all header bytes. yay.
buf = eagerBuf[eagerHl-hl:]
} else {
// need more header bytes
buf = make([]byte, hl)
missingHl := hl - eagerHl
n, err := rd.ReadAt(buf[:missingHl], size-int64(hl)-int64(binary.Size(hl)))
if err != nil {
return nil, errors.Wrap(err, "ReadAt")
}
if uint32(n) != missingHl {
return nil, errors.New("not enough bytes read")
}
copy(buf[hl-eagerHl:], eagerBuf)
}
return buf, nil
return b, nil
}
// InvalidFileError is return when a file is found that is not a pack file.

View File

@@ -22,11 +22,11 @@ func (rd *countingReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
func TestReadHeaderEagerLoad(t *testing.T) {
testReadHeader := func(entryCount uint, expectedReadInvocationCount int) {
expectedHeader := rtest.Random(0, int(entryCount*entrySize)+crypto.Extension)
testReadHeader := func(dataSize, entryCount, expectedReadInvocationCount int) {
expectedHeader := rtest.Random(0, entryCount*int(entrySize)+crypto.Extension)
buf := &bytes.Buffer{}
buf.Write(rtest.Random(0, 100)) // pack blobs data
buf.Write(rtest.Random(0, dataSize)) // pack blobs data
buf.Write(expectedHeader) // pack header
binary.Write(buf, binary.LittleEndian, uint32(len(expectedHeader))) // pack header length
@@ -39,8 +39,72 @@ func TestReadHeaderEagerLoad(t *testing.T) {
rtest.Equals(t, expectedReadInvocationCount, rd.invocationCount)
}
testReadHeader(1, 1)
testReadHeader(eagerEntries-1, 1)
testReadHeader(eagerEntries, 1)
testReadHeader(eagerEntries+1, 2)
// basic
testReadHeader(100, 1, 1)
// header entries == eager entries
testReadHeader(100, eagerEntries-1, 1)
testReadHeader(100, eagerEntries, 1)
testReadHeader(100, eagerEntries+1, 2)
// file size == eager header load size
eagerLoadSize := int((eagerEntries * entrySize) + crypto.Extension)
headerSize := int(1*entrySize) + crypto.Extension
dataSize := eagerLoadSize - headerSize - binary.Size(uint32(0))
testReadHeader(dataSize-1, 1, 1)
testReadHeader(dataSize, 1, 1)
testReadHeader(dataSize+1, 1, 1)
testReadHeader(dataSize+2, 1, 1)
testReadHeader(dataSize+3, 1, 1)
testReadHeader(dataSize+4, 1, 1)
}
func TestReadRecords(t *testing.T) {
testReadRecords := func(dataSize, entryCount, totalRecords int) {
totalHeader := rtest.Random(0, totalRecords*int(entrySize)+crypto.Extension)
off := len(totalHeader) - (entryCount*int(entrySize) + crypto.Extension)
if off < 0 {
off = 0
}
expectedHeader := totalHeader[off:]
buf := &bytes.Buffer{}
buf.Write(rtest.Random(0, dataSize)) // pack blobs data
buf.Write(totalHeader) // pack header
binary.Write(buf, binary.LittleEndian, uint32(len(totalHeader))) // pack header length
rd := bytes.NewReader(buf.Bytes())
header, count, err := readRecords(rd, int64(rd.Len()), entryCount)
rtest.OK(t, err)
rtest.Equals(t, expectedHeader, header)
rtest.Equals(t, totalRecords, count)
}
// basic
testReadRecords(100, 1, 1)
testReadRecords(100, 0, 1)
testReadRecords(100, 1, 0)
// header entries ~ eager entries
testReadRecords(100, eagerEntries, eagerEntries-1)
testReadRecords(100, eagerEntries, eagerEntries)
testReadRecords(100, eagerEntries, eagerEntries+1)
// file size == eager header load size
eagerLoadSize := int((eagerEntries * entrySize) + crypto.Extension)
headerSize := int(1*entrySize) + crypto.Extension
dataSize := eagerLoadSize - headerSize - binary.Size(uint32(0))
testReadRecords(dataSize-1, 1, 1)
testReadRecords(dataSize, 1, 1)
testReadRecords(dataSize+1, 1, 1)
testReadRecords(dataSize+2, 1, 1)
testReadRecords(dataSize+3, 1, 1)
testReadRecords(dataSize+4, 1, 1)
for i := 0; i < 2; i++ {
for j := 0; j < 2; j++ {
testReadRecords(dataSize, i, j)
}
}
}

View File

@@ -2,14 +2,11 @@ package repository
import (
"context"
"crypto/sha256"
"fmt"
"io"
"os"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
@@ -27,28 +24,11 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
// load the complete pack into a temp file
h := restic.Handle{Type: restic.DataFile, Name: packID.String()}
tempfile, err := fs.TempFile("", "restic-temp-repack-")
tempfile, hash, packLength, err := DownloadAndHash(ctx, repo, h)
if err != nil {
return nil, errors.Wrap(err, "TempFile")
return nil, errors.Wrap(err, "Repack")
}
beRd, err := repo.Backend().Load(ctx, h, 0, 0)
if err != nil {
return nil, err
}
hrd := hashing.NewReader(beRd, sha256.New())
packLength, err := io.Copy(tempfile, hrd)
if err != nil {
_ = beRd.Close()
return nil, errors.Wrap(err, "Copy")
}
if err = beRd.Close(); err != nil {
return nil, errors.Wrap(err, "Close")
}
hash := restic.IDFromHash(hrd.Sum(nil))
debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
if !packID.Equal(hash) {

View File

@@ -3,12 +3,16 @@ package repository
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"os"
"github.com/restic/restic/internal/cache"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/backend"
@@ -542,6 +546,7 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic
id, err := restic.ParseID(fi.Name)
if err != nil {
debug.Log("unable to parse %v as an ID", fi.Name)
return nil
}
return fn(id, fi.Size)
})
@@ -654,3 +659,36 @@ func (r *Repository) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, e
_, err = r.SaveBlob(ctx, restic.TreeBlob, buf, id)
return id, err
}
// DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location
// and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file;
// it is reponsibility of the caller to close and delete the file.
func DownloadAndHash(ctx context.Context, repo restic.Repository, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
tmpfile, err = fs.TempFile("", "restic-temp-")
if err != nil {
return nil, restic.ID{}, -1, errors.Wrap(err, "TempFile")
}
err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
_, ierr = tmpfile.Seek(0, io.SeekStart)
if ierr == nil {
ierr = tmpfile.Truncate(0)
}
if ierr != nil {
return ierr
}
hrd := hashing.NewReader(rd, sha256.New())
size, ierr = io.Copy(tmpfile, hrd)
hash = restic.IDFromHash(hrd.Sum(nil))
return ierr
})
_, err = tmpfile.Seek(0, io.SeekStart)
if err != nil {
tmpfile.Close()
os.Remove(tmpfile.Name())
return nil, restic.ID{}, -1, errors.Wrap(err, "Seek")
}
return tmpfile, hash, size, err
}

View File

@@ -23,11 +23,15 @@ type Backend interface {
// Save stores the data in the backend under the given handle.
Save(ctx context.Context, h Handle, rd io.Reader) error
// Load returns a reader that yields the contents of the file at h at the
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil.
Load(ctx context.Context, h Handle, length int, offset int64) (io.ReadCloser, error)
// is read.
//
// The function fn may be called multiple times during the same Load invocation
// and therefore must be idempotent.
//
// Implementations are encouraged to use backend.DefaultLoad
Load(ctx context.Context, h Handle, length int, offset int64, fn func(rd io.Reader) error) error
// Stat returns information about the File identified by h.
Stat(ctx context.Context, h Handle) (FileInfo, error)

View File

@@ -44,7 +44,11 @@ type ErrAlreadyLocked struct {
}
func (e ErrAlreadyLocked) Error() string {
return fmt.Sprintf("repository is already locked by %v", e.otherLock)
s := ""
if e.otherLock.Exclusive {
s = "exclusively "
}
return fmt.Sprintf("repository is already locked %sby %v", s, e.otherLock)
}
// IsAlreadyLocked returns true iff err is an instance of ErrAlreadyLocked.

View File

@@ -209,8 +209,11 @@ func TestNodeRestoreAt(t *testing.T) {
rtest.Assert(t, test.GID == n2.GID,
"%v: GID doesn't match (%v != %v)", test.Type, test.GID, n2.GID)
if test.Type != "symlink" {
rtest.Assert(t, test.Mode == n2.Mode,
"%v: mode doesn't match (0%o != 0%o)", test.Type, test.Mode, n2.Mode)
// On OpenBSD only root can set sticky bit (see sticky(8)).
if runtime.GOOS != "openbsd" && test.Name == "testSticky" {
rtest.Assert(t, test.Mode == n2.Mode,
"%v: mode doesn't match (0%o != 0%o)", test.Type, test.Mode, n2.Mode)
}
}
}

View File

@@ -25,17 +25,16 @@ func ReaderAt(be Backend, h Handle) io.ReaderAt {
// ReadAt reads from the backend handle h at the given position.
func ReadAt(ctx context.Context, be Backend, h Handle, offset int64, p []byte) (n int, err error) {
debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p))
rd, err := be.Load(ctx, h, len(p), offset)
err = be.Load(ctx, h, len(p), offset, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, p)
return ierr
})
if err != nil {
return 0, err
}
n, err = io.ReadFull(rd, p)
e := rd.Close()
if err == nil {
err = e
}
debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n)
return n, errors.Wrapf(err, "ReadFull(%v)", h)

View File

@@ -45,6 +45,7 @@ type Client struct {
slock sync.Mutex
sWriters map[string]*Writer
sReaders map[string]*Reader
sMethods map[string]int
}
// NewClient creates and returns a new Client with valid B2 service account
@@ -54,7 +55,9 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
backend: &beRoot{
b2i: &b2Root{},
},
sMethods: make(map[string]int),
}
opts = append(opts, client(c))
if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
return nil, err
}
@@ -62,6 +65,7 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
}
type clientOptions struct {
client *Client
transport http.RoundTripper
failSomeUploads bool
expireTokens bool
@@ -115,12 +119,38 @@ func ForceCapExceeded() ClientOption {
}
}
func client(cl *Client) ClientOption {
return func(c *clientOptions) {
c.client = cl
}
}
type clientTransport struct {
client *Client
rt http.RoundTripper
}
func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) {
method := r.Header.Get("X-Blazer-Method")
if method != "" && ct.client != nil {
ct.client.slock.Lock()
ct.client.sMethods[method]++
ct.client.slock.Unlock()
}
t := ct.rt
if t == nil {
t = http.DefaultTransport
}
return t.RoundTrip(r)
}
// Bucket is a reference to a B2 bucket.
type Bucket struct {
b beBucketInterface
r beRootInterface
c *Client
c *Client
urlPool *urlPool
}
type BucketType string
@@ -188,6 +218,36 @@ func IsNotExist(err error) bool {
return berr.notFoundErr
}
const uploadURLPoolSize = 100
type urlPool struct {
ch chan beURLInterface
}
func newURLPool() *urlPool {
return &urlPool{ch: make(chan beURLInterface, uploadURLPoolSize)}
}
func (p *urlPool) get() beURLInterface {
select {
case ue := <-p.ch:
// if the channel has an upload URL available, use that
return ue
default:
// otherwise return nil, a new upload URL needs to be generated
return nil
}
}
func (p *urlPool) put(u beURLInterface) {
select {
case p.ch <- u:
// put the URL back if possible
default:
// if the channel is full, throw it away
}
}
// Bucket returns a bucket if it exists.
func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
buckets, err := c.backend.listBuckets(ctx)
@@ -197,9 +257,10 @@ func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
for _, bucket := range buckets {
if bucket.name() == name {
return &Bucket{
b: bucket,
r: c.backend,
c: c,
b: bucket,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, nil
}
}
@@ -220,9 +281,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
for _, bucket := range buckets {
if bucket.name() == name {
return &Bucket{
b: bucket,
r: c.backend,
c: c,
b: bucket,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, nil
}
}
@@ -234,13 +296,14 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
return nil, err
}
return &Bucket{
b: b,
r: c.backend,
c: c,
b: b,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, err
}
// ListBucket returns all the available buckets.
// ListBuckets returns all the available buckets.
func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
bs, err := c.backend.listBuckets(ctx)
if err != nil {
@@ -249,9 +312,10 @@ func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
var buckets []*Bucket
for _, b := range bs {
buckets = append(buckets, &Bucket{
b: b,
r: c.backend,
c: c,
b: b,
r: c.backend,
c: c,
urlPool: newURLPool(),
})
}
return buckets, nil
@@ -565,6 +629,37 @@ func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) (
return objects, next, rtnErr
}
// ListUnfinishedLargeFiles lists any objects that correspond to large file uploads that haven't been completed.
// This can happen for example when an upload is interrupted.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
if c == nil {
c = &Cursor{}
}
fs, name, err := b.b.listUnfinishedLargeFiles(ctx, count, c.name)
if err != nil {
return nil, nil, err
}
var next *Cursor
if name != "" {
next = &Cursor{
name: name,
}
}
var objects []*Object
for _, f := range fs {
objects = append(objects, &Object{
name: f.name(),
f: f,
b: b,
})
}
var rtnErr error
if len(objects) == 0 || next == nil {
rtnErr = io.EOF
}
return objects, next, rtnErr
}
// Hide hides the object from name-based listing.
func (o *Object) Hide(ctx context.Context) error {
if err := o.ensure(ctx); err != nil {

View File

@@ -198,6 +198,10 @@ func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d
return x, y, "", z
}
func (t *testBucket) listUnfinishedLargeFiles(ctx context.Context, count int, cont string) ([]b2FileInterface, string, error) {
return nil, "", fmt.Errorf("testBucket.listUnfinishedLargeFiles(ctx, %d, %q): not implemented", count, cont)
}
func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
gmux.Lock()
defer gmux.Unlock()

View File

@@ -49,6 +49,7 @@ type beBucketInterface interface {
startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (beLargeFileInterface, error)
listFileNames(context.Context, int, string, string, string) ([]beFileInterface, string, error)
listFileVersions(context.Context, int, string, string, string, string) ([]beFileInterface, string, string, error)
listUnfinishedLargeFiles(context.Context, int, string) ([]beFileInterface, string, error)
downloadFileByName(context.Context, string, int64, int64) (beFileReaderInterface, error)
hideFile(context.Context, string) (beFileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
@@ -339,6 +340,32 @@ func (b *beBucket) listFileVersions(ctx context.Context, count int, nextName, ne
return files, name, id, nil
}
func (b *beBucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]beFileInterface, string, error) {
var cont string
var files []beFileInterface
f := func() error {
g := func() error {
fs, c, err := b.b2bucket.listUnfinishedLargeFiles(ctx, count, continuation)
if err != nil {
return err
}
cont = c
for _, f := range fs {
files = append(files, &beFile{
b2file: f,
ri: b.ri,
})
}
return nil
}
return withReauth(ctx, b.ri, g)
}
if err := withBackoff(ctx, b.ri, f); err != nil {
return nil, "", err
}
return files, cont, nil
}
func (b *beBucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (beFileReaderInterface, error) {
var reader beFileReaderInterface
f := func() error {

View File

@@ -46,6 +46,7 @@ type b2BucketInterface interface {
startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (b2LargeFileInterface, error)
listFileNames(context.Context, int, string, string, string) ([]b2FileInterface, string, error)
listFileVersions(context.Context, int, string, string, string, string) ([]b2FileInterface, string, string, error)
listUnfinishedLargeFiles(context.Context, int, string) ([]b2FileInterface, string, error)
downloadFileByName(context.Context, string, int64, int64) (b2FileReaderInterface, error)
hideFile(context.Context, string) (b2FileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
@@ -137,9 +138,11 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
f(c)
}
var aopts []base.AuthOption
ct := &clientTransport{client: c.client}
if c.transport != nil {
aopts = append(aopts, base.Transport(c.transport))
ct.rt = c.transport
}
aopts = append(aopts, base.Transport(ct))
if c.failSomeUploads {
aopts = append(aopts, base.FailSomeUploads())
}
@@ -314,6 +317,18 @@ func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, ne
return files, name, id, nil
}
func (b *b2Bucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]b2FileInterface, string, error) {
fs, cont, err := b.b.ListUnfinishedLargeFiles(ctx, count, continuation)
if err != nil {
return nil, "", err
}
var files []b2FileInterface
for _, f := range fs {
files = append(files, &b2File{f})
}
return files, cont, nil
}
func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
fr, err := b.b.DownloadFileByName(ctx, name, offset, size)
if err != nil {

View File

@@ -17,7 +17,9 @@ package b2
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"net/http"
@@ -786,6 +788,62 @@ func TestAttrsNoRoundtrip(t *testing.T) {
}
}
/*func TestAttrsFewRoundtrips(t *testing.T) {
rt := &rtCounter{rt: defaultTransport}
defaultTransport = rt
defer func() {
defaultTransport = rt.rt
}()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
bucket, done := startLiveTest(ctx, t)
defer done()
_, _, err := writeFile(ctx, bucket, smallFileName, 42, 1e8)
if err != nil {
t.Fatal(err)
}
o := bucket.Object(smallFileName)
trips := rt.trips
attrs, err := o.Attrs(ctx)
if err != nil {
t.Fatal(err)
}
if attrs.Name != smallFileName {
t.Errorf("got the wrong object: got %q, want %q", attrs.Name, smallFileName)
}
if trips != rt.trips {
t.Errorf("Attrs(): too many round trips, got %d, want 1", rt.trips-trips)
}
}*/
func TestSmallUploadsFewRoundtrips(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
bucket, done := startLiveTest(ctx, t)
defer done()
for i := 0; i < 10; i++ {
_, _, err := writeFile(ctx, bucket, fmt.Sprintf("%s.%d", smallFileName, i), 42, 1e8)
if err != nil {
t.Fatal(err)
}
}
si := bucket.c.Status()
getURL := si.MethodCalls["b2_get_upload_url"]
uploadFile := si.MethodCalls["b2_upload_file"]
if getURL >= uploadFile {
t.Errorf("too many calls to b2_get_upload_url")
}
}
func TestDeleteWithoutName(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
@@ -804,6 +862,26 @@ func TestDeleteWithoutName(t *testing.T) {
}
}
func TestListUnfinishedLargeFiles(t *testing.T) {
ctx := context.Background()
bucket, done := startLiveTest(ctx, t)
defer done()
w := bucket.Object(largeFileName).NewWriter(ctx)
w.ChunkSize = 1e5
if _, err := io.Copy(w, io.LimitReader(zReader{}, 1e6)); err != nil {
t.Fatal(err)
}
// Don't close the writer.
fs, _, err := bucket.ListUnfinishedLargeFiles(ctx, 10, nil)
if err != io.EOF && err != nil {
t.Fatal(err)
}
if len(fs) != 1 {
t.Errorf("ListUnfinishedLargeFiles: got %d, want 1", len(fs))
}
}
type object struct {
o *Object
err error
@@ -914,6 +992,16 @@ func (cc *ccRC) Close() error {
return cc.ReadCloser.Close()
}
var uniq string
func init() {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
panic(err)
}
uniq = hex.EncodeToString(b)
}
func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
id := os.Getenv(apiID)
key := os.Getenv(apiKey)
@@ -929,7 +1017,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
t.Fatal(err)
return nil, nil
}
bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil)
bucket, err := client.NewBucket(ctx, fmt.Sprintf("%s-%s-%s", id, bucketName, uniq), nil)
if err != nil {
t.Fatal(err)
return nil, nil

View File

@@ -18,8 +18,9 @@ import "fmt"
// StatusInfo reports information about a client.
type StatusInfo struct {
Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus
Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus
MethodCalls map[string]int
}
// WriterStatus reports the status for each writer.
@@ -42,8 +43,9 @@ func (c *Client) Status() *StatusInfo {
defer c.slock.Unlock()
si := &StatusInfo{
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
MethodCalls: make(map[string]int),
}
for name, w := range c.sWriters {
@@ -54,6 +56,10 @@ func (c *Client) Status() *StatusInfo {
si.Readers[name] = r.status()
}
for name, n := range c.sMethods {
si.MethodCalls[name] = n
}
return si
}

View File

@@ -33,7 +33,7 @@ func (r *readerAt) ReadAt(p []byte, off int64) (int, error) {
if err != nil {
return 0, err
}
defer func() { r.rs.Seek(cur, io.SeekStart) }()
defer r.rs.Seek(cur, io.SeekStart)
if _, err := r.rs.Seek(off, io.SeekStart); err != nil {
return 0, err

View File

@@ -144,7 +144,7 @@ func (w *Writer) thread() {
}
if sha, ok := w.seen[chunk.id]; ok {
if sha != chunk.buf.Hash() {
w.setErr(errors.New("resumable upload was requested, but chunks don't match!"))
w.setErr(errors.New("resumable upload was requested, but chunks don't match"))
return
}
chunk.buf.Close()
@@ -245,11 +245,23 @@ func (w *Writer) Write(p []byte) (int, error) {
return i + k, err
}
func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) {
u := w.o.b.urlPool.get()
if u == nil {
return w.o.b.b.getUploadURL(w.ctx)
}
return u, nil
}
func (w *Writer) simpleWriteFile() error {
ue, err := w.o.b.b.getUploadURL(w.ctx)
ue, err := w.getUploadURL(w.ctx)
if err != nil {
return err
}
// This defer needs to be in a func() so that we put whatever the value of ue
// is at function exit.
defer func() { w.o.b.urlPool.put(ue) }()
sha1 := w.w.Hash()
ctype := w.contentType
if ctype == "" {

View File

@@ -18,7 +18,6 @@
// It currently lacks support for the following APIs:
//
// b2_download_file_by_id
// b2_list_unfinished_large_files
package base
import (
@@ -43,7 +42,7 @@ import (
const (
APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.2.1"
DefaultUserAgent = "blazer/0.3.0"
)
type b2err struct {
@@ -69,17 +68,15 @@ func Action(err error) ErrAction {
if e.retry > 0 {
return Retry
}
if e.code >= 500 && e.code < 600 {
if e.method == "b2_upload_file" || e.method == "b2_upload_part" {
return AttemptNewUpload
}
if e.code >= 500 && e.code < 600 && (e.method == "b2_upload_file" || e.method == "b2_upload_part") {
return AttemptNewUpload
}
switch e.code {
case 401:
if e.method == "b2_authorize_account" {
switch e.method {
case "b2_authorize_account":
return Punt
}
if e.method == "b2_upload_file" || e.method == "b2_upload_part" {
case "b2_upload_file", "b2_upload_part":
return AttemptNewUpload
}
return ReAuthenticate
@@ -698,9 +695,9 @@ func (b *Bucket) File(id, name string) *File {
}
// UploadFile wraps b2_upload_file.
func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) {
func (url *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) {
headers := map[string]string{
"Authorization": u.token,
"Authorization": url.token,
"X-Bz-File-Name": name,
"Content-Type": contentType,
"Content-Length": fmt.Sprintf("%d", size),
@@ -710,7 +707,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte
headers[fmt.Sprintf("X-Bz-Info-%s", k)] = v
}
b2resp := &b2types.UploadFileResponse{}
if err := u.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", u.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil {
if err := url.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", url.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil {
return nil, err
}
return &File{
@@ -719,7 +716,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte
Timestamp: millitime(b2resp.Timestamp),
Status: b2resp.Action,
id: b2resp.FileID,
b2: u.b2,
b2: url.b2,
}, nil
}
@@ -906,6 +903,9 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
}
b2resp := &b2types.FinishLargeFileResponse{}
for k, v := range l.hashes {
if len(b2req.Hashes) < k {
return nil, fmt.Errorf("b2_finish_large_file: invalid index %d", k)
}
b2req.Hashes[k-1] = v
}
headers := map[string]string{
@@ -924,6 +924,39 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
}, nil
}
// ListUnfinishedLargeFiles wraps b2_list_unfinished_large_files.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]*File, string, error) {
b2req := &b2types.ListUnfinishedLargeFilesRequest{
BucketID: b.id,
Continuation: continuation,
Count: count,
}
b2resp := &b2types.ListUnfinishedLargeFilesResponse{}
headers := map[string]string{
"Authorization": b.b2.authToken,
}
if err := b.b2.opts.makeRequest(ctx, "b2_list_unfinished_large_files", "POST", b.b2.apiURI+b2types.V1api+"b2_list_unfinished_large_files", b2req, b2resp, headers, nil); err != nil {
return nil, "", err
}
cont := b2resp.Continuation
var files []*File
for _, f := range b2resp.Files {
files = append(files, &File{
Name: f.Name,
Timestamp: millitime(f.Timestamp),
b2: b.b2,
id: f.FileID,
Info: &FileInfo{
Name: f.Name,
ContentType: f.ContentType,
Info: f.Info,
Timestamp: millitime(f.Timestamp),
},
})
}
return files, cont, nil
}
// ListFileNames wraps b2_list_file_names.
func (b *Bucket) ListFileNames(ctx context.Context, count int, continuation, prefix, delimiter string) ([]*File, string, error) {
b2req := &b2types.ListFileNamesRequest{

View File

@@ -156,8 +156,8 @@ func TestStorage(t *testing.T) {
// b2_start_large_file
largeInfoMap := map[string]string{
"one_BILLION": "1e9",
"two_TRILLION": "2eSomething, I guess 2e12",
"one_billion": "1e9",
"two_trillion": "2eSomething, I guess 2e12",
}
lf, err := bucket.StartLargeFile(ctx, largeFileName, "application/octet-stream", largeInfoMap)
if err != nil {

View File

@@ -227,3 +227,14 @@ type GetDownloadAuthorizationResponse struct {
Prefix string `json:"fileNamePrefix"`
Token string `json:"authorizationToken"`
}
type ListUnfinishedLargeFilesRequest struct {
BucketID string `json:"bucketId"`
Continuation string `json:"startFileId,omitempty"`
Count int `json:"maxFileCount,omitempty"`
}
type ListUnfinishedLargeFilesResponse struct {
Files []GetFileInfoResponse `json:"files"`
Continuation string `json:"nextFileId"`
}