Merge pull request #21862 from MichaelEischer/lock-cleanup

Move Lock type to repository package and clean it up a bit
This commit is contained in:
Michael Eischer
2026-06-13 21:46:26 +02:00
committed by GitHub
13 changed files with 268 additions and 289 deletions
+1 -1
View File
@@ -142,7 +142,7 @@ func runCat(ctx context.Context, gopts global.Options, args []string, term ui.Te
printer.S(string(buf))
return nil
case "lock":
lock, err := restic.LoadLock(ctx, repo, id)
lock, err := repository.LoadLock(ctx, repo, id)
if err != nil {
return err
}
+1 -1
View File
@@ -16,7 +16,7 @@ import (
func testRunInit(t testing.TB, gopts global.Options) {
repository.TestUseLowSecurityKDFParameters(t)
restic.TestDisableCheckPolynomial(t)
restic.TestSetLockTimeout(t, 0)
repository.TestSetLockTimeout(t, 0)
err := withTermStatus(t, gopts, func(ctx context.Context, gopts global.Options) error {
return runInit(ctx, InitOptions{}, gopts, nil, gopts.Term)
+1 -1
View File
@@ -18,7 +18,7 @@ func internalOpenWithLocked(ctx context.Context, gopts global.Options, dryRun bo
if !dryRun {
var lock repository.Unlocker
lock, ctx, err = repository.Lock(ctx, repo, exclusive, gopts.RetryLock, func(msg string) {
lock, ctx, err = repository.LockRepo(ctx, repo, exclusive, gopts.RetryLock, func(msg string) {
if !gopts.JSON {
printer.P("%s", msg)
}
+2 -3
View File
@@ -20,7 +20,6 @@ import (
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/global"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/termstatus"
)
@@ -198,7 +197,7 @@ func main() {
var exitMessage string
switch {
case restic.IsAlreadyLocked(err):
case repository.IsAlreadyLocked(err):
exitMessage = fmt.Sprintf("%v\nthe `unlock` command can be used to remove stale locks", err)
case err == ErrInvalidSourceData:
exitMessage = fmt.Sprintf("Warning: %v", err)
@@ -228,7 +227,7 @@ func main() {
exitCode = 3
case errors.Is(err, global.ErrNoRepository):
exitCode = 10
case restic.IsAlreadyLocked(err):
case repository.IsAlreadyLocked(err):
exitCode = 11
case errors.Is(err, repository.ErrNoKeyFound):
exitCode = 12
+43 -45
View File
@@ -13,12 +13,23 @@ import (
"github.com/restic/restic/internal/restic"
)
type lockContext struct {
lock *restic.Lock
type Unlocker interface {
Unlock()
}
type unlocker struct {
lock *lockHandle
cancel context.CancelFunc
refreshWG sync.WaitGroup
}
func (l *unlocker) Unlock() {
l.cancel()
l.refreshWG.Wait()
}
var _ Unlocker = &unlocker{}
type locker struct {
retrySleepStart time.Duration
retrySleepMax time.Duration
@@ -34,17 +45,17 @@ var lockerInst = &locker{
refreshInterval: defaultRefreshInterval,
// consider a lock refresh failed a bit before the lock actually becomes stale
// the difference allows to compensate for a small time drift between clients.
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
refreshabilityTimeout: staleLockTimeout - defaultRefreshInterval*3/2,
}
func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (Unlocker, context.Context, error) {
// LockRepo acquires a repository lock. The returned context is cancelled when
// Unlock is called; cancelling the original context stops lock refresh.
func LockRepo(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (Unlocker, context.Context, error) {
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
}
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
// cancelling the original context also stops the lock refresh
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (Unlocker, context.Context, error) {
var lock *restic.Lock
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*unlocker, context.Context, error) {
var lock *lockHandle
var err error
retrySleep := minDuration(l.retrySleepStart, retryLock)
@@ -55,8 +66,8 @@ func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryL
retryLoop:
for {
lock, err = restic.NewLock(ctx, repo, exclusive)
if err != nil && restic.IsAlreadyLocked(err) {
lock, err = newLock(ctx, repo, exclusive)
if err != nil && IsAlreadyLocked(err) {
if !retryMessagePrinted {
printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
@@ -72,7 +83,7 @@ retryLoop:
case <-retryTimeout:
debug.Log("repo already locked, timeout expired")
// Last lock attempt
lock, err = restic.NewLock(ctx, repo, exclusive)
lock, err = newLock(ctx, repo, exclusive)
break retryLoop
case <-retrySleepCh:
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
@@ -82,7 +93,7 @@ retryLoop:
break retryLoop
}
}
if restic.IsInvalidLock(err) {
if isInvalidLock(err) {
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
}
if err != nil {
@@ -91,18 +102,18 @@ retryLoop:
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
ctx, cancel := context.WithCancel(ctx)
lockInfo := &lockContext{
unlocker := &unlocker{
lock: lock,
cancel: cancel,
}
lockInfo.refreshWG.Add(2)
unlocker.refreshWG.Add(2)
refreshChan := make(chan struct{})
forceRefreshChan := make(chan refreshLockRequest)
go l.refreshLocks(ctx, repo.be, lockInfo, refreshChan, forceRefreshChan, logger)
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
go l.refreshLocks(ctx, repo.be, unlocker, refreshChan, forceRefreshChan, logger)
go l.monitorLockRefresh(ctx, unlocker, refreshChan, forceRefreshChan, logger)
return &unlocker{lockInfo}, ctx, nil
return unlocker, ctx, nil
}
func minDuration(a, b time.Duration) time.Duration {
@@ -116,25 +127,25 @@ type refreshLockRequest struct {
result chan bool
}
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, unlocker *unlocker, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
debug.Log("start")
lock := lockInfo.lock
lock := unlocker.lock
ticker := time.NewTicker(l.refreshInterval)
lastRefresh := lock.Time
defer func() {
ticker.Stop()
// ensure that the context was cancelled before removing the lock
lockInfo.cancel()
unlocker.cancel()
// remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(ctx); err != nil {
if err := lock.unlock(ctx); err != nil {
debug.Log("error while unlocking: %v", err)
logger("error while unlocking: %v", err)
}
lockInfo.refreshWG.Done()
unlocker.refreshWG.Done()
}()
for {
@@ -146,8 +157,8 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
case req := <-forceRefresh:
debug.Log("trying to refresh stale lock")
// keep on going if our current lock still exists
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
// inform refresh goroutine about forced refresh
success := tryRefreshStaleLock(ctx, backend, lock, unlocker.cancel, logger)
// inform monitor goroutine about forced refresh
select {
case <-ctx.Done():
case req.result <- success:
@@ -165,7 +176,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
}
debug.Log("refreshing locks")
err := lock.Refresh(context.TODO())
err := lock.refresh(context.TODO())
if err != nil {
logger("unable to refresh lock: %v\n", err)
} else {
@@ -180,7 +191,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
}
}
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
func (l *locker) monitorLockRefresh(ctx context.Context, unlocker *unlocker, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
// time.Now() might use a monotonic timer which is paused during standby
// convert to unix time to ensure we compare real time values
lastRefresh := time.Now().UnixNano()
@@ -195,8 +206,8 @@ func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext,
ticker := time.NewTicker(pollDuration)
defer func() {
ticker.Stop()
lockInfo.cancel()
lockInfo.refreshWG.Done()
unlocker.cancel()
unlocker.refreshWG.Done()
}()
var refreshStaleLockResult chan bool
@@ -242,7 +253,7 @@ func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext,
}
}
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *lockHandle, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
freeze := backend.AsBackend[backend.FreezeBackend](be)
if freeze != nil {
debug.Log("freezing backend")
@@ -250,7 +261,7 @@ func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.L
defer freeze.Unfreeze()
}
err := lock.RefreshStaleLock(ctx)
err := lock.refreshStaleLock(ctx)
if err != nil {
logger("failed to refresh stale lock: %v\n", err)
// cancel context while the backend is still frozen to prevent accidental modifications
@@ -261,30 +272,17 @@ func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.L
return true
}
type Unlocker interface {
Unlock()
}
type unlocker struct {
info *lockContext
}
func (l *unlocker) Unlock() {
l.info.cancel()
l.info.refreshWG.Wait()
}
// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
var processed uint
err := restic.ForAllLocks(ctx, repo, nil, func(id restic.ID, lock *restic.Lock, err error) error {
err := forAllLocks(ctx, repo, nil, func(id restic.ID, lock *lockHandle, err error) error {
if err != nil {
// ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err)
return nil
}
if lock.Stale() {
if lock.stale() {
err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
if err == nil {
processed++
@@ -1,35 +1,30 @@
package restic
package repository
import (
"context"
"fmt"
"os"
"os/signal"
"os/user"
"sync"
"syscall"
"testing"
"time"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// UnlockCancelDelay bounds the duration how long lock cleanup operations will wait
// unlockCancelDelay bounds the duration how long lock cleanup operations will wait
// if the passed in context was canceled.
const UnlockCancelDelay = 1 * time.Minute
const unlockCancelDelay = 1 * time.Minute
// Lock represents a process locking the repository for an operation.
//
// Lock is the in-repository representation of a repository lock file.
// There are two types of locks: exclusive and non-exclusive. There may be many
// different non-exclusive locks, but at most one exclusive lock, which can
// only be acquired while no non-exclusive lock is held.
//
// A lock must be refreshed regularly to not be considered stale, this must be
// triggered by regularly calling Refresh.
// A lock must be refreshed regularly to not be considered stale.
type Lock struct {
lock sync.Mutex
Time time.Time `json:"time"`
Exclusive bool `json:"exclusive"`
Hostname string `json:"hostname"`
@@ -37,15 +32,18 @@ type Lock struct {
PID int `json:"pid"`
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
repo Unpacked[FileType]
lockID *ID
}
// alreadyLockedError is returned when NewLock or NewExclusiveLock are unable to
// acquire the desired lock.
// lockHandle is a reference to a lock file in the repository.
type lockHandle struct {
Lock
repo restic.Unpacked[restic.FileType]
lockID *restic.ID
}
// alreadyLockedError is returned when newLock is unable to acquire the desired lock.
type alreadyLockedError struct {
otherLock *Lock
otherLock *lockHandle
}
func (e *alreadyLockedError) Error() string {
@@ -63,8 +61,7 @@ func IsAlreadyLocked(err error) bool {
return errors.As(err, &e)
}
// invalidLockError is returned when NewLock or NewExclusiveLock fail due
// to an invalid lock.
// invalidLockError is returned when newLock fails due to an invalid lock.
type invalidLockError struct {
err error
}
@@ -77,14 +74,14 @@ func (e *invalidLockError) Unwrap() error {
return e.err
}
// IsInvalidLock returns true iff err indicates that locking failed due to
// isInvalidLock returns true iff err indicates that locking failed due to
// an invalid lock.
func IsInvalidLock(err error) bool {
func isInvalidLock(err error) bool {
var e *invalidLockError
return errors.As(err, &e)
}
var ErrRemovedLock = errors.New("lock file was removed in the meantime")
var errRemovedLock = errors.New("lock file was removed in the meantime")
var waitBeforeLockCheck = 200 * time.Millisecond
@@ -98,16 +95,18 @@ func TestSetLockTimeout(t testing.TB, d time.Duration) {
initialWaitBetweenLockRetries = d
}
// NewLock returns a new lock for the repository. If an
// newLock returns a new lock for the repository. If an
// exclusive lock is already held by another process, it returns an error
// that satisfies IsAlreadyLocked. If the new lock is exclude, then other
// that satisfies IsAlreadyLocked. If the new lock is exclusive, then other
// non-exclusive locks also result in an IsAlreadyLocked error.
func NewLock(ctx context.Context, repo Unpacked[FileType], exclusive bool) (*Lock, error) {
lock := &Lock{
Time: time.Now(),
PID: os.Getpid(),
Exclusive: exclusive,
repo: repo,
func newLock(ctx context.Context, repo restic.Unpacked[restic.FileType], exclusive bool) (*lockHandle, error) {
lock := &lockHandle{
Lock: Lock{
Time: time.Now(),
PID: os.Getpid(),
Exclusive: exclusive,
},
repo: repo,
}
hn, err := os.Hostname()
@@ -133,21 +132,21 @@ func NewLock(ctx context.Context, repo Unpacked[FileType], exclusive bool) (*Loc
time.Sleep(waitBeforeLockCheck)
if err = lock.checkForOtherLocks(ctx); err != nil {
_ = lock.Unlock(ctx)
_ = lock.unlock(ctx)
return nil, err
}
return lock, nil
}
func (l *Lock) fillUserInfo() error {
func (l *lockHandle) fillUserInfo() error {
usr, err := user.Current()
if err != nil {
return nil
}
l.Username = usr.Username
l.UID, l.GID, err = UidGidInt(usr)
l.UID, l.GID, err = restic.UidGidInt(usr)
return err
}
@@ -157,9 +156,9 @@ func (l *Lock) fillUserInfo() error {
// if there are any other locks, regardless if exclusive or not. If a
// non-exclusive lock is to be created, an error is only returned when an
// exclusive lock is found.
func (l *Lock) checkForOtherLocks(ctx context.Context) error {
func (l *lockHandle) checkForOtherLocks(ctx context.Context) error {
var err error
checkedIDs := NewIDSet()
checkedIDs := restic.NewIDSet()
if l.lockID != nil {
checkedIDs.Insert(*l.lockID)
}
@@ -174,10 +173,9 @@ func (l *Lock) checkForOtherLocks(ctx context.Context) error {
delay *= 2
}
// Store updates in new IDSet to prevent data races
var m sync.Mutex
// Store updates in new IDSet to prevent data races with Has() check in forAllLocks
newCheckedIDs := checkedIDs.Clone()
err = ForAllLocks(ctx, l.repo, checkedIDs, func(id ID, lock *Lock, err error) error {
err = forAllLocks(ctx, l.repo, checkedIDs, func(id restic.ID, lock *lockHandle, err error) error {
if err != nil {
// if we cannot load a lock then it is unclear whether it can be ignored
// it could either be invalid or just unreadable due to network/permission problems
@@ -185,18 +183,12 @@ func (l *Lock) checkForOtherLocks(ctx context.Context) error {
return err
}
if l.Exclusive {
return &alreadyLockedError{otherLock: lock}
}
if !l.Exclusive && lock.Exclusive {
if l.Exclusive || lock.Exclusive {
return &alreadyLockedError{otherLock: lock}
}
// valid locks will remain valid
m.Lock()
newCheckedIDs.Insert(id)
m.Unlock()
return nil
})
checkedIDs = newCheckedIDs
@@ -209,7 +201,7 @@ func (l *Lock) checkForOtherLocks(ctx context.Context) error {
return err
}
}
if errors.Is(err, ErrInvalidData) {
if errors.Is(err, restic.ErrInvalidData) {
return &invalidLockError{err}
}
return err
@@ -228,37 +220,35 @@ func cancelableDelay(ctx context.Context, delay time.Duration) error {
}
// createLock acquires the lock by creating a file in the repository.
func (l *Lock) createLock(ctx context.Context) (ID, error) {
id, err := SaveJSONUnpacked(ctx, l.repo, LockFile, l)
func (l *lockHandle) createLock(ctx context.Context) (restic.ID, error) {
id, err := restic.SaveJSONUnpacked(ctx, l.repo, restic.LockFile, &l.Lock)
if err != nil {
return ID{}, err
return restic.ID{}, err
}
return id, nil
}
// Unlock removes the lock from the repository.
func (l *Lock) Unlock(ctx context.Context) error {
// unlock removes the lock from the repository.
func (l *lockHandle) unlock(ctx context.Context) error {
if l == nil || l.lockID == nil {
return nil
}
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
ctx, cancel := delayedCancelContext(ctx, unlockCancelDelay)
defer cancel()
return l.repo.RemoveUnpacked(ctx, LockFile, *l.lockID)
return l.repo.RemoveUnpacked(ctx, restic.LockFile, *l.lockID)
}
var StaleLockTimeout = 30 * time.Minute
var staleLockTimeout = 30 * time.Minute
// Stale returns true if the lock is stale. A lock is stale if the timestamp is
// stale returns true if the lock is stale. A lock is stale if the timestamp is
// older than 30 minutes or if it was created on the current machine and the
// process isn't alive any more.
func (l *Lock) Stale() bool {
l.lock.Lock()
defer l.lock.Unlock()
func (l *lockHandle) stale() bool {
debug.Log("testing if lock %v for process %d is stale", l.lockID, l.PID)
if time.Since(l.Time) > StaleLockTimeout {
if time.Since(l.Time) > staleLockTimeout {
debug.Log("lock is stale, timestamp is too old: %v\n", l.Time)
return true
}
@@ -297,40 +287,42 @@ func delayedCancelContext(parentCtx context.Context, delay time.Duration) (conte
return
}
time.Sleep(delay)
_ = cancelableDelay(ctx, delay)
cancel()
}()
return ctx, cancel
}
// Refresh refreshes the lock by creating a new file in the backend with a new
// refresh refreshes the lock by creating a new file in the backend with a new
// timestamp. Afterwards the old lock is removed.
func (l *Lock) Refresh(ctx context.Context) error {
func (l *lockHandle) refresh(ctx context.Context) error {
debug.Log("refreshing lock %v", l.lockID)
l.lock.Lock()
l.Time = time.Now()
l.lock.Unlock()
id, err := l.createLock(ctx)
id, err := l.createReplacementLock(ctx)
if err != nil {
return err
}
l.lock.Lock()
defer l.lock.Unlock()
debug.Log("new lock ID %v", id)
oldLockID := l.lockID
l.lockID = &id
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
ctx, cancel := delayedCancelContext(ctx, unlockCancelDelay)
defer cancel()
return l.repo.RemoveUnpacked(ctx, LockFile, *oldLockID)
return l.adoptReplacementLock(ctx, id)
}
// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
func (l *Lock) RefreshStaleLock(ctx context.Context) error {
func (l *lockHandle) createReplacementLock(ctx context.Context) (restic.ID, error) {
l.Time = time.Now()
return l.createLock(ctx)
}
func (l *lockHandle) adoptReplacementLock(ctx context.Context, id restic.ID) error {
debug.Log("new lock ID %v", id)
oldID := *l.lockID
l.lockID = &id
return l.repo.RemoveUnpacked(ctx, restic.LockFile, oldID)
}
// refreshStaleLock is an extended variant of refresh that can also refresh stale lock files.
func (l *lockHandle) refreshStaleLock(ctx context.Context) error {
debug.Log("refreshing stale lock %v", l.lockID)
// refreshing a stale lock is possible if it still exists and continues to do
// so until after creating a new lock. The initial check avoids creating a new
@@ -339,13 +331,10 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
if err != nil {
return err
} else if !exists {
return ErrRemovedLock
return errRemovedLock
}
l.lock.Lock()
l.Time = time.Now()
l.lock.Unlock()
id, err := l.createLock(ctx)
id, err := l.createReplacementLock(ctx)
if err != nil {
return err
}
@@ -354,38 +343,28 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
exists, err = l.checkExistence(ctx)
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
ctx, cancel := delayedCancelContext(ctx, unlockCancelDelay)
defer cancel()
if err != nil {
// cleanup replacement lock
_ = l.repo.RemoveUnpacked(ctx, LockFile, id)
_ = l.repo.RemoveUnpacked(ctx, restic.LockFile, id)
return err
}
if !exists {
// cleanup replacement lock
_ = l.repo.RemoveUnpacked(ctx, LockFile, id)
return ErrRemovedLock
_ = l.repo.RemoveUnpacked(ctx, restic.LockFile, id)
return errRemovedLock
}
l.lock.Lock()
defer l.lock.Unlock()
debug.Log("new lock ID %v", id)
oldLockID := l.lockID
l.lockID = &id
return l.repo.RemoveUnpacked(ctx, LockFile, *oldLockID)
return l.adoptReplacementLock(ctx, id)
}
func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
l.lock.Lock()
defer l.lock.Unlock()
func (l *lockHandle) checkExistence(ctx context.Context) (bool, error) {
exists := false
err := l.repo.List(ctx, LockFile, func(id ID, _ int64) error {
err := l.repo.List(ctx, restic.LockFile, func(id restic.ID, _ int64) error {
if id.Equal(*l.lockID) {
exists = true
}
@@ -395,10 +374,7 @@ func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
return exists, err
}
func (l *Lock) String() string {
l.lock.Lock()
defer l.lock.Unlock()
func (l *lockHandle) String() string {
text := fmt.Sprintf("PID %d on %s by %s (UID %d, GID %d)\nlock was created at %s (%s ago)\nstorage ID %v",
l.PID, l.Hostname, l.Username, l.UID, l.GID,
l.Time.Format("2006-01-02 15:04:05"), time.Since(l.Time),
@@ -407,41 +383,22 @@ func (l *Lock) String() string {
return text
}
// listen for incoming SIGHUP and ignore
var ignoreSIGHUP sync.Once
func init() {
ignoreSIGHUP.Do(func() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP)
for s := range c {
debug.Log("Signal received: %v\n", s)
}
}()
})
}
// LoadLock loads and unserializes a lock from a repository.
func LoadLock(ctx context.Context, repo LoaderUnpacked, id ID) (*Lock, error) {
lock := &Lock{}
if err := LoadJSONUnpacked(ctx, repo, LockFile, id, lock); err != nil {
return nil, err
}
lock.lockID = &id
return lock, nil
func LoadLock(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (Lock, error) {
var lock Lock
err := restic.LoadJSONUnpacked(ctx, repo, restic.LockFile, id, &lock)
return lock, err
}
// ForAllLocks reads all locks in parallel and calls the given callback.
// forAllLocks reads all locks in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the
// callback returns an error, this function is cancelled and also returns that error.
// If a lock ID is passed via excludeID, it will be ignored.
func ForAllLocks(ctx context.Context, repo ListerLoaderUnpacked, excludeIDs IDSet, fn func(ID, *Lock, error) error) error {
func forAllLocks(ctx context.Context, repo restic.ListerLoaderUnpacked, excludeIDs restic.IDSet, fn func(restic.ID, *lockHandle, error) error) error {
var m sync.Mutex
// For locks decoding is nearly for free, thus just assume were only limited by IO
return ParallelList(ctx, repo, LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
return restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, size int64) error {
if excludeIDs.Has(id) {
return nil
}
@@ -452,9 +409,13 @@ func ForAllLocks(ctx context.Context, repo ListerLoaderUnpacked, excludeIDs IDSe
return nil
}
lock, err := LoadLock(ctx, repo, id)
var handle *lockHandle
if err == nil {
handle = &lockHandle{Lock: lock, lockID: &id}
}
m.Lock()
defer m.Unlock()
return fn(id, lock, err)
return fn(id, handle, err)
})
}
@@ -1,4 +1,4 @@
package restic_test
package repository
import (
"context"
@@ -10,47 +10,46 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func TestLock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
func TestLockFile(t *testing.T) {
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := repository.TestNewLock(t, repo, false)
lock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, lock.unlock(context.TODO()))
}
func TestDoubleUnlock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := repository.TestNewLock(t, repo, false)
lock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, lock.unlock(context.TODO()))
err = lock.Unlock(context.TODO())
err = lock.unlock(context.TODO())
rtest.Assert(t, err != nil,
"double unlock didn't return an error, got %v", err)
}
func TestMultipleLock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
lock1, err := repository.TestNewLock(t, repo, false)
lock1, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
lock2, err := repository.TestNewLock(t, repo, false)
lock2, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
rtest.OK(t, lock1.Unlock(context.TODO()))
rtest.OK(t, lock2.Unlock(context.TODO()))
rtest.OK(t, lock1.unlock(context.TODO()))
rtest.OK(t, lock2.unlock(context.TODO()))
}
type failLockLoadingBackend struct {
@@ -66,58 +65,58 @@ func (be *failLockLoadingBackend) Load(ctx context.Context, h backend.Handle, le
func TestMultipleLockFailure(t *testing.T) {
be := &failLockLoadingBackend{Backend: mem.New()}
repo, _ := repository.TestRepositoryWithBackend(t, be, 0, repository.Options{})
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo, _ := TestRepositoryWithBackend(t, be, 0, Options{})
TestSetLockTimeout(t, 5*time.Millisecond)
lock1, err := repository.TestNewLock(t, repo, false)
lock1, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
_, err = repository.TestNewLock(t, repo, false)
_, err = newLock(context.TODO(), &internalRepository{repo}, false)
rtest.Assert(t, err != nil, "unreadable lock file did not result in an error")
rtest.OK(t, lock1.Unlock(context.TODO()))
rtest.OK(t, lock1.unlock(context.TODO()))
}
func TestLockExclusive(t *testing.T) {
repo := repository.TestRepository(t)
repo := TestRepository(t)
elock, err := repository.TestNewLock(t, repo, true)
elock, err := newLock(context.TODO(), &internalRepository{repo}, true)
rtest.OK(t, err)
rtest.OK(t, elock.Unlock(context.TODO()))
rtest.OK(t, elock.unlock(context.TODO()))
}
func TestLockOnExclusiveLockedRepo(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
elock, err := repository.TestNewLock(t, repo, true)
elock, err := newLock(context.TODO(), &internalRepository{repo}, true)
rtest.OK(t, err)
lock, err := repository.TestNewLock(t, repo, false)
lock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.Assert(t, err != nil,
"create normal lock with exclusively locked repo didn't return an error")
rtest.Assert(t, restic.IsAlreadyLocked(err),
rtest.Assert(t, IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock(context.TODO()))
rtest.OK(t, lock.unlock(context.TODO()))
rtest.OK(t, elock.unlock(context.TODO()))
}
func TestExclusiveLockOnLockedRepo(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
elock, err := repository.TestNewLock(t, repo, false)
elock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
lock, err := repository.TestNewLock(t, repo, true)
lock, err := newLock(context.TODO(), &internalRepository{repo}, true)
rtest.Assert(t, err != nil,
"create normal lock with exclusively locked repo didn't return an error")
rtest.Assert(t, restic.IsAlreadyLocked(err),
rtest.Assert(t, IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock(context.TODO()))
rtest.OK(t, lock.unlock(context.TODO()))
rtest.OK(t, elock.unlock(context.TODO()))
}
var staleLockTests = []struct {
@@ -159,18 +158,20 @@ func TestLockStale(t *testing.T) {
otherHostname := "other-" + hostname
for i, test := range staleLockTests {
lock := restic.Lock{
Time: test.timestamp,
PID: test.pid,
Hostname: hostname,
lock := lockHandle{
Lock: Lock{
Time: test.timestamp,
PID: test.pid,
Hostname: hostname,
},
}
rtest.Assert(t, lock.Stale() == test.stale,
rtest.Assert(t, lock.stale() == test.stale,
"TestStaleLock: test %d failed: expected stale: %v, got %v",
i, test.stale, !test.stale)
lock.Hostname = otherHostname
rtest.Assert(t, lock.Stale() == test.staleOnOtherHost,
rtest.Assert(t, lock.stale() == test.staleOnOtherHost,
"TestStaleLock: test %d failed: expected staleOnOtherHost: %v, got %v",
i, test.staleOnOtherHost, !test.staleOnOtherHost)
}
@@ -195,11 +196,11 @@ func checkSingleLock(t *testing.T, repo restic.Lister) restic.ID {
return *lockID
}
func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
func testLockRefresh(t *testing.T, refresh func(lock *lockHandle) error) {
repo := TestRepository(t)
TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := repository.TestNewLock(t, repo, false)
lock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
time0 := lock.Time
@@ -212,36 +213,36 @@ func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
rtest.Assert(t, !lockID.Equal(lockID2),
"expected a new ID after lock refresh, got the same")
lock2, err := restic.LoadLock(context.TODO(), repo, lockID2)
lock2, err := LoadLock(context.TODO(), repo, lockID2)
rtest.OK(t, err)
rtest.Assert(t, lock2.Time.After(time0),
"expected a later timestamp after lock refresh")
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, lock.unlock(context.TODO()))
}
func TestLockRefresh(t *testing.T) {
testLockRefresh(t, func(lock *restic.Lock) error {
return lock.Refresh(context.TODO())
testLockRefresh(t, func(lock *lockHandle) error {
return lock.refresh(context.TODO())
})
}
func TestLockRefreshStale(t *testing.T) {
testLockRefresh(t, func(lock *restic.Lock) error {
return lock.RefreshStaleLock(context.TODO())
testLockRefresh(t, func(lock *lockHandle) error {
return lock.refreshStaleLock(context.TODO())
})
}
func TestLockRefreshStaleMissing(t *testing.T) {
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
repo, _, be := TestRepositoryWithVersion(t, 0)
TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := repository.TestNewLock(t, repo, false)
lock, err := newLock(context.TODO(), &internalRepository{repo}, false)
rtest.OK(t, err)
lockID := checkSingleLock(t, repo)
// refresh must fail if lock was removed
rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.LockFile, Name: lockID.String()}))
time.Sleep(time.Millisecond)
err = lock.RefreshStaleLock(context.TODO())
rtest.Assert(t, err == restic.ErrRemovedLock, "unexpected error, expected %v, got %v", restic.ErrRemovedLock, err)
err = lock.refreshStaleLock(context.TODO())
rtest.Assert(t, err == errRemovedLock, "unexpected error, expected %v, got %v", errRemovedLock, err)
}
@@ -1,36 +1,35 @@
//go:build !windows
package restic
package repository
import (
"os"
"os/user"
"strconv"
"os/signal"
"sync"
"syscall"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
)
// UidGidInt returns uid, gid of the user as a number.
//
//nolint:revive // capitalization is correct as is
func UidGidInt(u *user.User) (uid, gid uint32, err error) {
ui, err := strconv.ParseUint(u.Uid, 10, 32)
if err != nil {
return 0, 0, errors.Errorf("invalid UID %q", u.Uid)
}
gi, err := strconv.ParseUint(u.Gid, 10, 32)
if err != nil {
return 0, 0, errors.Errorf("invalid GID %q", u.Gid)
}
return uint32(ui), uint32(gi), nil
// listen for incoming SIGHUP and ignore
var ignoreSIGHUP sync.Once
func init() {
ignoreSIGHUP.Do(func() {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP)
for s := range c {
debug.Log("Signal received: %v\n", s)
}
}()
})
}
// checkProcess will check if the process retaining the lock
// exists and responds to SIGHUP signal.
// Returns true if the process exists and responds.
func (l *Lock) processExists() bool {
func (l *lockHandle) processExists() bool {
proc, err := os.FindProcess(l.PID)
if err != nil {
debug.Log("error searching for process %d: %v\n", l.PID, err)
@@ -1,20 +1,14 @@
package restic
package repository
import (
"os"
"os/user"
"github.com/restic/restic/internal/debug"
)
// UidGidInt always returns 0 on Windows, since uid isn't numbers
func UidGidInt(_ *user.User) (uid, gid uint32, err error) {
return 0, 0, nil
}
// checkProcess will check if the process retaining the lock exists.
// Returns true if the process exists.
func (l *Lock) processExists() bool {
func (l *lockHandle) processExists() bool {
proc, err := os.FindProcess(l.PID)
if err != nil {
debug.Log("error searching for process %d: %v\n", l.PID, err)
+11 -14
View File
@@ -38,9 +38,6 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo *Repository, locker
lock, wrappedCtx, err := lockerInst.Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
rtest.OK(t, wrappedCtx.Err())
if lock.(*unlocker).info.lock.Stale() {
t.Fatal("lock returned stale lock")
}
return lock, wrappedCtx
}
@@ -76,14 +73,14 @@ func TestLockConflict(t *testing.T) {
repo, be := openLockTestRepo(t, nil)
repo2 := TestOpenBackend(t, be)
lock, _, err := Lock(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
lock, _, err := LockRepo(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
defer lock.Unlock()
_, _, err = Lock(context.Background(), repo2, false, 0, func(msg string) {}, func(format string, args ...interface{}) {})
_, _, err = LockRepo(context.Background(), repo2, false, 0, func(msg string) {}, func(format string, args ...interface{}) {})
if err == nil {
t.Fatal("second lock should have failed")
}
rtest.Assert(t, restic.IsAlreadyLocked(err), "unexpected error %v", err)
rtest.Assert(t, IsAlreadyLocked(err), "unexpected error %v", err)
}
type writeOnceBackend struct {
@@ -240,14 +237,14 @@ func TestLockWaitTimeout(t *testing.T) {
t.Parallel()
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
elock, _, err := LockRepo(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
defer elock.Unlock()
retryLock := 200 * time.Millisecond
start := time.Now()
_, _, err = Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
_, _, err = LockRepo(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
duration := time.Since(start)
rtest.Assert(t, err != nil,
@@ -262,7 +259,7 @@ func TestLockWaitCancel(t *testing.T) {
t.Parallel()
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
elock, _, err := LockRepo(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
defer elock.Unlock()
@@ -273,7 +270,7 @@ func TestLockWaitCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
time.AfterFunc(cancelAfter, cancel)
_, _, err = Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
_, _, err = LockRepo(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
duration := time.Since(start)
rtest.Assert(t, err != nil,
@@ -288,7 +285,7 @@ func TestLockWaitSuccess(t *testing.T) {
t.Parallel()
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
elock, _, err := LockRepo(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
retryLock := 200 * time.Millisecond
@@ -298,7 +295,7 @@ func TestLockWaitSuccess(t *testing.T) {
elock.Unlock()
})
lock, _, err := Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
lock, _, err := LockRepo(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
rtest.OK(t, err)
lock.Unlock()
}
@@ -309,8 +306,8 @@ func createFakeLock(repo *Repository, t time.Time, pid int) (restic.ID, error) {
return restic.ID{}, err
}
newLock := &restic.Lock{Time: t, PID: pid, Hostname: hostname}
return restic.SaveJSONUnpacked(context.TODO(), &internalRepository{repo}, restic.LockFile, &newLock)
newLock := &Lock{Time: t, PID: pid, Hostname: hostname}
return restic.SaveJSONUnpacked(context.TODO(), &internalRepository{repo}, restic.LockFile, newLock)
}
func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool {
-5
View File
@@ -158,11 +158,6 @@ func BenchmarkAllVersions(b *testing.B, bench VersionedBenchmark) {
}
}
func TestNewLock(_ *testing.T, repo *Repository, exclusive bool) (*restic.Lock, error) {
// TODO get rid of this test helper
return restic.NewLock(context.TODO(), &internalRepository{repo}, exclusive)
}
// TestCheckRepo runs the checker on repo.
func TestCheckRepo(t testing.TB, repo *Repository) {
chkr := newChecker(repo)
+25
View File
@@ -0,0 +1,25 @@
//go:build !windows
package restic
import (
"os/user"
"strconv"
"github.com/restic/restic/internal/errors"
)
// UidGidInt returns uid, gid of the user as a number.
//
//nolint:revive // capitalization is correct as is
func UidGidInt(u *user.User) (uid, gid uint32, err error) {
ui, err := strconv.ParseUint(u.Uid, 10, 32)
if err != nil {
return 0, 0, errors.Errorf("invalid UID %q", u.Uid)
}
gi, err := strconv.ParseUint(u.Gid, 10, 32)
if err != nil {
return 0, 0, errors.Errorf("invalid GID %q", u.Gid)
}
return uint32(ui), uint32(gi), nil
}
+10
View File
@@ -0,0 +1,10 @@
package restic
import (
"os/user"
)
// UidGidInt always returns 0 on Windows, since uid isn't numbers
func UidGidInt(_ *user.User) (uid, gid uint32, err error) {
return 0, 0, nil
}