mirror of
https://github.com/restic/restic.git
synced 2026-06-17 14:14:19 +00:00
repository: merge Unlocker and lockContext
This commit is contained in:
+25
-27
@@ -13,12 +13,23 @@ import (
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
type lockContext struct {
|
||||
type Unlocker interface {
|
||||
Unlock()
|
||||
}
|
||||
|
||||
type unlocker struct {
|
||||
lock *lockHandle
|
||||
cancel context.CancelFunc
|
||||
refreshWG sync.WaitGroup
|
||||
}
|
||||
|
||||
func (l *unlocker) Unlock() {
|
||||
l.cancel()
|
||||
l.refreshWG.Wait()
|
||||
}
|
||||
|
||||
var _ Unlocker = &unlocker{}
|
||||
|
||||
type locker struct {
|
||||
retrySleepStart time.Duration
|
||||
retrySleepMax time.Duration
|
||||
@@ -91,18 +102,18 @@ retryLoop:
|
||||
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
lockCtx := &lockContext{
|
||||
unlocker := &unlocker{
|
||||
lock: lock,
|
||||
cancel: cancel,
|
||||
}
|
||||
lockCtx.refreshWG.Add(2)
|
||||
unlocker.refreshWG.Add(2)
|
||||
refreshChan := make(chan struct{})
|
||||
forceRefreshChan := make(chan refreshLockRequest)
|
||||
|
||||
go l.refreshLocks(ctx, repo.be, lockCtx, refreshChan, forceRefreshChan, logger)
|
||||
go l.monitorLockRefresh(ctx, lockCtx, refreshChan, forceRefreshChan, logger)
|
||||
go l.refreshLocks(ctx, repo.be, unlocker, refreshChan, forceRefreshChan, logger)
|
||||
go l.monitorLockRefresh(ctx, unlocker, refreshChan, forceRefreshChan, logger)
|
||||
|
||||
return &unlocker{lockCtx}, ctx, nil
|
||||
return unlocker, ctx, nil
|
||||
}
|
||||
|
||||
func minDuration(a, b time.Duration) time.Duration {
|
||||
@@ -116,16 +127,16 @@ type refreshLockRequest struct {
|
||||
result chan bool
|
||||
}
|
||||
|
||||
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, unlocker *unlocker, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||
debug.Log("start")
|
||||
lock := lockInfo.lock
|
||||
lock := unlocker.lock
|
||||
ticker := time.NewTicker(l.refreshInterval)
|
||||
lastRefresh := lock.Time
|
||||
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
// ensure that the context was cancelled before removing the lock
|
||||
lockInfo.cancel()
|
||||
unlocker.cancel()
|
||||
|
||||
// remove the lock from the repo
|
||||
debug.Log("unlocking repository with lock %v", lock)
|
||||
@@ -134,7 +145,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
|
||||
logger("error while unlocking: %v", err)
|
||||
}
|
||||
|
||||
lockInfo.refreshWG.Done()
|
||||
unlocker.refreshWG.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
@@ -146,7 +157,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
|
||||
case req := <-forceRefresh:
|
||||
debug.Log("trying to refresh stale lock")
|
||||
// keep on going if our current lock still exists
|
||||
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
|
||||
success := tryRefreshStaleLock(ctx, backend, lock, unlocker.cancel, logger)
|
||||
// inform refresh goroutine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -180,7 +191,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
|
||||
}
|
||||
}
|
||||
|
||||
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||
func (l *locker) monitorLockRefresh(ctx context.Context, unlocker *unlocker, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
||||
// time.Now() might use a monotonic timer which is paused during standby
|
||||
// convert to unix time to ensure we compare real time values
|
||||
lastRefresh := time.Now().UnixNano()
|
||||
@@ -195,8 +206,8 @@ func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext,
|
||||
ticker := time.NewTicker(pollDuration)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
lockInfo.cancel()
|
||||
lockInfo.refreshWG.Done()
|
||||
unlocker.cancel()
|
||||
unlocker.refreshWG.Done()
|
||||
}()
|
||||
|
||||
var refreshStaleLockResult chan bool
|
||||
@@ -261,19 +272,6 @@ func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *lockHand
|
||||
return true
|
||||
}
|
||||
|
||||
type Unlocker interface {
|
||||
Unlock()
|
||||
}
|
||||
|
||||
type unlocker struct {
|
||||
info *lockContext
|
||||
}
|
||||
|
||||
func (l *unlocker) Unlock() {
|
||||
l.info.cancel()
|
||||
l.info.refreshWG.Wait()
|
||||
}
|
||||
|
||||
// RemoveStaleLocks deletes all locks detected as stale from the repository.
|
||||
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
|
||||
var processed uint
|
||||
|
||||
@@ -38,7 +38,7 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo *Repository, locker
|
||||
lock, wrappedCtx, err := lockerInst.Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||
rtest.OK(t, err)
|
||||
rtest.OK(t, wrappedCtx.Err())
|
||||
if lock.info.lock.stale() {
|
||||
if lock.lock.stale() {
|
||||
t.Fatal("lock returned stale lock")
|
||||
}
|
||||
return lock, wrappedCtx
|
||||
|
||||
Reference in New Issue
Block a user