1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
#pragma once
#include "retry_settings.h"
#include "roles_fetcher.h"
#include "settings.h"
#include <library/cpp/tvmauth/client/misc/async_updater.h>
#include <library/cpp/tvmauth/client/misc/threaded_updater.h>
#include <util/generic/set.h>
#include <util/random/fast.h>
#include <mutex>
namespace NTvmAuth::NTvmApi {
// Set of TClientSettings::TDst values (presumably the destinations that
// service tickets are requested for — confirm against TClientSettings).
using TDstSet = TSet<TClientSettings::TDst>;
// Shared-ownership pointer to a TDstSet that cannot be modified through it.
using TDstSetPtr = std::shared_ptr<const TDstSet>;
// Updater built on top of TThreadedUpdaterBase. It overrides GetStatus(),
// GetRoles() and Worker(), and declares the operations for refreshing service
// tickets, public keys and roles together with the disk-cache and HTTP helpers
// they use.
//
// NOTE(review): only declarations are visible in this header. Unless stated
// otherwise, the per-member descriptions below are inferred from names and
// signatures and should be confirmed against the definitions.
class TThreadedUpdater: public TThreadedUpdaterBase {
public:
/*!
* Starts thread for updating of in-memory cache in background
* Reads cache from disk if specified
* @param settings client settings (TClientSettings)
* @param logger is useful for monitoring and debugging
* @return the created updater as TAsyncUpdaterPtr
*/
static TAsyncUpdaterPtr Create(const TClientSettings& settings, TLoggerPtr logger);
// Declared here; defined out of line.
~TThreadedUpdater();
// Overrides of the base-class interface: current client status and roles.
TClientStatus GetStatus() const override;
NRoles::TRolesPtr GetRoles() const override;
protected: // for tests
// Status code only (TClientStatus::ECode), as opposed to the full TClientStatus
// returned by GetStatus().
TClientStatus::ECode GetState() const;
// The constructor is protected; the public entry point is Create().
TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger);
// Separate initialization step. Presumably related to the Inited_ flag declared
// below — confirm in the definition.
void Init();
// --- Update steps for service tickets, public keys and roles ---
void UpdateServiceTickets();
void UpdateAllServiceTickets();
// Takes the set of required destinations and returns a TServiceTicketsPtr.
TServiceTicketsPtr UpdateMissingServiceTickets(const TDstSet& required);
void UpdatePublicKeys();
void UpdateRoles();
// --- Cache updates; `tickets` is taken by rvalue reference ---
TServiceTicketsPtr UpdateServiceTicketsCachePartly(TPairTicketsErrors&& tickets, size_t got);
void UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time);
void UpdatePublicKeysCache(const TString& publicKeys, TInstant time);
// --- Disk cache readers ---
void ReadStateFromDisk();
// Service tickets loaded from disk: parsed tickets with per-destination errors,
// a timestamp (BornDate) and the file body.
struct TServiceTicketsFromDisk {
TPairTicketsErrors TicketsWithErrors;
TInstant BornDate;
TString FileBody;
};
// Returns two TServiceTicketsFromDisk values.
// NOTE(review): the meaning of first/second is not visible here — confirm.
std::pair<TServiceTicketsFromDisk, TServiceTicketsFromDisk> ReadServiceTicketsFromDisk() const;
// Returns a (TString, TInstant) pair — presumably keys body and its timestamp.
std::pair<TString, TInstant> ReadPublicKeysFromDisk() const;
TString ReadRetrySettingsFromDisk() const;
// Result of fetching service tickets over HTTP: parsed tickets with errors plus
// the collected response strings.
struct THttpResult {
TPairTicketsErrors TicketsWithErrors;
TSmallVec<TString> Responses;
};
// --- HTTP fetching ---
// Templated on the destination container type; `dstLimit` presumably bounds the
// number of destinations per request — confirm.
template <class Dsts>
THttpResult GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const;
TString GetPublicKeysFromHttp() const;
TServiceTickets::TMapIdStr GetRequestedTicketsFromStartUpCache(const TDstSet& dsts) const;
// Virtual fetchers: subclasses can override them (the section is marked
// "for tests").
virtual NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& body) const;
virtual NUtils::TFetchResult FetchPublicKeysFromHttp() const;
// Const method; what it updates from `header` and what the bool result means
// are not visible here.
bool UpdateRetrySettings(const TString& header) const;
// Runs the callable `func` for the given scope and returns its fetch result;
// the retry policy itself lives in the definition.
template <typename Func>
NUtils::TFetchResult FetchWithRetries(Func func, EScope scope) const;
void RandomSleep() const;
// --- Request building and response/cache parsing ---
// `now` defaults to time(nullptr); a default argument is evaluated at each call,
// so it is the current time at the call site unless passed explicitly.
static TString PrepareRequestForServiceTickets(TTvmId src,
const TServiceContext& ctx,
const TClientSettings::TDstVector& dsts,
const NUtils::TProcInfo& procInfo,
time_t now = time(nullptr));
// Parses `resp` for the given destinations; results go to the `out` parameter.
template <class Dsts>
void ParseTicketsFromResponse(TStringBuf resp,
const Dsts& dsts,
TPairTicketsErrors& out) const;
// Parses the disk-cache text; results go to the `out` parameter.
void ParseTicketsFromDiskCache(TStringBuf cache,
TPairTicketsErrors& out) const;
// Static counterparts for the on-disk format: one takes a response and a TTvmId
// and returns a TString, the other takes data and returns a
// (TStringBuf, TTvmId) pair.
// NOTE(review): the returned TStringBuf is a non-owning view; it presumably
// refers to `data` — confirm lifetime expectations.
static TString PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId);
static std::pair<TStringBuf, TTvmId> ParseTicketsFromDisk(TStringBuf data);
// --- Accessors for the destination set (stored in Destinations_ below) ---
TDstSetPtr GetDsts() const;
void SetDsts(TDstSetPtr dsts);
TInstant GetStartUpCacheBornDate() const;
// --- Predicates ---
bool IsTimeToUpdateServiceTickets(TInstant lastUpdate) const;
bool IsTimeToUpdatePublicKeys(TInstant lastUpdate) const;
bool AreServicesTicketsOk() const;
bool IsServiceContextOk() const;
bool IsUserContextOk() const;
// Override of the base-class Worker(); presumably the body run by the
// background thread mentioned in Create()'s documentation.
void Worker() override;
// --- Static helpers ---
static TServiceTickets::TMapAliasId MakeAliasMap(const TClientSettings& settings);
// Two overloads: `available` is given either as a tickets pointer or as a
// destination set.
static TClientSettings::TDstVector FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required);
static TClientSettings::TDstVector FindMissingDsts(const TDstSet& available, const TDstSet& required);
static TString CreateJsonArray(const TSmallVec<TString>& responses);
static TString AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses);
// NOTE(review): the private/protected/private split below keeps RetrySettings_
// declared before ExpBackoff_. Non-static data members are initialized in
// declaration order, so presumably ExpBackoff_ is constructed from
// RetrySettings_ — confirm before reordering or merging these sections.
private:
TRetrySettings RetrySettings_;
protected:
// `mutable` so it can be modified from const member functions.
mutable TExponentialBackoff ExpBackoff_;
// Mutex held through a unique_ptr; by its name it serializes batch updates of
// service tickets — confirm usage in the definition.
std::unique_ptr<std::mutex> ServiceTicketBatchUpdateMutex_;
private:
// Immutable configuration; these members are const.
const TClientSettings Settings_;
const NUtils::TProcInfo ProcInfo_;
const TString PublicKeysUrl_;
const TServiceTickets::TMapAliasId DstAliases_;
const TKeepAliveHttpClient::THeaders Headers_;
// Optional (TMaybe) service context.
TMaybe<TServiceContext> SigningContext_;
// Destination set pointer wrapped in NUtils::TProtectedValue; accessed through
// GetDsts()/SetDsts().
NUtils::TProtectedValue<TDstSetPtr> Destinations_;
TString DiskCacheServiceTickets_;
TServiceTicketsFromDisk StartUpCache_;
// Starts as true.
bool NeedFetchMissingServiceTickets_ = true;
// File paths for the on-disk caches.
TString PublicKeysFilepath_;
TString ServiceTicketsFilepath_;
TString RetrySettingsFilepath_;
std::unique_ptr<TRolesFetcher> RolesFetcher_;
// `mutable` so const member functions (e.g. RandomSleep(), presumably) can
// advance the generator.
mutable TReallyFastRng32 Random_;
// Starts as false; presumably set once initialization has completed.
bool Inited_ = false;
};
}
|