mirror of
https://github.com/restic/restic.git
synced 2026-06-21 16:14:18 +00:00
0f4236cb39
Drop the Unlocker interface and return the unlock callback directly from LockRepo, simplifying callers that only need to defer unlock().
304 lines
8.6 KiB
Go
304 lines
8.6 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/restic/restic/internal/backend"
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
)
|
|
|
|
type unlocker struct {
|
|
lock *lockHandle
|
|
cancel context.CancelFunc
|
|
refreshWG sync.WaitGroup
|
|
}
|
|
|
|
func (l *unlocker) Unlock() {
|
|
l.cancel()
|
|
l.refreshWG.Wait()
|
|
}
|
|
|
|
// locker holds the timing parameters used when acquiring a repository lock
// and keeping it alive.
type locker struct {
	// retrySleepStart is the initial pause between two attempts to acquire a
	// lock that is already held.
	retrySleepStart time.Duration
	// retrySleepMax is the upper bound for the pause between two attempts.
	retrySleepMax time.Duration
	// refreshInterval is the period of the regular lock refresh.
	refreshInterval time.Duration
	// refreshabilityTimeout is the age of the last successful refresh after
	// which a regular refresh is no longer attempted and a forced refresh of
	// the stale lock is requested instead.
	refreshabilityTimeout time.Duration
}
|
|
|
|
const defaultRefreshInterval = 5 * time.Minute
|
|
|
|
var lockerInst = &locker{
|
|
retrySleepStart: 5 * time.Second,
|
|
retrySleepMax: 60 * time.Second,
|
|
refreshInterval: defaultRefreshInterval,
|
|
// consider a lock refresh failed a bit before the lock actually becomes stale
|
|
// the difference allows to compensate for a small time drift between clients.
|
|
refreshabilityTimeout: staleLockTimeout - defaultRefreshInterval*3/2,
|
|
}
|
|
|
|
// LockRepo acquires a repository lock. The returned context is cancelled when
|
|
// Unlock is called; cancelling the original context stops lock refresh.
|
|
func LockRepo(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (func(), context.Context, error) {
|
|
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
|
|
}
|
|
|
|
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (func(), context.Context, error) {
|
|
var lock *lockHandle
|
|
var err error
|
|
|
|
retrySleep := minDuration(l.retrySleepStart, retryLock)
|
|
retryMessagePrinted := false
|
|
retryTimeout := time.After(retryLock)
|
|
|
|
repo := &internalRepository{r}
|
|
|
|
retryLoop:
|
|
for {
|
|
lock, err = newLock(ctx, repo, exclusive)
|
|
if err != nil && IsAlreadyLocked(err) {
|
|
|
|
if !retryMessagePrinted {
|
|
printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
|
|
retryMessagePrinted = true
|
|
}
|
|
|
|
debug.Log("repo already locked, retrying in %v", retrySleep)
|
|
retrySleepCh := time.After(retrySleep)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx, ctx.Err()
|
|
case <-retryTimeout:
|
|
debug.Log("repo already locked, timeout expired")
|
|
// Last lock attempt
|
|
lock, err = newLock(ctx, repo, exclusive)
|
|
break retryLoop
|
|
case <-retrySleepCh:
|
|
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
|
|
}
|
|
} else {
|
|
// anything else, either a successful lock or another error
|
|
break retryLoop
|
|
}
|
|
}
|
|
if isInvalidLock(err) {
|
|
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
|
|
}
|
|
if err != nil {
|
|
return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
|
|
}
|
|
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
unlocker := &unlocker{
|
|
lock: lock,
|
|
cancel: cancel,
|
|
}
|
|
unlocker.refreshWG.Add(2)
|
|
refreshChan := make(chan struct{})
|
|
forceRefreshChan := make(chan refreshLockRequest)
|
|
|
|
go l.refreshLocks(ctx, repo.be, unlocker, refreshChan, forceRefreshChan, logger)
|
|
go l.monitorLockRefresh(ctx, unlocker, refreshChan, forceRefreshChan, logger)
|
|
|
|
return unlocker.Unlock, ctx, nil
|
|
}
|
|
|
|
// minDuration returns the smaller of the two durations.
func minDuration(a, b time.Duration) time.Duration {
	if b < a {
		return b
	}
	return a
}
|
|
|
|
// refreshLockRequest asks the refresh goroutine to perform a forced refresh
// of a stale lock.
type refreshLockRequest struct {
	// result receives true if the forced refresh succeeded, false otherwise.
	result chan bool
}
|
|
|
|
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, unlocker *unlocker, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
debug.Log("start")
|
|
lock := unlocker.lock
|
|
ticker := time.NewTicker(l.refreshInterval)
|
|
lastRefresh := lock.Time
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
// ensure that the context was cancelled before removing the lock
|
|
unlocker.cancel()
|
|
|
|
// remove the lock from the repo
|
|
debug.Log("unlocking repository with lock %v", lock)
|
|
if err := lock.unlock(ctx); err != nil {
|
|
debug.Log("error while unlocking: %v", err)
|
|
logger("error while unlocking: %v", err)
|
|
}
|
|
|
|
unlocker.refreshWG.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate")
|
|
return
|
|
|
|
case req := <-forceRefresh:
|
|
debug.Log("trying to refresh stale lock")
|
|
// keep on going if our current lock still exists
|
|
success := tryRefreshStaleLock(ctx, backend, lock, unlocker.cancel, logger)
|
|
// inform monitor goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case req.result <- success:
|
|
}
|
|
|
|
if success {
|
|
// update lock refresh time
|
|
lastRefresh = lock.Time
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if time.Since(lastRefresh) > l.refreshabilityTimeout {
|
|
// the lock is too old, wait until the expiry monitor cancels the context
|
|
continue
|
|
}
|
|
|
|
debug.Log("refreshing locks")
|
|
err := lock.refresh(context.TODO())
|
|
if err != nil {
|
|
logger("unable to refresh lock: %v\n", err)
|
|
} else {
|
|
lastRefresh = lock.Time
|
|
// inform monitor goroutine about successful refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case refreshed <- struct{}{}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *locker) monitorLockRefresh(ctx context.Context, unlocker *unlocker, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
// time.Now() might use a monotonic timer which is paused during standby
|
|
// convert to unix time to ensure we compare real time values
|
|
lastRefresh := time.Now().UnixNano()
|
|
pollDuration := 1 * time.Second
|
|
if l.refreshInterval < pollDuration {
|
|
// required for TestLockFailedRefresh
|
|
pollDuration = l.refreshInterval / 5
|
|
}
|
|
// timers are paused during standby, which is a problem as the refresh timeout
|
|
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
|
// https://github.com/golang/go/issues/35012
|
|
ticker := time.NewTicker(pollDuration)
|
|
defer func() {
|
|
ticker.Stop()
|
|
unlocker.cancel()
|
|
unlocker.refreshWG.Done()
|
|
}()
|
|
|
|
var refreshStaleLockResult chan bool
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate expiry monitoring")
|
|
return
|
|
case <-refreshed:
|
|
if refreshStaleLockResult != nil {
|
|
// ignore delayed refresh notifications while the stale lock is refreshed
|
|
continue
|
|
}
|
|
lastRefresh = time.Now().UnixNano()
|
|
case <-ticker.C:
|
|
if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
|
continue
|
|
}
|
|
|
|
debug.Log("trying to refreshStaleLock")
|
|
// keep on going if our current lock still exists
|
|
refreshReq := refreshLockRequest{
|
|
result: make(chan bool),
|
|
}
|
|
refreshStaleLockResult = refreshReq.result
|
|
|
|
// inform refresh goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case forceRefresh <- refreshReq:
|
|
}
|
|
case success := <-refreshStaleLockResult:
|
|
if success {
|
|
lastRefresh = time.Now().UnixNano()
|
|
refreshStaleLockResult = nil
|
|
continue
|
|
}
|
|
|
|
logger("Fatal: failed to refresh lock in time\n")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *lockHandle, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
|
|
freeze := backend.AsBackend[backend.FreezeBackend](be)
|
|
if freeze != nil {
|
|
debug.Log("freezing backend")
|
|
freeze.Freeze()
|
|
defer freeze.Unfreeze()
|
|
}
|
|
|
|
err := lock.refreshStaleLock(ctx)
|
|
if err != nil {
|
|
logger("failed to refresh stale lock: %v\n", err)
|
|
// cancel context while the backend is still frozen to prevent accidental modifications
|
|
cancel()
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// RemoveStaleLocks deletes all locks detected as stale from the repository.
|
|
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
|
|
var processed uint
|
|
err := forAllLocks(ctx, repo, nil, func(id restic.ID, lock *lockHandle, err error) error {
|
|
if err != nil {
|
|
// ignore locks that cannot be loaded
|
|
debug.Log("ignore lock %v: %v", id, err)
|
|
return nil
|
|
}
|
|
|
|
if lock.stale() {
|
|
err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
|
|
if err == nil {
|
|
processed++
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
return processed, err
|
|
}
|
|
|
|
// RemoveAllLocks removes all locks forcefully.
|
|
func RemoveAllLocks(ctx context.Context, repo *Repository) (uint, error) {
|
|
var processed uint32
|
|
err := restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, _ int64) error {
|
|
err := (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
|
|
if err == nil {
|
|
atomic.AddUint32(&processed, 1)
|
|
}
|
|
return err
|
|
})
|
|
return uint(processed), err
|
|
}
|