aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/tvmauth/client/misc/api
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
committerkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
commit21c9b0e6b039e9765eb414c406c2b86e8cea6850 (patch)
treef40ebc18ff8958dfbd189954ad024043ca983ea5 /library/cpp/tvmauth/client/misc/api
parent9a4effa852abe489707139c2b260dccc6f4f9aa9 (diff)
downloadydb-21c9b0e6b039e9765eb414c406c2b86e8cea6850.tar.gz
Final part on compatibility layer: LOGBROKER-7215
ref:777c67aadbf705d19034a09a792b2df61ba53697
Diffstat (limited to 'library/cpp/tvmauth/client/misc/api')
-rw-r--r--library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp126
-rw-r--r--library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h60
-rw-r--r--library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp635
-rw-r--r--library/cpp/tvmauth/client/misc/api/retry_settings.h33
-rw-r--r--library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp164
-rw-r--r--library/cpp/tvmauth/client/misc/api/roles_fetcher.h63
-rw-r--r--library/cpp/tvmauth/client/misc/api/settings.cpp89
-rw-r--r--library/cpp/tvmauth/client/misc/api/settings.h302
-rw-r--r--library/cpp/tvmauth/client/misc/api/threaded_updater.cpp954
-rw-r--r--library/cpp/tvmauth/client/misc/api/threaded_updater.h140
10 files changed, 2566 insertions, 0 deletions
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp
new file mode 100644
index 0000000000..6ec15c0e88
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp
@@ -0,0 +1,126 @@
+#include "tvm_client.h"
+
+#include <util/string/builder.h>
+
+namespace NTvmAuth::NDynamicClient {
+ TAsyncUpdaterPtr TTvmClient::Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger) {
+ Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");
+ THolder<TTvmClient> p(new TTvmClient(settings, std::move(logger)));
+ p->Init();
+ p->StartWorker();
+ return p.Release();
+ }
+
+ NThreading::TFuture<TAddResponse> TTvmClient::Add(TDsts&& dsts) {
+ if (dsts.empty()) {
+ LogDebug("Adding dst: got empty task");
+ return NThreading::MakeFuture<TAddResponse>(TAddResponse{});
+ }
+
+ const size_t size = dsts.size();
+ const ui64 id = ++TaskIds_;
+ NThreading::TPromise<TAddResponse> promise = NThreading::NewPromise<TAddResponse>();
+
+ TaskQueue_.Enqueue(TTask{id, promise, std::move(dsts)});
+
+ LogDebug(TStringBuilder() << "Adding dst: got task #" << id << " with " << size << " dsts");
+ return promise.GetFuture();
+ }
+
+ std::optional<TString> TTvmClient::GetOptionalServiceTicketFor(const TTvmId dst) {
+ TServiceTicketsPtr tickets = GetCachedServiceTickets();
+ Y_ENSURE_EX(tickets,
+ TBrokenTvmClientSettings()
+ << "Need to enable fetching of service tickets in settings");
+
+ auto it = tickets->TicketsById.find(dst);
+ if (it != tickets->TicketsById.end()) {
+ return it->second;
+ }
+
+ it = tickets->ErrorsById.find(dst);
+ if (it != tickets->ErrorsById.end()) {
+ ythrow TMissingServiceTicket()
+ << "Failed to get ticket for '" << dst << "': "
+ << it->second;
+ }
+
+ return {};
+ }
+
+ TTvmClient::TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger)
+ : TBase(settings, logger)
+ {
+ }
+
+ TTvmClient::~TTvmClient() {
+ TBase::StopWorker();
+ }
+
+ void TTvmClient::Worker() {
+ TBase::Worker();
+ ProcessTasks();
+ }
+
+ void TTvmClient::ProcessTasks() {
+ TaskQueue_.DequeueAll(&Tasks_);
+ if (Tasks_.empty()) {
+ return;
+ }
+
+ TDsts required;
+ for (const TTask& task : Tasks_) {
+ for (const auto& dst : task.Dsts) {
+ required.insert(dst);
+ }
+ }
+
+ TServiceTicketsPtr cache = UpdateMissingServiceTickets(required);
+
+ for (TTask& task : Tasks_) {
+ try {
+ SetResponseForTask(task, *cache);
+ } catch (const std::exception& e) {
+ LogError(TStringBuilder()
+ << "Adding dst: task #" << task.Id << ": exception: " << e.what());
+ } catch (...) {
+ LogError(TStringBuilder()
+ << "Adding dst: task #" << task.Id << ": exception: " << CurrentExceptionMessage());
+ }
+ }
+
+ Tasks_.clear();
+ }
+
+ static const TString UNKNOWN = "Unknown reason";
+ void TTvmClient::SetResponseForTask(TTvmClient::TTask& task, const TServiceTickets& cache) {
+ if (task.Promise.HasValue()) {
+ LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id << " already has value");
+ return;
+ }
+
+ TAddResponse response;
+
+ for (const auto& dst : task.Dsts) {
+ if (cache.TicketsById.contains(dst.Id)) {
+ AddDstToSettings(dst);
+ response.emplace(dst, TDstResponse{EDstStatus::Success, TString()});
+
+ LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id
+ << ": dst=" << dst.Id << " got ticket");
+ continue;
+ }
+
+ auto it = cache.ErrorsById.find(dst.Id);
+ const TString& error = it == cache.ErrorsById.end() ? UNKNOWN : it->second;
+ response.emplace(dst, TDstResponse{EDstStatus::Fail, error});
+
+ LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id
+ << ": dst=" << dst.Id
+ << " failed to get ticket: " << error);
+ }
+
+ LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id << ": set value");
+ task.Promise.SetValue(std::move(response));
+ }
+}
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h
new file mode 100644
index 0000000000..58ed953b63
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include <library/cpp/tvmauth/client/misc/api/threaded_updater.h>
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/map.h>
+#include <util/thread/lfqueue.h>
+
+#include <optional>
+
+namespace NTvmAuth::NDynamicClient {
+ enum class EDstStatus {
+ Success,
+ Fail,
+ };
+
+ struct TDstResponse {
+ EDstStatus Status = EDstStatus::Fail;
+ TString Error;
+
+ bool operator==(const TDstResponse& o) const {
+ return Status == o.Status && Error == o.Error;
+ }
+ };
+
+ using TDsts = NTvmApi::TDstSet;
+ using TAddResponse = TMap<NTvmApi::TClientSettings::TDst, TDstResponse>;
+
+ class TTvmClient: public NTvmApi::TThreadedUpdater {
+ public:
+ static TAsyncUpdaterPtr Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger);
+ virtual ~TTvmClient();
+
+ NThreading::TFuture<TAddResponse> Add(TDsts&& dsts);
+ std::optional<TString> GetOptionalServiceTicketFor(const TTvmId dst);
+
+ protected: // for tests
+ struct TTask {
+ ui64 Id = 0;
+ NThreading::TPromise<TAddResponse> Promise;
+ TDsts Dsts;
+ };
+
+ using TBase = NTvmApi::TThreadedUpdater;
+
+ protected: // for tests
+ TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger);
+
+ void Worker() override;
+ void ProcessTasks();
+
+ void SetResponseForTask(TTask& task, const TServiceTickets& cache);
+
+ private:
+ std::atomic<ui64> TaskIds_ = {0};
+ TLockFreeQueue<TTask> TaskQueue_;
+ TVector<TTask> Tasks_;
+ };
+}
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp
new file mode 100644
index 0000000000..89403c15e4
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp
@@ -0,0 +1,635 @@
+#include <library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h>
+
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+
+#include <library/cpp/tvmauth/unittest.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/stream/file.h>
+#include <util/system/fs.h>
+
+#include <regex>
+
+using namespace NTvmAuth;
+using namespace NTvmAuth::NDynamicClient;
+
+Y_UNIT_TEST_SUITE(DynamicClient) {
+ static const std::regex TIME_REGEX(R"(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d.\d{6}Z)");
+ static const TString CACHE_DIR = "./tmp/";
+
+ static void WriteFile(TString name, TStringBuf body, TInstant time) {
+ NFs::Remove(CACHE_DIR + name);
+ TFileOutput f(CACHE_DIR + name);
+ f << TDiskWriter::PrepareData(time, body);
+ }
+
+ static void CleanCache() {
+ NFs::RemoveRecursive(CACHE_DIR);
+ NFs::MakeDirectoryRecursive(CACHE_DIR);
+ }
+
+ class TLogger: public NTvmAuth::ILogger {
+ public:
+ void Log(int lvl, const TString& msg) override {
+ Cout << TInstant::Now() << " lvl=" << lvl << " msg: " << msg << "\n";
+ Stream << lvl << ": " << msg << Endl;
+ }
+
+ TStringStream Stream;
+ };
+
+ class TOfflineUpdater: public NDynamicClient::TTvmClient {
+ public:
+ TOfflineUpdater(const NTvmApi::TClientSettings& settings,
+ TIntrusivePtr<TLogger> l,
+ bool fail = true,
+ std::vector<TString> tickets = {})
+ : TTvmClient(settings, l)
+ , Fail(fail)
+ , Tickets(std::move(tickets))
+ {
+ Init();
+ ExpBackoff_.SetEnabled(false);
+ }
+
+ NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& req) const override {
+ if (Fail) {
+ throw yexception() << "tickets: alarm";
+ }
+
+ TString response;
+ if (!Tickets.empty()) {
+ response = Tickets.front();
+ Tickets.erase(Tickets.begin());
+ }
+
+ Cout << "*** FetchServiceTicketsFromHttp. request: " << req << ". response: " << response << Endl;
+ return {200, {}, "/2/ticket", response, ""};
+ }
+
+ NUtils::TFetchResult FetchPublicKeysFromHttp() const override {
+ if (Fail) {
+ throw yexception() << "keysalarm";
+ }
+ Cout << "*** FetchPublicKeysFromHttp" << Endl;
+ return {200, {}, "/2/keys", PublicKeys, ""};
+ }
+
+ using TTvmClient::GetDsts;
+ using TTvmClient::ProcessTasks;
+ using TTvmClient::SetResponseForTask;
+ using TTvmClient::Worker;
+
+ bool Fail = true;
+ TString PublicKeys = NUnittest::TVMKNIFE_PUBLIC_KEYS;
+ mutable std::vector<TString> Tickets;
+ };
+
+ Y_UNIT_TEST(StartWithIncompleteTicketsSet) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}, {"kolmo", 213}}, false);
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "error" : "some error"}})",
+ R"({"123" : { "ticket" : "service_ticket_3"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+ }
+ }
+
+ Y_UNIT_TEST(StartWithEmptyTicketsSet) {
+ CleanCache();
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"kolmo", 213}}, false);
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "error" : "some error"}})",
+ R"({"123" : { "ticket" : "3:serv:CBAQ__________9_IgYIlJEGEHs:CcafYQH-FF5XaXMuJrgLZj98bIC54cs1ZkcFS9VV_9YM9iOM_0PXCtMkdg85rFjxE_BMpg7bE8ZuoqNfdw0FPt0BAKNeISwlydj4o0IjY82--LZBpP8CRn-EpAnkRaDShdlfrcF2pk1SSmEX8xdyZVQEnkUPY0cHGlFnu231vnE"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+ }
+ };
+ Y_UNIT_TEST(StartWithIncompleteCacheAndAdd) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}, {"kolmo", 213}});
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(TOfflineUpdater(s, l),
+ TRetriableException,
+ "Failed to start TvmClient. You can retry: ServiceTickets: tickets: alarm");
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): XXXXXXXXXXX\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to update service tickets: tickets: alarm\n",
+ std::regex_replace(std::string(l->Stream.Str()), TIME_REGEX, "XXXXXXXXXXX"));
+ l->Stream.Str().clear();
+
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "ticket" : "service_ticket_2"}})",
+ R"({"123" : { "ticket" : "service_ticket_3"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n"
+ << "6: Cache was partly updated with 1 service ticket(s). total: 2\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: got task #1 with 1 dsts\n"
+ << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n"
+ << "6: Cache was partly updated with 1 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: set value\n",
+ l->Stream.Str());
+ }
+
+ Y_UNIT_TEST(StartWithCacheAndAdd) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}});
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+
+ client.Fail = false;
+ client.Tickets = {
+ R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})",
+ };
+ NThreading::TFuture<TAddResponse> fut = client.Add({123, 213});
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n",
+ l->Stream.Str());
+ }
+
+ Y_UNIT_TEST(StartWithCacheAndAddSeveral) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}});
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+
+ client.Fail = false;
+ client.Tickets = {
+ R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213});
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 1 dsts\n"
+ << "7: Adding dst: got task #2 with 1 dsts\n"
+ << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "7: Adding dst: task #2: set value\n",
+ l->Stream.Str());
+ }
+
+ Y_UNIT_TEST(StartWithCacheAndAddSeveralWithErrors) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}});
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+
+ UNIT_ASSERT(client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT_VALUES_EQUAL("3:serv:CBAQ__________9_IgYIKhCUkQY:CX",
+ *client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT(!client.GetOptionalServiceTicketFor(456));
+
+ client.Fail = false;
+ client.Tickets = {
+ R"({
+ "123" : { "ticket" : "service_ticket_3"},
+ "213" : { "ticket" : "service_ticket_2"},
+ "456" : { "error" : "error_3"}
+ })",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456});
+
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456));
+
+ UNIT_ASSERT(client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT_VALUES_EQUAL("3:serv:CBAQ__________9_IgYIKhCUkQY:CX",
+ *client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(456),
+ TMissingServiceTicket,
+ "Failed to get ticket for '456': error_3");
+
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ {456, {EDstStatus::Fail, "error_3"}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Adding dst: got task #2 with 2 dsts\n"
+ << "7: Response with service tickets for 3 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 3 destination(s)\n"
+ << "3: Failed to get service ticket for dst=456: error_3\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n"
+ << "7: Adding dst: task #2: set value\n",
+ l->Stream.Str());
+ }
+
+ Y_UNIT_TEST(WithException) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+
+ NTvmApi::TClientSettings s;
+ s.SetSelfTvmId(100500);
+ s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}});
+ s.SetDiskCacheDir(CACHE_DIR);
+
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+
+ client.Fail = false;
+ client.Tickets = {
+ R"({
+ "123" : { "ticket" : "service_ticket_3"},
+ "213" : { "ticket" : "service_ticket_2"},
+ "456" : { "error" : "error_3"},
+ "789" : { "ticket" : "service_ticket_4"}
+ })",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456});
+ NThreading::TFuture<TAddResponse> fut3 = client.Add({789});
+
+ fut2.Subscribe([](const auto&) {
+ throw yexception() << "planed exc";
+ });
+ fut3.Subscribe([](const auto&) {
+ throw 5;
+ });
+
+ UNIT_ASSERT_NO_EXCEPTION(client.Worker());
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456));
+
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ {456, {EDstStatus::Fail, "error_3"}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+
+ UNIT_ASSERT(fut3.HasValue());
+ TAddResponse resp3{
+ {789, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp3, fut3.GetValue());
+
+ UNIT_ASSERT(client.Tickets.empty());
+
+ TDsts dsts{19, 123, 213, 789};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Adding dst: got task #2 with 2 dsts\n"
+ << "7: Adding dst: got task #3 with 1 dsts\n"
+ << "7: Response with service tickets for 4 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 4 destination(s)\n"
+ << "3: Failed to get service ticket for dst=456: error_3\n"
+ << "6: Cache was partly updated with 3 service ticket(s). total: 4\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n"
+ << "7: Adding dst: task #2: set value\n"
+ << "3: Adding dst: task #2: exception: planed exc\n"
+ << "7: Adding dst: task #3: dst=789 got ticket\n"
+ << "7: Adding dst: task #3: set value\n"
+ << "3: Adding dst: task #3: exception: unknown error\n",
+ l->Stream.Str());
+ }
+}
+
+template <>
+void Out<NTvmAuth::NDynamicClient::TDstResponse>(IOutputStream& out, const NTvmAuth::NDynamicClient::TDstResponse& m) {
+ out << m.Status << " (" << m.Error << ")";
+}
+
+template <>
+void Out<NTvmAuth::NTvmApi::TClientSettings::TDst>(IOutputStream& out, const NTvmAuth::NTvmApi::TClientSettings::TDst& m) {
+ out << m.Id;
+}
diff --git a/library/cpp/tvmauth/client/misc/api/retry_settings.h b/library/cpp/tvmauth/client/misc/api/retry_settings.h
new file mode 100644
index 0000000000..607b230811
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/retry_settings.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <library/cpp/tvmauth/client/misc/exponential_backoff.h>
+
+namespace NTvmAuth::NTvmApi {
+ struct TRetrySettings {
+ TExponentialBackoff::TSettings BackoffSettings = {
+ TDuration::Seconds(0),
+ TDuration::Minutes(1),
+ 2,
+ 0.5,
+ };
+ TDuration MaxRandomSleepDefault = TDuration::Seconds(5);
+ TDuration MaxRandomSleepWhenOk = TDuration::Minutes(1);
+ ui32 RetriesOnStart = 3;
+ ui32 RetriesInBackground = 2;
+ TDuration WorkerAwakingPeriod = TDuration::Seconds(10);
+ ui32 DstsLimit = 300;
+ TDuration RolesUpdatePeriod = TDuration::Minutes(10);
+ TDuration RolesWarnPeriod = TDuration::Minutes(20);
+
+ bool operator==(const TRetrySettings& o) const {
+ return BackoffSettings == o.BackoffSettings &&
+ MaxRandomSleepDefault == o.MaxRandomSleepDefault &&
+ MaxRandomSleepWhenOk == o.MaxRandomSleepWhenOk &&
+ RetriesOnStart == o.RetriesOnStart &&
+ WorkerAwakingPeriod == o.WorkerAwakingPeriod &&
+ DstsLimit == o.DstsLimit &&
+ RolesUpdatePeriod == o.RolesUpdatePeriod &&
+ RolesWarnPeriod == o.RolesWarnPeriod;
+ }
+ };
+}
diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp
new file mode 100644
index 0000000000..8f4b359e8c
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp
@@ -0,0 +1,164 @@
+#include "roles_fetcher.h"
+
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+#include <library/cpp/tvmauth/client/misc/roles/decoder.h>
+#include <library/cpp/tvmauth/client/misc/roles/parser.h>
+
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/string_utils/quote/quote.h>
+
+#include <util/string/builder.h>
+#include <util/string/join.h>
+
+namespace NTvmAuth::NTvmApi {
+ static TString CreatePath(const TString& dir, const TString& file) {
+ return dir.EndsWith("/")
+ ? dir + file
+ : dir + "/" + file;
+ }
+
+ TRolesFetcher::TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger)
+ : Settings_(settings)
+ , Logger_(logger)
+ , CacheFilePath_(CreatePath(Settings_.CacheDir, "roles"))
+ {
+ Client_ = std::make_unique<TKeepAliveHttpClient>(
+ Settings_.TiroleHost,
+ Settings_.TirolePort,
+ Settings_.Timeout,
+ Settings_.Timeout);
+ }
+
+ TInstant TRolesFetcher::ReadFromDisk() {
+ TDiskReader dr(CacheFilePath_, Logger_.Get());
+ if (!dr.Read()) {
+ return {};
+ }
+
+ std::pair<TString, TString> data = ParseDiskFormat(dr.Data());
+ if (data.second != Settings_.IdmSystemSlug) {
+ Logger_->Warning(
+ TStringBuilder() << "Roles in disk cache are for another slug (" << data.second
+ << "). Self=" << Settings_.IdmSystemSlug);
+ return {};
+ }
+
+ CurrentRoles_.Set(NRoles::TParser::Parse(std::make_shared<TString>(std::move(data.first))));
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to read roles with revision "
+ << CurrentRoles_.Get()->GetMeta().Revision
+ << " from " << CacheFilePath_);
+
+ return dr.Time();
+ }
+
+ bool TRolesFetcher::AreRolesOk() const {
+ return bool(GetCurrentRoles());
+ }
+
+ bool TRolesFetcher::IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate) {
+ return settings.RolesUpdatePeriod < sinceUpdate;
+ }
+
+ bool TRolesFetcher::ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate) {
+ return settings.RolesWarnPeriod < sinceUpdate;
+ }
+
+ NUtils::TFetchResult TRolesFetcher::FetchActualRoles(const TString& serviceTicket) {
+ TStringStream out;
+ THttpHeaders outHeaders;
+
+ TRequest req = CreateTiroleRequest(serviceTicket);
+ TKeepAliveHttpClient::THttpCode code = Client_->DoGet(
+ req.Url,
+ &out,
+ req.Headers,
+ &outHeaders);
+
+ const THttpInputHeader* reqId = outHeaders.FindHeader("X-Request-Id");
+
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to perform request for roles to " << Settings_.TiroleHost
+ << " (request_id=" << (reqId ? reqId->Value() : "")
+ << "). code=" << code);
+
+ return {code, std::move(outHeaders), "/v1/get_actual_roles", out.Str(), {}};
+ }
+
+ void TRolesFetcher::Update(NUtils::TFetchResult&& fetchResult, TInstant now) {
+ if (fetchResult.Code == HTTP_NOT_MODIFIED) {
+ Y_ENSURE(CurrentRoles_.Get(),
+ "tirole did not return any roles because current roles are actual,"
+ " but there are no roles in memory - this should never happen");
+ return;
+ }
+
+ Y_ENSURE(fetchResult.Code == HTTP_OK,
+ "Unexpected code from tirole: " << fetchResult.Code << ". " << fetchResult.Response);
+
+ const THttpInputHeader* codec = fetchResult.Headers.FindHeader("X-Tirole-Compression");
+ const TStringBuf codecBuf = codec ? codec->Value() : "";
+
+ NRoles::TRawPtr blob;
+ try {
+ blob = std::make_shared<TString>(NRoles::TDecoder::Decode(
+ codecBuf,
+ std::move(fetchResult.Response)));
+ } catch (const std::exception& e) {
+ throw yexception() << "Failed to decode blob with codec '" << codecBuf
+ << "': " << e.what();
+ }
+
+ CurrentRoles_.Set(NRoles::TParser::Parse(blob));
+
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to update roles with revision "
+ << CurrentRoles_.Get()->GetMeta().Revision);
+
+ TDiskWriter dw(CacheFilePath_, Logger_.Get());
+ dw.Write(PrepareDiskFormat(*blob, Settings_.IdmSystemSlug), now);
+ }
+
+ NTvmAuth::NRoles::TRolesPtr TRolesFetcher::GetCurrentRoles() const {
+ return CurrentRoles_.Get();
+ }
+
+ void TRolesFetcher::ResetConnection() {
+ Client_->ResetConnection();
+ }
+
+ static const char DELIMETER = '\t';
+
+ std::pair<TString, TString> TRolesFetcher::ParseDiskFormat(TStringBuf filebody) {
+ TStringBuf slug = filebody.RNextTok(DELIMETER);
+ return {TString(filebody), CGIUnescapeRet(slug)};
+ }
+
+ TString TRolesFetcher::PrepareDiskFormat(TStringBuf roles, TStringBuf slug) {
+ TStringStream res;
+ res.Reserve(roles.size() + 1 + slug.size());
+ res << roles << DELIMETER << CGIEscapeRet(slug);
+ return res.Str();
+ }
+
+ TRolesFetcher::TRequest TRolesFetcher::CreateTiroleRequest(const TString& serviceTicket) const {
+ TRolesFetcher::TRequest res;
+
+ TStringStream url;
+ url.Reserve(512);
+ url << "/v1/get_actual_roles?";
+ url << "system_slug=" << CGIEscapeRet(Settings_.IdmSystemSlug) << "&";
+ Settings_.ProcInfo.AddToRequest(url);
+ res.Url = std::move(url.Str());
+
+ res.Headers.reserve(2);
+ res.Headers.emplace(XYaServiceTicket_, serviceTicket);
+
+ NRoles::TRolesPtr roles = CurrentRoles_.Get();
+ if (roles) {
+ res.Headers.emplace(IfNoneMatch_, Join("", "\"", roles->GetMeta().Revision, "\""));
+ }
+
+ return res;
+ }
+}
diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.h b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h
new file mode 100644
index 0000000000..63691223b5
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h
@@ -0,0 +1,63 @@
+#pragma once
+
+#include "retry_settings.h"
+
+#include <library/cpp/tvmauth/client/misc/fetch_result.h>
+#include <library/cpp/tvmauth/client/misc/proc_info.h>
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/misc/roles/roles.h>
+
+#include <library/cpp/tvmauth/client/logger.h>
+
+#include <library/cpp/http/simple/http_client.h>
+
+namespace NTvmAuth::NTvmApi {
+ struct TRolesFetcherSettings {
+ TString TiroleHost;
+ ui16 TirolePort = 0;
+ TString CacheDir;
+ NUtils::TProcInfo ProcInfo;
+ TTvmId SelfTvmId = 0;
+ TString IdmSystemSlug;
+ TDuration Timeout = TDuration::Seconds(30);
+ };
+
+ class TRolesFetcher {
+ public:
+ TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger);
+
+ TInstant ReadFromDisk();
+
+ bool AreRolesOk() const;
+ static bool IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate);
+ static bool ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate);
+
+ NUtils::TFetchResult FetchActualRoles(const TString& serviceTicket);
+ void Update(NUtils::TFetchResult&& fetchResult, TInstant now = TInstant::Now());
+
+ NTvmAuth::NRoles::TRolesPtr GetCurrentRoles() const;
+
+ void ResetConnection();
+
+ public:
+ static std::pair<TString, TString> ParseDiskFormat(TStringBuf filebody);
+ static TString PrepareDiskFormat(TStringBuf roles, TStringBuf slug);
+
+ struct TRequest {
+ TString Url;
+ TKeepAliveHttpClient::THeaders Headers;
+ };
+ TRequest CreateTiroleRequest(const TString& serviceTicket) const;
+
+ private:
+ const TRolesFetcherSettings Settings_;
+ const TLoggerPtr Logger_;
+ const TString CacheFilePath_;
+ const TString XYaServiceTicket_ = "X-Ya-Service-Ticket";
+ const TString IfNoneMatch_ = "If-None-Match";
+
+ NUtils::TProtectedValue<NTvmAuth::NRoles::TRolesPtr> CurrentRoles_;
+
+ std::unique_ptr<TKeepAliveHttpClient> Client_;
+ };
+}
diff --git a/library/cpp/tvmauth/client/misc/api/settings.cpp b/library/cpp/tvmauth/client/misc/api/settings.cpp
new file mode 100644
index 0000000000..71aad75998
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/settings.cpp
@@ -0,0 +1,89 @@
+#include "settings.h"
+
+#include <util/datetime/base.h>
+#include <util/stream/file.h>
+#include <util/system/fs.h>
+
+#include <set>
+
+namespace NTvmAuth::NTvmApi {
+ void TClientSettings::CheckPermissions(const TString& dir) {
+ const TString name = dir + "/check.tmp";
+
+ try {
+ NFs::EnsureExists(dir);
+
+ TFile file(name, CreateAlways | RdWr);
+
+ NFs::Remove(name);
+ } catch (const std::exception& e) {
+ NFs::Remove(name);
+ ythrow TPermissionDenied() << "Permission denied to disk cache directory: " << e.what();
+ }
+ }
+
+ void TClientSettings::CheckValid() const {
+ if (DiskCacheDir) {
+ CheckPermissions(DiskCacheDir);
+ }
+
+ if (TStringBuf(Secret)) {
+ Y_ENSURE_EX(NeedServiceTicketsFetching(),
+ TBrokenTvmClientSettings() << "Secret is present but destinations list is empty. It makes no sense");
+ }
+ if (NeedServiceTicketsFetching()) {
+ Y_ENSURE_EX(SelfTvmId != 0,
+ TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if fetching of Service Tickets required");
+ Y_ENSURE_EX((TStringBuf)Secret,
+ TBrokenTvmClientSettings() << "Secret is required for fetching of Service Tickets");
+ }
+
+ if (CheckServiceTickets) {
+ Y_ENSURE_EX(SelfTvmId != 0,
+ TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if checking of Service Tickets required");
+ }
+
+ if (FetchRolesForIdmSystemSlug) {
+ Y_ENSURE_EX(DiskCacheDir,
+ TBrokenTvmClientSettings() << "Disk cache must be enabled to use roles: "
+ "they can be heavy");
+ }
+
+ bool needSmth = NeedServiceTicketsFetching() ||
+ IsServiceTicketCheckingRequired() ||
+ IsUserTicketCheckingRequired();
+ Y_ENSURE_EX(needSmth, TBrokenTvmClientSettings() << "Invalid settings: nothing to do");
+
+ // Useless now: keep it here to avoid forgetting check from TDst. TODO: PASSP-35377
+ for (const auto& dst : FetchServiceTicketsForDsts) {
+ Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+ // TODO: check only FetchServiceTicketsForDsts_
+ // Python binding checks settings before normalization
+ for (const auto& [alias, dst] : FetchServiceTicketsForDstsWithAliases) {
+ Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+ Y_ENSURE_EX(TiroleTvmId != 0, TBrokenTvmClientSettings() << "TiroleTvmId cannot be 0");
+ }
+
+ TClientSettings TClientSettings::CloneNormalized() const {
+ TClientSettings res = *this;
+
+ std::set<TTvmId> allDsts;
+ for (const auto& tvmid : res.FetchServiceTicketsForDsts) {
+ allDsts.insert(tvmid.Id);
+ }
+ for (const auto& [alias, tvmid] : res.FetchServiceTicketsForDstsWithAliases) {
+ allDsts.insert(tvmid.Id);
+ }
+ if (FetchRolesForIdmSystemSlug) {
+ allDsts.insert(res.TiroleTvmId);
+ }
+
+ res.FetchServiceTicketsForDsts = {allDsts.begin(), allDsts.end()};
+
+ res.CheckValid();
+
+ return res;
+ }
+}
diff --git a/library/cpp/tvmauth/client/misc/api/settings.h b/library/cpp/tvmauth/client/misc/api/settings.h
new file mode 100644
index 0000000000..715ab3e02c
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/settings.h
@@ -0,0 +1,302 @@
+#pragma once
+
+#include <library/cpp/tvmauth/client/misc/settings.h>
+
+#include <library/cpp/tvmauth/client/exception.h>
+
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+#include <library/cpp/tvmauth/type.h>
+
+#include <library/cpp/string_utils/secret_string/secret_string.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+
+namespace NTvmAuth::NTvmApi {
+ /**
+ * Settings for TVM client. Uses https://tvm-api.yandex.net to get state.
+ * At least one of them is required:
+ * FetchServiceTicketsForDsts_/FetchServiceTicketsForDstsWithAliases_
+ * CheckServiceTickets_
+ * CheckUserTicketsWithBbEnv_
+ */
+ class TClientSettings: public NTvmAuth::TClientSettings {
+ public:
+ class TDst;
+
+ /**
+ * Alias is an internal name for destinations within your code.
+ * You can associate a name with an tvm_id once in your code and use the name as an alias for
+ * tvm_id to each calling point. Useful for several environments: prod/test/etc.
+ * @example:
+ * // init
+ * static const TString MY_BACKEND = "my backend";
+ * TDstMap map = {{MY_BACKEND, TDst(config.get("my_back_tvm_id"))}};
+ * ...
+ * // per request
+ * TString t = tvmClient.GetServiceTicket(MY_BACKEND);
+ */
+ using TDstMap = THashMap<TAlias, TDst>;
+ using TDstVector = TVector<TDst>;
+
+ public:
+ /*!
+ * NOTE: Please use this option: it provides the best reliability
+ * NOTE: Client requires read/write permissions
+ * WARNING: The same directory can be used only:
+ * - for TVM clients with the same settings
+ * OR
+ * - for new client replacing previous - with another config.
+ * System user must be the same for processes with these clients inside.
+ * Implementation doesn't provide other scenarios.
+ */
+ TString DiskCacheDir;
+
+ // Required for Service Ticket fetching or checking
+ TTvmId SelfTvmId = 0;
+
+ // Options for Service Tickets fetching
+ NSecretString::TSecretString Secret;
+ /*!
+ * Client will process both attrs:
+ * FetchServiceTicketsForDsts_, FetchServiceTicketsForDstsWithAliases_
+ * WARNING: It is not way to provide authorization for incoming ServiceTickets!
+ * It is way only to send your ServiceTickets to your backend!
+ */
+ TDstVector FetchServiceTicketsForDsts;
+ TDstMap FetchServiceTicketsForDstsWithAliases;
+ bool IsIncompleteTicketsSetAnError = true;
+
+ // Options for Service Tickets checking
+ bool CheckServiceTickets = false;
+
+ // Options for User Tickets checking
+ TMaybe<EBlackboxEnv> CheckUserTicketsWithBbEnv;
+
+ // Options for roles fetching
+ TString FetchRolesForIdmSystemSlug;
+ /*!
+ * By default client checks src from ServiceTicket or default uid from UserTicket -
+ * to prevent you from forgetting to check it yourself.
+ * It does binary checks only:
+ * ticket gets status NoRoles, if there is no role for src or default uid.
+ * You need to check roles on your own if you have a non-binary role system or
+ * you have disabled ShouldCheckSrc/ShouldCheckDefaultUid
+ *
+ * You may need to disable this check in the following cases:
+ * - You use GetRoles() to provide verbose message (with revision).
+ * Double check may be inconsistent:
+ * binary check inside client uses revision of roles X - i.e. src 100500 has no role,
+ * exact check in your code uses revision of roles Y - i.e. src 100500 has some roles.
+ */
+ bool ShouldCheckSrc = true;
+ bool ShouldCheckDefaultUid = true;
+
+ // Options for tests
+ TString TvmHost = "https://tvm-api.yandex.net";
+ ui16 TvmPort = 443;
+ TString TiroleHost = "https://tirole-api.yandex.net";
+ TDuration TvmSocketTimeout = TDuration::Seconds(5);
+ TDuration TvmConnectTimeout = TDuration::Seconds(30);
+ ui16 TirolePort = 443;
+ TTvmId TiroleTvmId = TIROLE_TVMID;
+
+ // for debug purposes
+ TString LibVersionPrefix;
+
+ void CheckValid() const;
+ TClientSettings CloneNormalized() const;
+
+ static inline const TTvmId TIROLE_TVMID = 2028120;
+ static inline const TTvmId TIROLE_TVMID_TEST = 2026536;
+
+ // DEPRECATED API
+ // TODO: get rid of it: PASSP-35377
+ public:
+ // Deprecated: set attributes directly
+ void SetSelfTvmId(TTvmId selfTvmId) {
+ SelfTvmId = selfTvmId;
+ }
+
+ // Deprecated: set attributes directly
+ void EnableServiceTicketChecking() {
+ CheckServiceTickets = true;
+ }
+
+ // Deprecated: set attributes directly
+ void EnableUserTicketChecking(EBlackboxEnv env) {
+ CheckUserTicketsWithBbEnv = env;
+ }
+
+ // Deprecated: set attributes directly
+ void SetTvmHostPort(const TString& host, ui16 port) {
+ TvmHost = host;
+ TvmPort = port;
+ }
+
+ // Deprecated: set attributes directly
+ void SetTiroleHostPort(const TString& host, ui16 port) {
+ TiroleHost = host;
+ TirolePort = port;
+ }
+
+ // Deprecated: set attributes directly
+ void EnableRolesFetching(const TString& systemSlug, TTvmId tiroleTvmId = TIROLE_TVMID) {
+ TiroleTvmId = tiroleTvmId;
+ FetchRolesForIdmSystemSlug = systemSlug;
+ }
+
+ // Deprecated: set attributes directly
+ void DoNotCheckSrcByDefault() {
+ ShouldCheckSrc = false;
+ }
+
+ // Deprecated: set attributes directly
+ void DoNotCheckDefaultUidByDefault() {
+ ShouldCheckDefaultUid = false;
+ }
+
+ // Deprecated: set attributes directly
+ void SetDiskCacheDir(const TString& dir) {
+ DiskCacheDir = dir;
+ }
+
+ // Deprecated: set attributes directly
+ void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret,
+ TDstMap&& dsts,
+ const bool considerIncompleteTicketsSetAsError = true) {
+ IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError;
+ Secret = selfSecret;
+
+ FetchServiceTicketsForDsts = TDstVector{};
+ FetchServiceTicketsForDsts.reserve(dsts.size());
+ for (const auto& pair : dsts) {
+ FetchServiceTicketsForDsts.push_back(pair.second);
+ }
+
+ FetchServiceTicketsForDstsWithAliases = std::move(dsts);
+ }
+
+ // Deprecated: set attributes directly
+ void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret,
+ TDstVector&& dsts,
+ const bool considerIncompleteTicketsSetAsError = true) {
+ IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError;
+ Secret = selfSecret;
+ FetchServiceTicketsForDsts = std::move(dsts);
+ }
+
+ public:
+ bool IsServiceTicketFetchingRequired() const {
+ return bool(Secret.Value());
+ }
+
+ const TStringBuf GetSelfSecret() const {
+ return Secret;
+ }
+
+ bool HasDstAliases() const {
+ return !FetchServiceTicketsForDstsWithAliases.empty();
+ }
+
+ const TDstMap& GetDstAliases() const {
+ return FetchServiceTicketsForDstsWithAliases;
+ }
+
+ const TDstVector& GetDestinations() const {
+ return FetchServiceTicketsForDsts;
+ }
+
+ bool IsUserTicketCheckingRequired() const {
+ return bool(CheckUserTicketsWithBbEnv);
+ }
+
+ EBlackboxEnv GetEnvForUserTickets() const {
+ return *CheckUserTicketsWithBbEnv;
+ }
+
+ bool IsServiceTicketCheckingRequired() const {
+ return CheckServiceTickets;
+ }
+
+ bool IsDiskCacheUsed() const {
+ return bool(DiskCacheDir);
+ }
+
+ TString GetDiskCacheDir() const {
+ return DiskCacheDir;
+ }
+
+ TTvmId GetSelfTvmId() const {
+ return SelfTvmId;
+ }
+
+ const TString& GetLibVersionPrefix() const {
+ return LibVersionPrefix;
+ }
+
+ const TString& GetTvmHost() const {
+ return TvmHost;
+ }
+
+ ui16 GetTvmPort() const {
+ return TvmPort;
+ }
+
+ bool IsRolesFetchingEnabled() const {
+ return bool(FetchRolesForIdmSystemSlug);
+ }
+
+ TTvmId GetTiroleTvmId() const {
+ return TiroleTvmId;
+ }
+
+ const TString& GetIdmSystemSlug() const {
+ return FetchRolesForIdmSystemSlug;
+ }
+
+ const TString& GetTiroleHost() const {
+ return TiroleHost;
+ }
+
+ ui16 GetTirolePort() const {
+ return TirolePort;
+ }
+
+ bool NeedServiceTicketsFetching() const {
+ return !FetchServiceTicketsForDsts.empty() ||
+ !FetchServiceTicketsForDstsWithAliases.empty() ||
+ FetchRolesForIdmSystemSlug;
+ }
+
+ // TODO: get rid of TDst: PASSP-35377
+ class TDst {
+ public:
+ TDst(TTvmId id)
+ : Id(id)
+ {
+ Y_ENSURE_EX(id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+
+ TTvmId Id;
+
+ bool operator==(const TDst& o) const {
+ return Id == o.Id;
+ }
+
+ bool operator<(const TDst& o) const {
+ return Id < o.Id;
+ }
+
+ public: // for python binding
+ TDst()
+ : Id(0)
+ {
+ }
+ };
+
+ public:
+ static void CheckPermissions(const TString& dir);
+ };
+}
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp
new file mode 100644
index 0000000000..a7df49c05d
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp
@@ -0,0 +1,954 @@
+#include "threaded_updater.h"
+
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h>
+
+#include <library/cpp/tvmauth/client/logger.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <util/stream/str.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+#include <util/system/thread.h>
+
+namespace NTvmAuth::NTvmApi {
+ static TString CreatePublicKeysUrl(const TClientSettings& settings,
+ const NUtils::TProcInfo& procInfo) {
+ TStringStream s;
+ s << "/2/keys";
+ s << "?";
+ procInfo.AddToRequest(s);
+
+ s << "&get_retry_settings=yes";
+
+ if (settings.GetSelfTvmId() != 0) {
+ s << "&src=" << settings.GetSelfTvmId();
+ }
+
+ if (settings.IsUserTicketCheckingRequired()) {
+ s << "&env=" << static_cast<int>(settings.GetEnvForUserTickets());
+ }
+
+ return s.Str();
+ }
+
+ TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) {
+ Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");
+ THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger)));
+ p->Init();
+ p->StartWorker();
+ return p.Release();
+ }
+
+ TThreadedUpdater::~TThreadedUpdater() {
+ ExpBackoff_.SetEnabled(false);
+ ExpBackoff_.Interrupt();
+ StopWorker(); // Required here to avoid using of deleted members
+ }
+
+ TClientStatus TThreadedUpdater::GetStatus() const {
+ const TClientStatus::ECode state = GetState();
+ return TClientStatus(state, GetLastError(state == TClientStatus::Ok || state == TClientStatus::IncompleteTicketsSet));
+ }
+
+ NRoles::TRolesPtr TThreadedUpdater::GetRoles() const {
+ Y_ENSURE_EX(RolesFetcher_,
+ TBrokenTvmClientSettings() << "Roles were not configured in settings");
+ return RolesFetcher_->GetCurrentRoles();
+ }
+
+ TClientStatus::ECode TThreadedUpdater::GetState() const {
+ const TInstant now = TInstant::Now();
+
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ if (AreServiceTicketsInvalid(now)) {
+ return TClientStatus::Error;
+ }
+ auto tickets = GetCachedServiceTickets();
+ if (!tickets) {
+ return TClientStatus::Error;
+ }
+ if (tickets->TicketsById.size() < Destinations_.size()) {
+ if (Settings_.IsIncompleteTicketsSetAnError) {
+ return TClientStatus::Error;
+ } else {
+ return TClientStatus::IncompleteTicketsSet;
+ }
+ }
+ }
+ if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && ArePublicKeysInvalid(now)) {
+ return TClientStatus::Error;
+ }
+
+ const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys();
+ const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets();
+ const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles();
+
+ if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) {
+ return TClientStatus::Warning;
+ }
+ if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) &&
+ sincePublicKeysUpdate > PublicKeysDurations_.Expiring)
+ {
+ return TClientStatus::Warning;
+ }
+ if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) {
+ return TClientStatus::Warning;
+ }
+
+ return TClientStatus::Ok;
+ }
+
+ TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger)
+ : TThreadedUpdaterBase(
+ TRetrySettings{}.WorkerAwakingPeriod,
+ std::move(logger),
+ settings.GetTvmHost(),
+ settings.GetTvmPort(),
+ settings.TvmSocketTimeout,
+ settings.TvmConnectTimeout)
+ , ExpBackoff_(RetrySettings_.BackoffSettings)
+ , Settings_(settings.CloneNormalized())
+ , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix()))
+ , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_))
+ , DstAliases_(MakeAliasMap(Settings_))
+ , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}})
+ , Random_(TInstant::Now().MicroSeconds())
+ {
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret());
+ }
+
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()};
+ }
+
+ PublicKeysDurations_.RefreshPeriod = TDuration::Days(1);
+ ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1);
+
+ if (Settings_.IsUserTicketCheckingRequired()) {
+ SetBbEnv(Settings_.GetEnvForUserTickets());
+ }
+
+ if (Settings_.IsRolesFetchingEnabled()) {
+ RolesFetcher_ = std::make_unique<TRolesFetcher>(
+ TRolesFetcherSettings{
+ Settings_.GetTiroleHost(),
+ Settings_.GetTirolePort(),
+ Settings_.GetDiskCacheDir(),
+ ProcInfo_,
+ Settings_.GetSelfTvmId(),
+ Settings_.GetIdmSystemSlug(),
+ },
+ Logger_);
+ }
+
+ if (Settings_.IsDiskCacheUsed()) {
+ TString path = Settings_.GetDiskCacheDir();
+ if (path.back() != '/') {
+ path.push_back('/');
+ }
+
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ ServiceTicketsFilepath_ = path;
+ ServiceTicketsFilepath_.append("service_tickets");
+ }
+
+ if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) {
+ PublicKeysFilepath_ = path;
+ PublicKeysFilepath_.append("public_keys");
+ }
+
+ RetrySettingsFilepath_ = path + "retry_settings";
+ } else {
+ LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability");
+ }
+ }
+
+ void TThreadedUpdater::Init() {
+ ReadStateFromDisk();
+ ClearErrors();
+ ExpBackoff_.SetEnabled(false);
+
+ // First of all try to get tickets: there are a lot of reasons to fail this request.
+ // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call.
+ UpdateServiceTickets();
+ if (!AreServicesTicketsOk()) {
+ ThrowLastError();
+ }
+
+ UpdatePublicKeys();
+ if (!IsServiceContextOk() || !IsUserContextOk()) {
+ ThrowLastError();
+ }
+
+ UpdateRoles();
+ if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) {
+ ThrowLastError();
+ }
+
+ Inited_ = true;
+ ExpBackoff_.SetEnabled(true);
+ }
+
+ void TThreadedUpdater::UpdateServiceTickets() {
+ if (!Settings_.IsServiceTicketFetchingRequired()) {
+ return;
+ }
+
+ TInstant stut = GetUpdateTimeOfServiceTickets();
+ try {
+ if (IsTimeToUpdateServiceTickets(stut)) {
+ UpdateAllServiceTickets();
+ NeedFetchMissingServiceTickets_ = false;
+ } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) {
+ UpdateMissingServiceTickets(Destinations_);
+ NeedFetchMissingServiceTickets_ = false;
+ }
+ if (AreServicesTicketsOk()) {
+ ClearError(EScope::ServiceTickets);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::ServiceTickets, e.what());
+ LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what());
+ if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) {
+ LogError("Service tickets have not been refreshed for too long period");
+ }
+ }
+ }
+
+ void TThreadedUpdater::UpdateAllServiceTickets() {
+ THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit);
+
+ auto oldCache = GetCachedServiceTickets();
+ if (oldCache) {
+ for (const auto& pair : oldCache->ErrorsById) {
+ st.TicketsWithErrors.Errors.insert(pair);
+ }
+ }
+
+ UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now());
+ if (ServiceTicketsFilepath_) {
+ DiskCacheServiceTickets_ = CreateJsonArray(st.Responses);
+ TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
+ w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId()));
+ }
+ }
+
+ TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) {
+ TServiceTicketsPtr cache = GetCachedServiceTickets();
+ TClientSettings::TDstVector dsts = FindMissingDsts(cache, required);
+
+ if (dsts.empty()) {
+ return cache;
+ }
+
+ THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit);
+
+ size_t gotTickets = st.TicketsWithErrors.Tickets.size();
+
+ for (const auto& pair : cache->TicketsById) {
+ st.TicketsWithErrors.Tickets.insert(pair);
+ }
+ for (const auto& pair : cache->ErrorsById) {
+ st.TicketsWithErrors.Errors.insert(pair);
+ }
+ for (const auto& pair : st.TicketsWithErrors.Tickets) {
+ st.TicketsWithErrors.Errors.erase(pair.first);
+ }
+
+ TServiceTicketsPtr c = UpdateServiceTicketsCachePartly(
+ std::move(st.TicketsWithErrors),
+ gotTickets);
+ if (!c) {
+ LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?");
+ c = cache;
+ }
+
+ if (!ServiceTicketsFilepath_) {
+ return c;
+ }
+
+ DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses);
+
+ TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
+ w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId()));
+
+ return c;
+ }
+
+ void TThreadedUpdater::UpdatePublicKeys() {
+ if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) {
+ return;
+ }
+
+ TInstant pkut = GetUpdateTimeOfPublicKeys();
+ if (!IsTimeToUpdatePublicKeys(pkut)) {
+ return;
+ }
+
+ try {
+ TString publicKeys = GetPublicKeysFromHttp();
+
+ UpdatePublicKeysCache(publicKeys, TInstant::Now());
+ if (PublicKeysFilepath_) {
+ TDiskWriter w(PublicKeysFilepath_, Logger_.Get());
+ w.Write(publicKeys);
+ }
+ if (IsServiceContextOk() && IsUserContextOk()) {
+ ClearError(EScope::PublicKeys);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::PublicKeys, e.what());
+ LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what());
+ if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) {
+ LogError("Public keys have not been refreshed for too long period");
+ }
+ }
+ }
+
+ void TThreadedUpdater::UpdateRoles() {
+ if (!RolesFetcher_) {
+ return;
+ }
+
+ TInstant rut = GetUpdateTimeOfRoles();
+ if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) {
+ return;
+ }
+
+ struct TCloser {
+ TRolesFetcher* Fetcher;
+ ~TCloser() {
+ Fetcher->ResetConnection();
+ }
+ } closer{RolesFetcher_.get()};
+
+ try {
+ TServiceTicketsPtr st = GetCachedServiceTickets();
+ Y_ENSURE(st, "No one service ticket in memory: how it possible?");
+ auto it = st->TicketsById.find(Settings_.GetTiroleTvmId());
+ Y_ENSURE(it != st->TicketsById.end(),
+ "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId());
+
+ RolesFetcher_->Update(
+ FetchWithRetries(
+ [&]() { return RolesFetcher_->FetchActualRoles(it->second); },
+ EScope::Roles));
+ SetUpdateTimeOfRoles(TInstant::Now());
+
+ if (RolesFetcher_->AreRolesOk()) {
+ ClearError(EScope::Roles);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::Roles, e.what());
+ LogWarning(TStringBuilder() << "Failed to update roles: " << e.what());
+ if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) {
+ LogError("Roles have not been refreshed for too long period");
+ }
+ }
+ }
+
+ TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly(
+ TAsyncUpdaterBase::TPairTicketsErrors&& tickets,
+ size_t got) {
+ size_t count = tickets.Tickets.size();
+ TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets),
+ std::move(tickets.Errors),
+ DstAliases_);
+ SetServiceTickets(c);
+
+ LogInfo(TStringBuilder()
+ << "Cache was partly updated with " << got
+ << " service ticket(s). total: " << count);
+
+ return c;
+ }
+
+ void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) {
+ size_t count = tickets.Tickets.size();
+ SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets),
+ std::move(tickets.Errors),
+ DstAliases_));
+
+ SetUpdateTimeOfServiceTickets(time);
+
+ if (count > 0) {
+ LogInfo(TStringBuilder() << "Cache was updated with " << count << " service ticket(s): " << time);
+ }
+ }
+
+ void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) {
+ if (publicKeys.empty()) {
+ return;
+ }
+
+ if (Settings_.IsServiceTicketCheckingRequired()) {
+ SetServiceContext(MakeIntrusiveConst<TServiceContext>(
+ TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(),
+ publicKeys)));
+ }
+
+ if (Settings_.IsUserTicketCheckingRequired()) {
+ SetUserContext(publicKeys);
+ }
+
+ SetUpdateTimeOfPublicKeys(time);
+
+ LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time);
+ }
+
+ void TThreadedUpdater::ReadStateFromDisk() {
+ try {
+ TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk();
+ UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate);
+ DiskCacheServiceTickets_ = st.FileBody;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what());
+ }
+
+ try {
+ std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk();
+ UpdatePublicKeysCache(pk.first, pk.second);
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what());
+ }
+
+ try {
+ TString rs = ReadRetrySettingsFromDisk();
+ UpdateRetrySettings(rs);
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what());
+ }
+
+ try {
+ if (RolesFetcher_) {
+ SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk());
+ }
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what());
+ }
+ }
+
+ TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const {
+ if (!ServiceTicketsFilepath_) {
+ return {};
+ }
+
+ TDiskReader r(ServiceTicketsFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+
+ std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data());
+ if (data.second != Settings_.GetSelfTvmId()) {
+ TStringStream s;
+ s << "Disk cache is for another tvmId (" << data.second << "). ";
+ s << "Self=" << Settings_.GetSelfTvmId();
+ LogWarning(s.Str());
+ return {};
+ }
+
+ TPairTicketsErrors res;
+ ParseTicketsFromResponse(data.first, Destinations_, res);
+ if (IsInvalid(TServiceTickets::GetInvalidationTime(res.Tickets), TInstant::Now())) {
+ LogWarning("Disk cache (service tickets) is too old");
+ return {};
+ }
+
+ LogInfo(TStringBuilder() << "Got " << res.Tickets.size() << " service ticket(s) from disk");
+ return {std::move(res), r.Time(), TString(data.first)};
+ }
+
+ std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const {
+ if (!PublicKeysFilepath_) {
+ return {};
+ }
+
+ TDiskReader r(PublicKeysFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+
+ if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) {
+ LogWarning("Disk cache (public keys) is too old");
+ return {};
+ }
+
+ return {r.Data(), r.Time()};
+ }
+
+ TString TThreadedUpdater::ReadRetrySettingsFromDisk() const {
+ if (!RetrySettingsFilepath_) {
+ return {};
+ }
+
+ TDiskReader r(RetrySettingsFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+
+ return r.Data();
+ }
+
+ template <class Dsts>
+ TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const {
+ Y_ENSURE(SigningContext_, "Internal error");
+
+ TClientSettings::TDstVector part;
+ part.reserve(dstLimit);
+ THttpResult res;
+ res.TicketsWithErrors.Tickets.reserve(dsts.size());
+ res.Responses.reserve(dsts.size() / dstLimit + 1);
+
+ for (auto it = dsts.begin(); it != dsts.end();) {
+ part.clear();
+ for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) {
+ part.push_back(*it);
+ }
+
+ TString response =
+ FetchWithRetries(
+ [this, &part]() {
+ // create request here to keep 'ts' actual
+ return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets(
+ Settings_.GetSelfTvmId(),
+ *SigningContext_,
+ part,
+ ProcInfo_));
+ },
+ EScope::ServiceTickets)
+ .Response;
+ ParseTicketsFromResponse(response, part, res.TicketsWithErrors);
+ LogDebug(TStringBuilder()
+ << "Response with service tickets for " << part.size()
+ << " destination(s) was successfully fetched from " << TvmUrl_);
+
+ res.Responses.push_back(response);
+ }
+
+ LogDebug(TStringBuilder()
+ << "Got responses with service tickets with " << res.Responses.size() << " pages for "
+ << dsts.size() << " destination(s)");
+ for (const auto& p : res.TicketsWithErrors.Errors) {
+ LogError(TStringBuilder()
+ << "Failed to get service ticket for dst=" << p.first << ": " << p.second);
+ }
+
+ return res;
+ }
+
+ TString TThreadedUpdater::GetPublicKeysFromHttp() const {
+ TString publicKeys =
+ FetchWithRetries(
+ [this]() { return FetchPublicKeysFromHttp(); },
+ EScope::PublicKeys)
+ .Response;
+
+ LogDebug("Public keys were successfully fetched from " + TvmUrl_);
+
+ return publicKeys;
+ }
+
+ NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const {
+ TStringStream s;
+
+ THttpHeaders outHeaders;
+ TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders);
+
+ const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings");
+
+ return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""};
+ }
+
+ NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const {
+ TStringStream s;
+
+ THttpHeaders outHeaders;
+ TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders);
+
+ const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings");
+
+ return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""};
+ }
+
+ bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const {
+ if (header.empty()) {
+ // Probably it is some kind of test?
+ return false;
+ }
+
+ try {
+ TString raw = NUtils::Base64url2bin(header);
+ Y_ENSURE(raw, "Invalid base64url in settings");
+
+ retry_settings::v1::Settings proto;
+ Y_ENSURE(proto.ParseFromString(raw), "Invalid proto");
+
+ // This ugly hack helps to process these settings in any case
+ TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this);
+ TRetrySettings& res = this_.RetrySettings_;
+
+ TStringStream diff;
+ auto update = [&diff](auto& l, const auto& r, TStringBuf desc) {
+ if (l != r) {
+ diff << desc << ":" << l << "->" << r << ";";
+ l = r;
+ }
+ };
+
+ if (proto.has_exponential_backoff_min_sec()) {
+ update(res.BackoffSettings.Min,
+ TDuration::Seconds(proto.exponential_backoff_min_sec()),
+ "exponential_backoff_min");
+ }
+ if (proto.has_exponential_backoff_max_sec()) {
+ update(res.BackoffSettings.Max,
+ TDuration::Seconds(proto.exponential_backoff_max_sec()),
+ "exponential_backoff_max");
+ }
+ if (proto.has_exponential_backoff_factor()) {
+ update(res.BackoffSettings.Factor,
+ proto.exponential_backoff_factor(),
+ "exponential_backoff_factor");
+ }
+ if (proto.has_exponential_backoff_jitter()) {
+ update(res.BackoffSettings.Jitter,
+ proto.exponential_backoff_jitter(),
+ "exponential_backoff_jitter");
+ }
+ this_.ExpBackoff_.UpdateSettings(res.BackoffSettings);
+
+ if (proto.has_max_random_sleep_default()) {
+ update(res.MaxRandomSleepDefault,
+ TDuration::MilliSeconds(proto.max_random_sleep_default()),
+ "max_random_sleep_default");
+ }
+ if (proto.has_max_random_sleep_when_ok()) {
+ update(res.MaxRandomSleepWhenOk,
+ TDuration::MilliSeconds(proto.max_random_sleep_when_ok()),
+ "max_random_sleep_when_ok");
+ }
+ if (proto.has_retries_on_start()) {
+ Y_ENSURE(proto.retries_on_start(), "retries_on_start==0");
+ update(res.RetriesOnStart,
+ proto.retries_on_start(),
+ "retries_on_start");
+ }
+ if (proto.has_retries_in_background()) {
+ Y_ENSURE(proto.retries_in_background(), "retries_in_background==0");
+ update(res.RetriesInBackground,
+ proto.retries_in_background(),
+ "retries_in_background");
+ }
+ if (proto.has_worker_awaking_period_sec()) {
+ update(res.WorkerAwakingPeriod,
+ TDuration::Seconds(proto.worker_awaking_period_sec()),
+ "worker_awaking_period");
+ this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod;
+ }
+ if (proto.has_dsts_limit()) {
+ Y_ENSURE(proto.dsts_limit(), "dsts_limit==0");
+ update(res.DstsLimit,
+ proto.dsts_limit(),
+ "dsts_limit");
+ }
+
+ if (proto.has_roles_update_period_sec()) {
+ Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0");
+ update(res.RolesUpdatePeriod,
+ TDuration::Seconds(proto.roles_update_period_sec()),
+ "roles_update_period_sec");
+ }
+ if (proto.has_roles_warn_period_sec()) {
+ Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0");
+ update(res.RolesWarnPeriod,
+ TDuration::Seconds(proto.roles_warn_period_sec()),
+ "roles_warn_period_sec");
+ }
+
+ if (diff.empty()) {
+ return false;
+ }
+
+ LogDebug("Retry settings were updated: " + diff.Str());
+ return true;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder()
+ << "Failed to update retry settings from server, header '"
+ << header << "': "
+ << e.what());
+ }
+
+ return false;
+ }
+
+ template <typename Func>
+ NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const {
+ const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground
+ : RetrySettings_.RetriesOnStart;
+
+ for (size_t idx = 1;; ++idx) {
+ RandomSleep();
+
+ try {
+ NUtils::TFetchResult result = func();
+
+ if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) {
+ TDiskWriter w(RetrySettingsFilepath_, Logger_.Get());
+ w.Write(result.RetrySettings);
+ }
+
+ if (400 <= result.Code && result.Code <= 499) {
+ throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
+ }
+ if (result.Code < 200 || result.Code >= 399) {
+ throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
+ }
+
+ ExpBackoff_.Decrease();
+ return result;
+ } catch (const TNonRetriableException& e) {
+ LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
+ ExpBackoff_.Increase();
+ throw;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
+ ExpBackoff_.Increase();
+ if (idx >= tries) {
+ throw;
+ }
+ }
+ }
+
+ throw yexception() << "unreachable";
+ }
+
+ void TThreadedUpdater::RandomSleep() const {
+ const TDuration maxSleep = TClientStatus::ECode::Ok == GetState()
+ ? RetrySettings_.MaxRandomSleepWhenOk
+ : RetrySettings_.MaxRandomSleepDefault;
+
+ if (maxSleep) {
+ ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds();
+ ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep));
+ }
+ }
+
+ TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src,
+ const TServiceContext& ctx,
+ const TClientSettings::TDstVector& dsts,
+ const NUtils::TProcInfo& procInfo,
+ time_t now) {
+ TStringStream s;
+
+ const TString ts = IntToString<10>(now);
+ TStringStream dst;
+ dst.Reserve(10 * dsts.size());
+ for (const TClientSettings::TDst& d : dsts) {
+ if (dst.Str()) {
+ dst << ',';
+ }
+ dst << d.Id;
+ }
+
+ s << "grant_type=client_credentials";
+ s << "&src=" << src;
+ s << "&dst=" << dst.Str();
+ s << "&ts=" << ts;
+ s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str());
+ s << "&get_retry_settings=yes";
+
+ s << "&";
+ procInfo.AddToRequest(s);
+
+ return s.Str();
+ }
+
+ template <class Dsts>
+ void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp,
+ const Dsts& dsts,
+ TPairTicketsErrors& out) const {
+ NJson::TJsonValue doc;
+ Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp);
+
+ const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr;
+ auto find = [&currentResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool {
+ const TString idStr = IntToString<10>(id);
+ if (currentResp && currentResp->GetValue(idStr, &obj)) {
+ return true;
+ }
+
+ for (const NJson::TJsonValue& val : doc.GetArray()) {
+ currentResp = &val;
+ if (currentResp->GetValue(idStr, &obj)) {
+ return true;
+ }
+ }
+
+ return false;
+ };
+
+ for (const TClientSettings::TDst& d : dsts) {
+ NJson::TJsonValue obj;
+ NJson::TJsonValue val;
+
+ if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) {
+ TString err;
+ if (obj.GetValue("error", &val)) {
+ err = val.GetString();
+ } else {
+ err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id);
+ }
+
+ TStringStream s;
+ s << "Failed to get ServiceTicket for " << d.Id << ": " << err;
+ ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str());
+
+ out.Errors.insert({d.Id, std::move(err)});
+ continue;
+ }
+
+ out.Tickets.insert({d.Id, val.GetString()});
+ }
+ }
+
+ static const char DELIMETER = '\t';
+ TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) {
+ TStringStream s;
+ s << tvmResponse << DELIMETER << selfId;
+ return s.Str();
+ }
+
+ std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) {
+ TStringBuf tvmId = data.RNextTok(DELIMETER);
+ return {data, IntFromString<TTvmId, 10>(tvmId)};
+ }
+
+ const TDstSet& TThreadedUpdater::GetDsts() const {
+ return Destinations_;
+ }
+
+ void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) {
+ Destinations_.insert(dst);
+ }
+
+ bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const {
+ return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod;
+ }
+
+ bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const {
+ return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod;
+ }
+
+ bool TThreadedUpdater::AreServicesTicketsOk() const {
+ if (!Settings_.IsServiceTicketFetchingRequired()) {
+ return true;
+ }
+ auto c = GetCachedServiceTickets();
+ return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == Destinations_.size());
+ }
+
+ bool TThreadedUpdater::IsServiceContextOk() const {
+ if (!Settings_.IsServiceTicketCheckingRequired()) {
+ return true;
+ }
+
+ return bool(GetCachedServiceContext());
+ }
+
+ bool TThreadedUpdater::IsUserContextOk() const {
+ if (!Settings_.IsUserTicketCheckingRequired()) {
+ return true;
+ }
+ return bool(GetCachedUserContext());
+ }
+
+ void TThreadedUpdater::Worker() {
+ UpdateServiceTickets();
+ UpdatePublicKeys();
+ UpdateRoles();
+ }
+
+ TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) {
+ TServiceTickets::TMapAliasId res;
+
+ if (settings.HasDstAliases()) {
+ for (const auto& pair : settings.GetDstAliases()) {
+ res.insert({pair.first, pair.second.Id});
+ }
+ }
+
+ return res;
+ }
+
+ TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) {
+ Y_ENSURE(available);
+ TDstSet set;
+ // available->TicketsById is not sorted
+ for (const auto& pair : available->TicketsById) {
+ set.insert(pair.first);
+ }
+ return FindMissingDsts(set, required);
+ }
+
+ TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) {
+ TClientSettings::TDstVector res;
+ std::set_difference(required.begin(), required.end(),
+ available.begin(), available.end(),
+ std::inserter(res, res.begin()));
+ return res;
+ }
+
+ TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) {
+ if (responses.empty()) {
+ return "[]";
+ }
+
+ size_t size = 0;
+ for (const TString& r : responses) {
+ size += r.size() + 1;
+ }
+
+ TString res;
+ res.reserve(size + 2);
+
+ res.push_back('[');
+ for (const TString& r : responses) {
+ res.append(r).push_back(',');
+ }
+ res.back() = ']';
+
+ return res;
+ }
+
+ TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) {
+ Y_ENSURE(json, "previous body required");
+
+ size_t size = 0;
+ for (const TString& r : responses) {
+ size += r.size() + 1;
+ }
+
+ TString res;
+ res.reserve(size + 2 + json.size());
+
+ res.push_back('[');
+ if (json.StartsWith('[')) {
+ Y_ENSURE(json.EndsWith(']'), "array is broken:" << json);
+ res.append(TStringBuf(json).Chop(1).Skip(1));
+ } else {
+ res.append(json);
+ }
+
+ res.push_back(',');
+ for (const TString& r : responses) {
+ res.append(r).push_back(',');
+ }
+ res.back() = ']';
+
+ return res;
+ }
+}
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.h b/library/cpp/tvmauth/client/misc/api/threaded_updater.h
new file mode 100644
index 0000000000..e546bbe030
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.h
@@ -0,0 +1,140 @@
+#pragma once
+
+#include "retry_settings.h"
+#include "roles_fetcher.h"
+#include "settings.h"
+
+#include <library/cpp/tvmauth/client/misc/async_updater.h>
+#include <library/cpp/tvmauth/client/misc/threaded_updater.h>
+
+#include <util/generic/set.h>
+#include <util/random/fast.h>
+
+namespace NTvmAuth::NTvmApi {
+ using TDstSet = TSet<TClientSettings::TDst>;
+
+ class TThreadedUpdater: public TThreadedUpdaterBase {
+ public:
+ /*!
+ * Starts thread for updating of in-memory cache in background
+ * Reads cache from disk if specified
+ * @param settings
+ * @param logger is usefull for monitoring and debuging
+ */
+ static TAsyncUpdaterPtr Create(const TClientSettings& settings, TLoggerPtr logger);
+ ~TThreadedUpdater();
+
+ TClientStatus GetStatus() const override;
+ NRoles::TRolesPtr GetRoles() const override;
+
+ protected: // for tests
+ TClientStatus::ECode GetState() const;
+
+ TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger);
+ void Init();
+
+ void UpdateServiceTickets();
+ void UpdateAllServiceTickets();
+ TServiceTicketsPtr UpdateMissingServiceTickets(const TDstSet& required);
+ void UpdatePublicKeys();
+ void UpdateRoles();
+
+ TServiceTicketsPtr UpdateServiceTicketsCachePartly(TPairTicketsErrors&& tickets, size_t got);
+ void UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time);
+ void UpdatePublicKeysCache(const TString& publicKeys, TInstant time);
+
+ void ReadStateFromDisk();
+
+ struct TServiceTicketsFromDisk {
+ TPairTicketsErrors TicketsWithErrors;
+ TInstant BornDate;
+ TString FileBody;
+ };
+
+ TServiceTicketsFromDisk ReadServiceTicketsFromDisk() const;
+ std::pair<TString, TInstant> ReadPublicKeysFromDisk() const;
+ TString ReadRetrySettingsFromDisk() const;
+
+ struct THttpResult {
+ TPairTicketsErrors TicketsWithErrors;
+ TSmallVec<TString> Responses;
+ };
+
+ template <class Dsts>
+ THttpResult GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const;
+ TString GetPublicKeysFromHttp() const;
+
+ virtual NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& body) const;
+ virtual NUtils::TFetchResult FetchPublicKeysFromHttp() const;
+
+ bool UpdateRetrySettings(const TString& header) const;
+
+ template <typename Func>
+ NUtils::TFetchResult FetchWithRetries(Func func, EScope scope) const;
+ void RandomSleep() const;
+
+ static TString PrepareRequestForServiceTickets(TTvmId src,
+ const TServiceContext& ctx,
+ const TClientSettings::TDstVector& dsts,
+ const NUtils::TProcInfo& procInfo,
+ time_t now = time(nullptr));
+ template <class Dsts>
+ void ParseTicketsFromResponse(TStringBuf resp,
+ const Dsts& dsts,
+ TPairTicketsErrors& out) const;
+
+ static TString PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId);
+ static std::pair<TStringBuf, TTvmId> ParseTicketsFromDisk(TStringBuf data);
+
+ const TDstSet& GetDsts() const;
+ void AddDstToSettings(const TClientSettings::TDst& dst);
+
+ bool IsTimeToUpdateServiceTickets(TInstant lastUpdate) const;
+ bool IsTimeToUpdatePublicKeys(TInstant lastUpdate) const;
+
+ bool AreServicesTicketsOk() const;
+ bool IsServiceContextOk() const;
+ bool IsUserContextOk() const;
+
+ void Worker() override;
+
+ static TServiceTickets::TMapAliasId MakeAliasMap(const TClientSettings& settings);
+ static TClientSettings::TDstVector FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required);
+ static TClientSettings::TDstVector FindMissingDsts(const TDstSet& available, const TDstSet& required);
+
+ static TString CreateJsonArray(const TSmallVec<TString>& responses);
+ static TString AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses);
+
+ private:
+ TRetrySettings RetrySettings_;
+
+ protected:
+ mutable TExponentialBackoff ExpBackoff_;
+
+ private:
+ const TClientSettings Settings_;
+
+ const NUtils::TProcInfo ProcInfo_;
+
+ const TString PublicKeysUrl_;
+
+ const TServiceTickets::TMapAliasId DstAliases_;
+
+ const TKeepAliveHttpClient::THeaders Headers_;
+ TMaybe<TServiceContext> SigningContext_;
+
+ TDstSet Destinations_;
+ TString DiskCacheServiceTickets_;
+ bool NeedFetchMissingServiceTickets_ = true;
+
+ TString PublicKeysFilepath_;
+ TString ServiceTicketsFilepath_;
+ TString RetrySettingsFilepath_;
+
+ std::unique_ptr<TRolesFetcher> RolesFetcher_;
+
+ mutable TReallyFastRng32 Random_;
+
+ bool Inited_ = false;
+ };
+}