1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
package bearer
import (
"context"
"fmt"
"sync/atomic"
"time"
smithycontext "github.com/aws/smithy-go/context"
"github.com/aws/smithy-go/internal/sync/singleflight"
)
// package variable that can be override in unit tests.
var timeNow = time.Now
// TokenCacheOptions provides a set of optional configuration options for the
// TokenCache TokenProvider.
type TokenCacheOptions struct {
// The duration before the token will expire when the credentials will be
// refreshed. If DisableAsyncRefresh is true, the RetrieveBearerToken calls
// will be blocking.
//
// Asynchronous refreshes are deduplicated, and only one will be in-flight
// at a time. If the token expires while an asynchronous refresh is in
// flight, the next call to RetrieveBearerToken will block on that refresh
// to return.
RefreshBeforeExpires time.Duration
// The timeout the underlying TokenProvider's RetrieveBearerToken call must
// return within, or will be canceled. Defaults to 0, no timeout.
//
// If 0 timeout, its possible for the underlying tokenProvider's
// RetrieveBearerToken call to block forever. Preventing subsequent
// TokenCache attempts to refresh the token.
//
// If this timeout is reached all pending deduplicated calls to
// TokenCache RetrieveBearerToken will fail with an error.
RetrieveBearerTokenTimeout time.Duration
// The minimum duration between asynchronous refresh attempts. If the next
// asynchronous recent refresh attempt was within the minimum delay
// duration, the call to retrieve will return the current cached token, if
// not expired.
//
// The asynchronous retrieve is deduplicated across multiple calls when
// RetrieveBearerToken is called. The asynchronous retrieve is not a
// periodic task. It is only performed when the token has not yet expired,
// and the current item is within the RefreshBeforeExpires window, and the
// TokenCache's RetrieveBearerToken method is called.
//
// If 0, (default) there will be no minimum delay between asynchronous
// refresh attempts.
//
// If DisableAsyncRefresh is true, this option is ignored.
AsyncRefreshMinimumDelay time.Duration
// Sets if the TokenCache will attempt to refresh the token in the
// background asynchronously instead of blocking for credentials to be
// refreshed. If disabled token refresh will be blocking.
//
// The first call to RetrieveBearerToken will always be blocking, because
// there is no cached token.
DisableAsyncRefresh bool
}
// TokenCache provides an utility to cache Bearer Authentication tokens from a
// wrapped TokenProvider. The TokenCache can be has options to configure the
// cache's early and asynchronous refresh of the token.
type TokenCache struct {
options TokenCacheOptions
provider TokenProvider
cachedToken atomic.Value
lastRefreshAttemptTime atomic.Value
sfGroup singleflight.Group
}
// NewTokenCache returns a initialized TokenCache that implements the
// TokenProvider interface. Wrapping the provider passed in. Also taking a set
// of optional functional option parameters to configure the token cache.
func NewTokenCache(provider TokenProvider, optFns ...func(*TokenCacheOptions)) *TokenCache {
var options TokenCacheOptions
for _, fn := range optFns {
fn(&options)
}
return &TokenCache{
options: options,
provider: provider,
}
}
// RetrieveBearerToken returns the token if it could be obtained, or error if a
// valid token could not be retrieved.
//
// The passed in Context's cancel/deadline/timeout will impacting only this
// individual retrieve call and not any other already queued up calls. This
// means underlying provider's RetrieveBearerToken calls could block for ever,
// and not be canceled with the Context. Set RetrieveBearerTokenTimeout to
// provide a timeout, preventing the underlying TokenProvider blocking forever.
//
// By default, if the passed in Context is canceled, all of its values will be
// considered expired. The wrapped TokenProvider will not be able to lookup the
// values from the Context once it is expired. This is done to protect against
// expired values no longer being valid. To disable this behavior, use
// smithy-go's context.WithPreserveExpiredValues to add a value to the Context
// before calling RetrieveBearerToken to enable support for expired values.
//
// Without RetrieveBearerTokenTimeout there is the potential for a underlying
// Provider's RetrieveBearerToken call to sit forever. Blocking in subsequent
// attempts at refreshing the token.
func (p *TokenCache) RetrieveBearerToken(ctx context.Context) (Token, error) {
cachedToken, ok := p.getCachedToken()
if !ok || cachedToken.Expired(timeNow()) {
return p.refreshBearerToken(ctx)
}
// Check if the token should be refreshed before it expires.
refreshToken := cachedToken.Expired(timeNow().Add(p.options.RefreshBeforeExpires))
if !refreshToken {
return cachedToken, nil
}
if p.options.DisableAsyncRefresh {
return p.refreshBearerToken(ctx)
}
p.tryAsyncRefresh(ctx)
return cachedToken, nil
}
// tryAsyncRefresh attempts to asynchronously refresh the token returning the
// already cached token. If it AsyncRefreshMinimumDelay option is not zero, and
// the duration since the last refresh is less than that value, nothing will be
// done.
func (p *TokenCache) tryAsyncRefresh(ctx context.Context) {
if p.options.AsyncRefreshMinimumDelay != 0 {
var lastRefreshAttempt time.Time
if v := p.lastRefreshAttemptTime.Load(); v != nil {
lastRefreshAttempt = v.(time.Time)
}
if timeNow().Before(lastRefreshAttempt.Add(p.options.AsyncRefreshMinimumDelay)) {
return
}
}
// Ignore the returned channel so this won't be blocking, and limit the
// number of additional goroutines created.
p.sfGroup.DoChan("async-refresh", func() (interface{}, error) {
res, err := p.refreshBearerToken(ctx)
if p.options.AsyncRefreshMinimumDelay != 0 {
var refreshAttempt time.Time
if err != nil {
refreshAttempt = timeNow()
}
p.lastRefreshAttemptTime.Store(refreshAttempt)
}
return res, err
})
}
func (p *TokenCache) refreshBearerToken(ctx context.Context) (Token, error) {
resCh := p.sfGroup.DoChan("refresh-token", func() (interface{}, error) {
ctx := smithycontext.WithSuppressCancel(ctx)
if v := p.options.RetrieveBearerTokenTimeout; v != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, v)
defer cancel()
}
return p.singleRetrieve(ctx)
})
select {
case res := <-resCh:
return res.Val.(Token), res.Err
case <-ctx.Done():
return Token{}, fmt.Errorf("retrieve bearer token canceled, %w", ctx.Err())
}
}
func (p *TokenCache) singleRetrieve(ctx context.Context) (interface{}, error) {
token, err := p.provider.RetrieveBearerToken(ctx)
if err != nil {
return Token{}, fmt.Errorf("failed to retrieve bearer token, %w", err)
}
p.cachedToken.Store(&token)
return token, nil
}
// getCachedToken returns the currently cached token and true if found. Returns
// false if no token is cached.
func (p *TokenCache) getCachedToken() (Token, bool) {
v := p.cachedToken.Load()
if v == nil {
return Token{}, false
}
t := v.(*Token)
if t == nil || t.Value == "" {
return Token{}, false
}
return *t, true
}
|