mirror of
https://github.com/restic/restic.git
synced 2026-06-24 17:44:17 +00:00
Update dependencies
Among others, this updates minio-go, so that the new "eu-west-3" zone for AWS is supported.
This commit is contained in:
+68
-51
@@ -18,6 +18,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
vkit "cloud.google.com/go/pubsub/apiv1"
|
||||
"golang.org/x/net/context"
|
||||
pb "google.golang.org/genproto/googleapis/pubsub/v1"
|
||||
)
|
||||
@@ -26,16 +27,15 @@ import (
|
||||
// when it is no longer needed.
|
||||
// subName is the full name of the subscription to pull messages from.
|
||||
// ctx is the context to use for acking messages and extending message deadlines.
|
||||
func newMessageIterator(ctx context.Context, s service, subName string, po *pullOptions) *streamingMessageIterator {
|
||||
sp := s.newStreamingPuller(ctx, subName, int32(po.ackDeadline.Seconds()))
|
||||
_ = sp.open() // error stored in sp
|
||||
return newStreamingMessageIterator(ctx, sp, po)
|
||||
func newMessageIterator(ctx context.Context, subc *vkit.SubscriberClient, subName string, po *pullOptions) *streamingMessageIterator {
|
||||
ps := newPullStream(ctx, subc, subName, int32(po.ackDeadline.Seconds()))
|
||||
return newStreamingMessageIterator(ctx, ps, po)
|
||||
}
|
||||
|
||||
type streamingMessageIterator struct {
|
||||
ctx context.Context
|
||||
po *pullOptions
|
||||
sp *streamingPuller
|
||||
ps *pullStream
|
||||
kaTicker *time.Ticker // keep-alive (deadline extensions)
|
||||
ackTicker *time.Ticker // message acks
|
||||
nackTicker *time.Ticker // message nacks (more frequent than acks)
|
||||
@@ -47,10 +47,11 @@ type streamingMessageIterator struct {
|
||||
mu sync.Mutex
|
||||
keepAliveDeadlines map[string]time.Time
|
||||
pendingReq *pb.StreamingPullRequest
|
||||
err error // error from stream failure
|
||||
pendingModAcks map[string]int32 // ack IDs whose ack deadline is to be modified
|
||||
err error // error from stream failure
|
||||
}
|
||||
|
||||
func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *pullOptions) *streamingMessageIterator {
|
||||
func newStreamingMessageIterator(ctx context.Context, ps *pullStream, po *pullOptions) *streamingMessageIterator {
|
||||
// TODO: make kaTicker frequency more configurable. (ackDeadline - 5s) is a
|
||||
// reasonable default for now, because the minimum ack period is 10s. This
|
||||
// gives us 5s grace.
|
||||
@@ -62,7 +63,7 @@ func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *p
|
||||
nackTicker := time.NewTicker(100 * time.Millisecond)
|
||||
it := &streamingMessageIterator{
|
||||
ctx: ctx,
|
||||
sp: sp,
|
||||
ps: ps,
|
||||
po: po,
|
||||
kaTicker: kaTicker,
|
||||
ackTicker: ackTicker,
|
||||
@@ -72,6 +73,7 @@ func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *p
|
||||
drained: make(chan struct{}),
|
||||
keepAliveDeadlines: map[string]time.Time{},
|
||||
pendingReq: &pb.StreamingPullRequest{},
|
||||
pendingModAcks: map[string]int32{},
|
||||
}
|
||||
it.wg.Add(1)
|
||||
go it.sender()
|
||||
@@ -121,20 +123,11 @@ func (it *streamingMessageIterator) done(ackID string, ack bool) {
|
||||
if ack {
|
||||
it.pendingReq.AckIds = append(it.pendingReq.AckIds, ackID)
|
||||
} else {
|
||||
it.addDeadlineMod(ackID, 0) // Nack indicated by modifying the deadline to zero.
|
||||
it.pendingModAcks[ackID] = 0 // Nack indicated by modifying the deadline to zero.
|
||||
}
|
||||
it.checkDrained()
|
||||
}
|
||||
|
||||
// addDeadlineMod adds the ack ID to the pending request with the given deadline.
|
||||
//
|
||||
// Called with the lock held.
|
||||
func (it *streamingMessageIterator) addDeadlineMod(ackID string, deadlineSecs int32) {
|
||||
pr := it.pendingReq
|
||||
pr.ModifyDeadlineAckIds = append(pr.ModifyDeadlineAckIds, ackID)
|
||||
pr.ModifyDeadlineSeconds = append(pr.ModifyDeadlineSeconds, deadlineSecs)
|
||||
}
|
||||
|
||||
// fail is called when a stream method returns a permanent error.
|
||||
func (it *streamingMessageIterator) fail(err error) {
|
||||
it.mu.Lock()
|
||||
@@ -162,20 +155,33 @@ func (it *streamingMessageIterator) receive() ([]*Message, error) {
|
||||
return nil, err
|
||||
}
|
||||
// Receive messages from stream. This may block indefinitely.
|
||||
msgs, err := it.sp.fetchMessages()
|
||||
// The streamingPuller handles retries, so any error here
|
||||
// is fatal.
|
||||
res, err := it.ps.Recv()
|
||||
// The pullStream handles retries, so any error here is fatal.
|
||||
if err != nil {
|
||||
it.fail(err)
|
||||
return nil, err
|
||||
}
|
||||
// We received some messages. Remember them so we can
|
||||
// keep them alive.
|
||||
deadline := time.Now().Add(it.po.maxExtension)
|
||||
msgs, err := convertMessages(res.ReceivedMessages)
|
||||
if err != nil {
|
||||
it.fail(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We received some messages. Remember them so we can keep them alive. Also,
|
||||
// arrange for a receipt mod-ack (which will occur at the next firing of
|
||||
// nackTicker).
|
||||
maxExt := time.Now().Add(it.po.maxExtension)
|
||||
deadline := trunc32(int64(it.po.ackDeadline.Seconds()))
|
||||
it.mu.Lock()
|
||||
for _, m := range msgs {
|
||||
m.doneFunc = it.done
|
||||
it.keepAliveDeadlines[m.ackID] = deadline
|
||||
it.keepAliveDeadlines[m.ackID] = maxExt
|
||||
// The receipt mod-ack uses the subscription's configured ack deadline. Don't
|
||||
// change the mod-ack if one is already pending. This is possible if there
|
||||
// are retries.
|
||||
if _, ok := it.pendingModAcks[m.ackID]; !ok {
|
||||
it.pendingModAcks[m.ackID] = deadline
|
||||
}
|
||||
}
|
||||
it.mu.Unlock()
|
||||
return msgs, nil
|
||||
@@ -187,7 +193,7 @@ func (it *streamingMessageIterator) sender() {
|
||||
defer it.kaTicker.Stop()
|
||||
defer it.ackTicker.Stop()
|
||||
defer it.nackTicker.Stop()
|
||||
defer it.sp.closeSend()
|
||||
defer it.ps.CloseSend()
|
||||
|
||||
done := false
|
||||
for !done {
|
||||
@@ -206,28 +212,34 @@ func (it *streamingMessageIterator) sender() {
|
||||
// All outstanding messages have been marked done:
|
||||
// nothing left to do except send the final request.
|
||||
it.mu.Lock()
|
||||
send = (len(it.pendingReq.AckIds) > 0 || len(it.pendingReq.ModifyDeadlineAckIds) > 0)
|
||||
send = (len(it.pendingReq.AckIds) > 0 || len(it.pendingModAcks) > 0)
|
||||
done = true
|
||||
|
||||
case <-it.kaTicker.C:
|
||||
it.mu.Lock()
|
||||
send = it.handleKeepAlives()
|
||||
it.handleKeepAlives()
|
||||
send = (len(it.pendingModAcks) > 0)
|
||||
|
||||
case <-it.nackTicker.C:
|
||||
it.mu.Lock()
|
||||
send = (len(it.pendingReq.ModifyDeadlineAckIds) > 0)
|
||||
send = (len(it.pendingModAcks) > 0)
|
||||
|
||||
case <-it.ackTicker.C:
|
||||
it.mu.Lock()
|
||||
send = (len(it.pendingReq.AckIds) > 0)
|
||||
|
||||
}
|
||||
// Lock is held here.
|
||||
if send {
|
||||
req := it.pendingReq
|
||||
it.pendingReq = &pb.StreamingPullRequest{}
|
||||
modAcks := it.pendingModAcks
|
||||
it.pendingModAcks = map[string]int32{}
|
||||
it.mu.Unlock()
|
||||
err := it.sp.send(req)
|
||||
for id, s := range modAcks {
|
||||
req.ModifyDeadlineAckIds = append(req.ModifyDeadlineAckIds, id)
|
||||
req.ModifyDeadlineSeconds = append(req.ModifyDeadlineSeconds, s)
|
||||
}
|
||||
err := it.send(req)
|
||||
if err != nil {
|
||||
// The streamingPuller handles retries, so any error here
|
||||
// is fatal to the iterator.
|
||||
@@ -240,32 +252,37 @@ func (it *streamingMessageIterator) sender() {
|
||||
}
|
||||
}
|
||||
|
||||
// handleKeepAlives modifies the pending request to include deadline extensions
|
||||
// for live messages. It also purges expired messages. It reports whether
|
||||
// there were any live messages.
|
||||
//
|
||||
// Called with the lock held.
|
||||
func (it *streamingMessageIterator) handleKeepAlives() bool {
|
||||
live, expired := getKeepAliveAckIDs(it.keepAliveDeadlines)
|
||||
for _, e := range expired {
|
||||
delete(it.keepAliveDeadlines, e)
|
||||
func (it *streamingMessageIterator) send(req *pb.StreamingPullRequest) error {
|
||||
// Note: len(modAckIDs) == len(modSecs)
|
||||
var rest *pb.StreamingPullRequest
|
||||
for len(req.AckIds) > 0 || len(req.ModifyDeadlineAckIds) > 0 {
|
||||
req, rest = splitRequest(req, maxPayload)
|
||||
if err := it.ps.Send(req); err != nil {
|
||||
return err
|
||||
}
|
||||
req = rest
|
||||
}
|
||||
dl := trunc32(int64(it.po.ackDeadline.Seconds()))
|
||||
for _, m := range live {
|
||||
it.addDeadlineMod(m, dl)
|
||||
}
|
||||
it.checkDrained()
|
||||
return len(live) > 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func getKeepAliveAckIDs(items map[string]time.Time) (live, expired []string) {
|
||||
// handleKeepAlives modifies the pending request to include deadline extensions
|
||||
// for live messages. It also purges expired messages.
|
||||
//
|
||||
// Called with the lock held.
|
||||
func (it *streamingMessageIterator) handleKeepAlives() {
|
||||
now := time.Now()
|
||||
for id, expiry := range items {
|
||||
dl := trunc32(int64(it.po.ackDeadline.Seconds()))
|
||||
for id, expiry := range it.keepAliveDeadlines {
|
||||
if expiry.Before(now) {
|
||||
expired = append(expired, id)
|
||||
// This delete will not result in skipping any map items, as implied by
|
||||
// the spec at https://golang.org/ref/spec#For_statements, "For
|
||||
// statements with range clause", note 3, and stated explicitly at
|
||||
// https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ.
|
||||
delete(it.keepAliveDeadlines, id)
|
||||
} else {
|
||||
live = append(live, id)
|
||||
// This will not overwrite a nack, because nacking removes the ID from keepAliveDeadlines.
|
||||
it.pendingModAcks[id] = dl
|
||||
}
|
||||
}
|
||||
return live, expired
|
||||
it.checkDrained()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user