diff options
author | komels <komels@yandex-team.ru> | 2022-04-14 13:10:53 +0300 |
---|---|---|
committer | komels <komels@yandex-team.ru> | 2022-04-14 13:10:53 +0300 |
commit | 21c9b0e6b039e9765eb414c406c2b86e8cea6850 (patch) | |
tree | f40ebc18ff8958dfbd189954ad024043ca983ea5 /library/cpp/tvmauth/client/misc/api | |
parent | 9a4effa852abe489707139c2b260dccc6f4f9aa9 (diff) | |
download | ydb-21c9b0e6b039e9765eb414c406c2b86e8cea6850.tar.gz |
Final part on compatibility layer: LOGBROKER-7215
ref:777c67aadbf705d19034a09a792b2df61ba53697
Diffstat (limited to 'library/cpp/tvmauth/client/misc/api')
10 files changed, 2566 insertions, 0 deletions
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp new file mode 100644 index 0000000000..6ec15c0e88 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp @@ -0,0 +1,126 @@ +#include "tvm_client.h" + +#include <util/string/builder.h> + +namespace NTvmAuth::NDynamicClient { + TAsyncUpdaterPtr TTvmClient::Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger) { + Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required"); + THolder<TTvmClient> p(new TTvmClient(settings, std::move(logger))); + p->Init(); + p->StartWorker(); + return p.Release(); + } + + NThreading::TFuture<TAddResponse> TTvmClient::Add(TDsts&& dsts) { + if (dsts.empty()) { + LogDebug("Adding dst: got empty task"); + return NThreading::MakeFuture<TAddResponse>(TAddResponse{}); + } + + const size_t size = dsts.size(); + const ui64 id = ++TaskIds_; + NThreading::TPromise<TAddResponse> promise = NThreading::NewPromise<TAddResponse>(); + + TaskQueue_.Enqueue(TTask{id, promise, std::move(dsts)}); + + LogDebug(TStringBuilder() << "Adding dst: got task #" << id << " with " << size << " dsts"); + return promise.GetFuture(); + } + + std::optional<TString> TTvmClient::GetOptionalServiceTicketFor(const TTvmId dst) { + TServiceTicketsPtr tickets = GetCachedServiceTickets(); + Y_ENSURE_EX(tickets, + TBrokenTvmClientSettings() + << "Need to enable fetching of service tickets in settings"); + + auto it = tickets->TicketsById.find(dst); + if (it != tickets->TicketsById.end()) { + return it->second; + } + + it = tickets->ErrorsById.find(dst); + if (it != tickets->ErrorsById.end()) { + ythrow TMissingServiceTicket() + << "Failed to get ticket for '" << dst << "': " + << it->second; + } + + return {}; + } + + TTvmClient::TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger) + : TBase(settings, logger) + { + } + + TTvmClient::~TTvmClient() { + TBase::StopWorker(); + } + + void TTvmClient::Worker() { + TBase::Worker(); + ProcessTasks(); + } + + void TTvmClient::ProcessTasks() { + TaskQueue_.DequeueAll(&Tasks_); + if (Tasks_.empty()) { + return; + } + + TDsts required; + for (const TTask& task : Tasks_) { + for (const auto& dst : task.Dsts) { + required.insert(dst); + } + } + + TServiceTicketsPtr cache = UpdateMissingServiceTickets(required); + + for (TTask& task : Tasks_) { + try { + SetResponseForTask(task, *cache); + } catch (const std::exception& e) { + LogError(TStringBuilder() + << "Adding dst: task #" << task.Id << ": exception: " << e.what()); + } catch (...) { + LogError(TStringBuilder() + << "Adding dst: task #" << task.Id << ": exception: " << CurrentExceptionMessage()); + } + } + + Tasks_.clear(); + } + + static const TString UNKNOWN = "Unknown reason"; + void TTvmClient::SetResponseForTask(TTvmClient::TTask& task, const TServiceTickets& cache) { + if (task.Promise.HasValue()) { + LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id << " already has value"); + return; + } + + TAddResponse response; + + for (const auto& dst : task.Dsts) { + if (cache.TicketsById.contains(dst.Id)) { + AddDstToSettings(dst); + response.emplace(dst, TDstResponse{EDstStatus::Success, TString()}); + + LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id + << ": dst=" << dst.Id << " got ticket"); + continue; + } + + auto it = cache.ErrorsById.find(dst.Id); + const TString& error = it == cache.ErrorsById.end() ? UNKNOWN : it->second; + response.emplace(dst, TDstResponse{EDstStatus::Fail, error}); + + LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id + << ": dst=" << dst.Id + << " failed to get ticket: " << error); + } + + LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id << ": set value"); + task.Promise.SetValue(std::move(response)); + } +} diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h new file mode 100644 index 0000000000..58ed953b63 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h @@ -0,0 +1,60 @@ +#pragma once + +#include <library/cpp/tvmauth/client/misc/api/threaded_updater.h> + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/map.h> +#include <util/thread/lfqueue.h> + +#include <optional> + +namespace NTvmAuth::NDynamicClient { + enum class EDstStatus { + Success, + Fail, + }; + + struct TDstResponse { + EDstStatus Status = EDstStatus::Fail; + TString Error; + + bool operator==(const TDstResponse& o) const { + return Status == o.Status && Error == o.Error; + } + }; + + using TDsts = NTvmApi::TDstSet; + using TAddResponse = TMap<NTvmApi::TClientSettings::TDst, TDstResponse>; + + class TTvmClient: public NTvmApi::TThreadedUpdater { + public: + static TAsyncUpdaterPtr Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger); + virtual ~TTvmClient(); + + NThreading::TFuture<TAddResponse> Add(TDsts&& dsts); + std::optional<TString> GetOptionalServiceTicketFor(const TTvmId dst); + + protected: // for tests + struct TTask { + ui64 Id = 0; + NThreading::TPromise<TAddResponse> Promise; + TDsts Dsts; + }; + + using TBase = NTvmApi::TThreadedUpdater; + + protected: // for tests + TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger); + + void Worker() override; + void ProcessTasks(); + + void SetResponseForTask(TTask& task, const TServiceTickets& cache); + + private: + std::atomic<ui64> TaskIds_ = {0}; + TLockFreeQueue<TTask> TaskQueue_; + TVector<TTask> Tasks_; + }; +} diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp new file mode 100644 index 0000000000..89403c15e4 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp @@ -0,0 +1,635 @@ +#include <library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h> + +#include <library/cpp/tvmauth/client/misc/disk_cache.h> + +#include <library/cpp/tvmauth/unittest.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/file.h> +#include <util/system/fs.h> + +#include <regex> + +using namespace NTvmAuth; +using namespace NTvmAuth::NDynamicClient; + +Y_UNIT_TEST_SUITE(DynamicClient) { + static const std::regex TIME_REGEX(R"(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d.\d{6}Z)"); + static const TString CACHE_DIR = "./tmp/"; + + static void WriteFile(TString name, TStringBuf body, TInstant time) { + NFs::Remove(CACHE_DIR + name); + TFileOutput f(CACHE_DIR + name); + f << TDiskWriter::PrepareData(time, body); + } + + static void CleanCache() { + NFs::RemoveRecursive(CACHE_DIR); + NFs::MakeDirectoryRecursive(CACHE_DIR); + } + + class TLogger: public NTvmAuth::ILogger { + public: + void Log(int lvl, const TString& msg) override { + Cout << TInstant::Now() << " lvl=" << lvl << " msg: " << msg << "\n"; + Stream << lvl << ": " << msg << Endl; + } + + TStringStream Stream; + }; + + class TOfflineUpdater: public NDynamicClient::TTvmClient { + public: + TOfflineUpdater(const NTvmApi::TClientSettings& settings, + TIntrusivePtr<TLogger> l, + bool fail = true, + std::vector<TString> tickets = {}) + : TTvmClient(settings, l) + , Fail(fail) + , Tickets(std::move(tickets)) + { + Init(); + ExpBackoff_.SetEnabled(false); + } + + NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& req) const override { + if (Fail) { + throw yexception() << "tickets: alarm"; + } + + TString response; + if (!Tickets.empty()) { + response = Tickets.front(); + Tickets.erase(Tickets.begin()); + } + + Cout << "*** FetchServiceTicketsFromHttp. request: " << req << ". response: " << response << Endl; + return {200, {}, "/2/ticket", response, ""}; + } + + NUtils::TFetchResult FetchPublicKeysFromHttp() const override { + if (Fail) { + throw yexception() << "keysalarm"; + } + Cout << "*** FetchPublicKeysFromHttp" << Endl; + return {200, {}, "/2/keys", PublicKeys, ""}; + } + + using TTvmClient::GetDsts; + using TTvmClient::ProcessTasks; + using TTvmClient::SetResponseForTask; + using TTvmClient::Worker; + + bool Fail = true; + TString PublicKeys = NUnittest::TVMKNIFE_PUBLIC_KEYS; + mutable std::vector<TString> Tickets; + }; + + Y_UNIT_TEST(StartWithIncompleteTicketsSet) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}, {"kolmo", 213}}, false); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + + { + TOfflineUpdater client(s, + l, + false, + { + R"({"213" : { "error" : "some error"}})", + R"({"123" : { "ticket" : "service_ticket_3"}})", + }); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213)); + + NThreading::TFuture<TAddResponse> fut = client.Add({123}); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + + UNIT_ASSERT(fut.HasValue()); + TAddResponse resp{ + {123, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + + UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error"); + } + } + + Y_UNIT_TEST(StartWithEmptyTicketsSet) { + CleanCache(); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"kolmo", 213}}, false); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + + { + TOfflineUpdater client(s, + l, + false, + { + R"({"213" : { "error" : "some error"}})", + R"({"123" : { "ticket" : "3:serv:CBAQ__________9_IgYIlJEGEHs:CcafYQH-FF5XaXMuJrgLZj98bIC54cs1ZkcFS9VV_9YM9iOM_0PXCtMkdg85rFjxE_BMpg7bE8ZuoqNfdw0FPt0BAKNeISwlydj4o0IjY82--LZBpP8CRn-EpAnkRaDShdlfrcF2pk1SSmEX8xdyZVQEnkUPY0cHGlFnu231vnE"}})", + }); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error"); + + NThreading::TFuture<TAddResponse> fut = client.Add({123}); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus()); + + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + + UNIT_ASSERT(fut.HasValue()); + TAddResponse resp{ + {123, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + + UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error"); + } + }; + Y_UNIT_TEST(StartWithIncompleteCacheAndAdd) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}, {"kolmo", 213}}); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + + UNIT_ASSERT_EXCEPTION_CONTAINS(TOfflineUpdater(s, l), + TRetriableException, + "Failed to start TvmClient. You can retry: ServiceTickets: tickets: alarm"); + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): XXXXXXXXXXX\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "4: Failed to get ServiceTickets: tickets: alarm\n" + << "4: Failed to get ServiceTickets: tickets: alarm\n" + << "4: Failed to get ServiceTickets: tickets: alarm\n" + << "4: Failed to update service tickets: tickets: alarm\n", + std::regex_replace(std::string(l->Stream.Str()), TIME_REGEX, "XXXXXXXXXXX")); + l->Stream.Str().clear(); + + { + TOfflineUpdater client(s, + l, + false, + { + R"({"213" : { "ticket" : "service_ticket_2"}})", + R"({"123" : { "ticket" : "service_ticket_3"}})", + }); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + + NThreading::TFuture<TAddResponse> fut = client.Add({123}); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + + UNIT_ASSERT(fut.HasValue()); + TAddResponse resp{ + {123, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + } + + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n" + << "6: Cache was partly updated with 1 service ticket(s). total: 2\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: got task #1 with 1 dsts\n" + << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n" + << "6: Cache was partly updated with 1 service ticket(s). total: 3\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: task #1: dst=123 got ticket\n" + << "7: Adding dst: task #1: set value\n", + l->Stream.Str()); + } + + Y_UNIT_TEST(StartWithCacheAndAdd) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}}); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + { + TOfflineUpdater client(s, l); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + + client.Fail = false; + client.Tickets = { + R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})", + }; + NThreading::TFuture<TAddResponse> fut = client.Add({123, 213}); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + + UNIT_ASSERT(fut.HasValue()); + TAddResponse resp{ + {123, {EDstStatus::Success, ""}}, + {213, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + } + + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "7: Adding dst: got task #1 with 2 dsts\n" + << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n" + << "6: Cache was partly updated with 2 service ticket(s). total: 3\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: task #1: dst=123 got ticket\n" + << "7: Adding dst: task #1: dst=213 got ticket\n" + << "7: Adding dst: task #1: set value\n", + l->Stream.Str()); + } + + Y_UNIT_TEST(StartWithCacheAndAddSeveral) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}}); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + { + TOfflineUpdater client(s, l); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + + client.Fail = false; + client.Tickets = { + R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})", + }; + NThreading::TFuture<TAddResponse> fut1 = client.Add({123}); + NThreading::TFuture<TAddResponse> fut2 = client.Add({213}); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + + UNIT_ASSERT(fut1.HasValue()); + TAddResponse resp1{ + {123, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue()); + + UNIT_ASSERT(fut2.HasValue()); + TAddResponse resp2{ + {213, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + } + + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "7: Adding dst: got task #1 with 1 dsts\n" + << "7: Adding dst: got task #2 with 1 dsts\n" + << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n" + << "6: Cache was partly updated with 2 service ticket(s). total: 3\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: task #1: dst=123 got ticket\n" + << "7: Adding dst: task #1: set value\n" + << "7: Adding dst: task #2: dst=213 got ticket\n" + << "7: Adding dst: task #2: set value\n", + l->Stream.Str()); + } + + Y_UNIT_TEST(StartWithCacheAndAddSeveralWithErrors) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}}); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + { + TOfflineUpdater client(s, l); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + + UNIT_ASSERT(client.GetOptionalServiceTicketFor(19)); + UNIT_ASSERT_VALUES_EQUAL("3:serv:CBAQ__________9_IgYIKhCUkQY:CX", + *client.GetOptionalServiceTicketFor(19)); + UNIT_ASSERT(!client.GetOptionalServiceTicketFor(456)); + + client.Fail = false; + client.Tickets = { + R"({ + "123" : { "ticket" : "service_ticket_3"}, + "213" : { "ticket" : "service_ticket_2"}, + "456" : { "error" : "error_3"} + })", + }; + NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213}); + NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456}); + + client.Worker(); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456)); + + UNIT_ASSERT(client.GetOptionalServiceTicketFor(19)); + UNIT_ASSERT_VALUES_EQUAL("3:serv:CBAQ__________9_IgYIKhCUkQY:CX", + *client.GetOptionalServiceTicketFor(19)); + UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(456), + TMissingServiceTicket, + "Failed to get ticket for '456': error_3"); + + UNIT_ASSERT(fut1.HasValue()); + TAddResponse resp1{ + {123, {EDstStatus::Success, ""}}, + {213, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue()); + + UNIT_ASSERT(fut2.HasValue()); + TAddResponse resp2{ + {213, {EDstStatus::Success, ""}}, + {456, {EDstStatus::Fail, "error_3"}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + } + + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "7: Adding dst: got task #1 with 2 dsts\n" + << "7: Adding dst: got task #2 with 2 dsts\n" + << "7: Response with service tickets for 3 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 3 destination(s)\n" + << "3: Failed to get service ticket for dst=456: error_3\n" + << "6: Cache was partly updated with 2 service ticket(s). total: 3\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: task #1: dst=123 got ticket\n" + << "7: Adding dst: task #1: dst=213 got ticket\n" + << "7: Adding dst: task #1: set value\n" + << "7: Adding dst: task #2: dst=213 got ticket\n" + << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n" + << "7: Adding dst: task #2: set value\n", + l->Stream.Str()); + } + + Y_UNIT_TEST(WithException) { + TInstant now = TInstant::Now(); + CleanCache(); + WriteFile("./service_tickets", + R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})" + "\t100500", + now); + + NTvmApi::TClientSettings s; + s.SetSelfTvmId(100500); + s.EnableServiceTicketsFetchOptions("qwerty", {{"blackbox", 19}}); + s.SetDiskCacheDir(CACHE_DIR); + + auto l = MakeIntrusive<TLogger>(); + { + TOfflineUpdater client(s, l); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + + client.Fail = false; + client.Tickets = { + R"({ + "123" : { "ticket" : "service_ticket_3"}, + "213" : { "ticket" : "service_ticket_2"}, + "456" : { "error" : "error_3"}, + "789" : { "ticket" : "service_ticket_4"} + })", + }; + NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213}); + NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456}); + NThreading::TFuture<TAddResponse> fut3 = client.Add({789}); + + fut2.Subscribe([](const auto&) { + throw yexception() << "planed exc"; + }); + fut3.Subscribe([](const auto&) { + throw 5; + }); + + UNIT_ASSERT_NO_EXCEPTION(client.Worker()); + UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus()); + + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213)); + UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213)); + UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123)); + UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456)); + + UNIT_ASSERT(fut1.HasValue()); + TAddResponse resp1{ + {123, {EDstStatus::Success, ""}}, + {213, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue()); + + UNIT_ASSERT(fut2.HasValue()); + TAddResponse resp2{ + {213, {EDstStatus::Success, ""}}, + {456, {EDstStatus::Fail, "error_3"}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue()); + + UNIT_ASSERT(fut3.HasValue()); + TAddResponse resp3{ + {789, {EDstStatus::Success, ""}}, + }; + UNIT_ASSERT_VALUES_EQUAL(resp3, fut3.GetValue()); + + UNIT_ASSERT(client.Tickets.empty()); + + TDsts dsts{19, 123, 213, 789}; + UNIT_ASSERT_VALUES_EQUAL(dsts, client.GetDsts()); + } + + UNIT_ASSERT_VALUES_EQUAL( + TStringBuilder() + << "6: File './tmp/service_tickets' was successfully read\n" + << "6: Got 1 service ticket(s) from disk\n" + << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n" + << "7: File './tmp/retry_settings' does not exist\n" + << "7: Adding dst: got task #1 with 2 dsts\n" + << "7: Adding dst: got task #2 with 2 dsts\n" + << "7: Adding dst: got task #3 with 1 dsts\n" + << "7: Response with service tickets for 4 destination(s) was successfully fetched from https://tvm-api.yandex.net\n" + << "7: Got responses with service tickets with 1 pages for 4 destination(s)\n" + << "3: Failed to get service ticket for dst=456: error_3\n" + << "6: Cache was partly updated with 3 service ticket(s). total: 4\n" + << "6: File './tmp/service_tickets' was successfully written\n" + << "7: Adding dst: task #1: dst=123 got ticket\n" + << "7: Adding dst: task #1: dst=213 got ticket\n" + << "7: Adding dst: task #1: set value\n" + << "7: Adding dst: task #2: dst=213 got ticket\n" + << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n" + << "7: Adding dst: task #2: set value\n" + << "3: Adding dst: task #2: exception: planed exc\n" + << "7: Adding dst: task #3: dst=789 got ticket\n" + << "7: Adding dst: task #3: set value\n" + << "3: Adding dst: task #3: exception: unknown error\n", + l->Stream.Str()); + } +} + +template <> +void Out<NTvmAuth::NDynamicClient::TDstResponse>(IOutputStream& out, const NTvmAuth::NDynamicClient::TDstResponse& m) { + out << m.Status << " (" << m.Error << ")"; +} + +template <> +void Out<NTvmAuth::NTvmApi::TClientSettings::TDst>(IOutputStream& out, const NTvmAuth::NTvmApi::TClientSettings::TDst& m) { + out << m.Id; +} diff --git a/library/cpp/tvmauth/client/misc/api/retry_settings.h b/library/cpp/tvmauth/client/misc/api/retry_settings.h new file mode 100644 index 0000000000..607b230811 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/retry_settings.h @@ -0,0 +1,33 @@ +#pragma once + +#include <library/cpp/tvmauth/client/misc/exponential_backoff.h> + +namespace NTvmAuth::NTvmApi { + struct TRetrySettings { + TExponentialBackoff::TSettings BackoffSettings = { + TDuration::Seconds(0), + TDuration::Minutes(1), + 2, + 0.5, + }; + TDuration MaxRandomSleepDefault = TDuration::Seconds(5); + TDuration MaxRandomSleepWhenOk = TDuration::Minutes(1); + ui32 RetriesOnStart = 3; + ui32 RetriesInBackground = 2; + TDuration WorkerAwakingPeriod = TDuration::Seconds(10); + ui32 DstsLimit = 300; + TDuration RolesUpdatePeriod = TDuration::Minutes(10); + TDuration RolesWarnPeriod = TDuration::Minutes(20); + + bool operator==(const TRetrySettings& o) const { + return BackoffSettings == o.BackoffSettings && + MaxRandomSleepDefault == o.MaxRandomSleepDefault && + MaxRandomSleepWhenOk == o.MaxRandomSleepWhenOk && + RetriesOnStart == o.RetriesOnStart && + WorkerAwakingPeriod == o.WorkerAwakingPeriod && + DstsLimit == o.DstsLimit && + RolesUpdatePeriod == o.RolesUpdatePeriod && + RolesWarnPeriod == o.RolesWarnPeriod; + } + }; +} diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp new file mode 100644 index 0000000000..8f4b359e8c --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp @@ -0,0 +1,164 @@ +#include "roles_fetcher.h" + +#include <library/cpp/tvmauth/client/misc/disk_cache.h> +#include <library/cpp/tvmauth/client/misc/roles/decoder.h> +#include <library/cpp/tvmauth/client/misc/roles/parser.h> + +#include <library/cpp/http/misc/httpcodes.h> +#include <library/cpp/string_utils/quote/quote.h> + +#include <util/string/builder.h> +#include <util/string/join.h> + +namespace NTvmAuth::NTvmApi { + static TString CreatePath(const TString& dir, const TString& file) { + return dir.EndsWith("/") + ? dir + file + : dir + "/" + file; + } + + TRolesFetcher::TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger) + : Settings_(settings) + , Logger_(logger) + , CacheFilePath_(CreatePath(Settings_.CacheDir, "roles")) + { + Client_ = std::make_unique<TKeepAliveHttpClient>( + Settings_.TiroleHost, + Settings_.TirolePort, + Settings_.Timeout, + Settings_.Timeout); + } + + TInstant TRolesFetcher::ReadFromDisk() { + TDiskReader dr(CacheFilePath_, Logger_.Get()); + if (!dr.Read()) { + return {}; + } + + std::pair<TString, TString> data = ParseDiskFormat(dr.Data()); + if (data.second != Settings_.IdmSystemSlug) { + Logger_->Warning( + TStringBuilder() << "Roles in disk cache are for another slug (" << data.second + << "). Self=" << Settings_.IdmSystemSlug); + return {}; + } + + CurrentRoles_.Set(NRoles::TParser::Parse(std::make_shared<TString>(std::move(data.first)))); + Logger_->Debug( + TStringBuilder() << "Succeed to read roles with revision " + << CurrentRoles_.Get()->GetMeta().Revision + << " from " << CacheFilePath_); + + return dr.Time(); + } + + bool TRolesFetcher::AreRolesOk() const { + return bool(GetCurrentRoles()); + } + + bool TRolesFetcher::IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate) { + return settings.RolesUpdatePeriod < sinceUpdate; + } + + bool TRolesFetcher::ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate) { + return settings.RolesWarnPeriod < sinceUpdate; + } + + NUtils::TFetchResult TRolesFetcher::FetchActualRoles(const TString& serviceTicket) { + TStringStream out; + THttpHeaders outHeaders; + + TRequest req = CreateTiroleRequest(serviceTicket); + TKeepAliveHttpClient::THttpCode code = Client_->DoGet( + req.Url, + &out, + req.Headers, + &outHeaders); + + const THttpInputHeader* reqId = outHeaders.FindHeader("X-Request-Id"); + + Logger_->Debug( + TStringBuilder() << "Succeed to perform request for roles to " << Settings_.TiroleHost + << " (request_id=" << (reqId ? reqId->Value() : "") + << "). code=" << code); + + return {code, std::move(outHeaders), "/v1/get_actual_roles", out.Str(), {}}; + } + + void TRolesFetcher::Update(NUtils::TFetchResult&& fetchResult, TInstant now) { + if (fetchResult.Code == HTTP_NOT_MODIFIED) { + Y_ENSURE(CurrentRoles_.Get(), + "tirole did not return any roles because current roles are actual," + " but there are no roles in memory - this should never happen"); + return; + } + + Y_ENSURE(fetchResult.Code == HTTP_OK, + "Unexpected code from tirole: " << fetchResult.Code << ". " << fetchResult.Response); + + const THttpInputHeader* codec = fetchResult.Headers.FindHeader("X-Tirole-Compression"); + const TStringBuf codecBuf = codec ? codec->Value() : ""; + + NRoles::TRawPtr blob; + try { + blob = std::make_shared<TString>(NRoles::TDecoder::Decode( + codecBuf, + std::move(fetchResult.Response))); + } catch (const std::exception& e) { + throw yexception() << "Failed to decode blob with codec '" << codecBuf + << "': " << e.what(); + } + + CurrentRoles_.Set(NRoles::TParser::Parse(blob)); + + Logger_->Debug( + TStringBuilder() << "Succeed to update roles with revision " + << CurrentRoles_.Get()->GetMeta().Revision); + + TDiskWriter dw(CacheFilePath_, Logger_.Get()); + dw.Write(PrepareDiskFormat(*blob, Settings_.IdmSystemSlug), now); + } + + NTvmAuth::NRoles::TRolesPtr TRolesFetcher::GetCurrentRoles() const { + return CurrentRoles_.Get(); + } + + void TRolesFetcher::ResetConnection() { + Client_->ResetConnection(); + } + + static const char DELIMETER = '\t'; + + std::pair<TString, TString> TRolesFetcher::ParseDiskFormat(TStringBuf filebody) { + TStringBuf slug = filebody.RNextTok(DELIMETER); + return {TString(filebody), CGIUnescapeRet(slug)}; + } + + TString TRolesFetcher::PrepareDiskFormat(TStringBuf roles, TStringBuf slug) { + TStringStream res; + res.Reserve(roles.size() + 1 + slug.size()); + res << roles << DELIMETER << CGIEscapeRet(slug); + return res.Str(); + } + + TRolesFetcher::TRequest TRolesFetcher::CreateTiroleRequest(const TString& serviceTicket) const { + TRolesFetcher::TRequest res; + + TStringStream url; + url.Reserve(512); + url << "/v1/get_actual_roles?"; + url << "system_slug=" << CGIEscapeRet(Settings_.IdmSystemSlug) << "&"; + Settings_.ProcInfo.AddToRequest(url); + res.Url = std::move(url.Str()); + + res.Headers.reserve(2); + res.Headers.emplace(XYaServiceTicket_, serviceTicket); + + NRoles::TRolesPtr roles = CurrentRoles_.Get(); + if (roles) { + res.Headers.emplace(IfNoneMatch_, Join("", "\"", roles->GetMeta().Revision, "\"")); + } + + return res; + } +} diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.h b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h new file mode 100644 index 0000000000..63691223b5 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h @@ -0,0 +1,63 @@ +#pragma once + +#include "retry_settings.h" + +#include <library/cpp/tvmauth/client/misc/fetch_result.h> +#include <library/cpp/tvmauth/client/misc/proc_info.h> +#include <library/cpp/tvmauth/client/misc/utils.h> +#include <library/cpp/tvmauth/client/misc/roles/roles.h> + +#include <library/cpp/tvmauth/client/logger.h> + +#include <library/cpp/http/simple/http_client.h> + +namespace NTvmAuth::NTvmApi { + struct TRolesFetcherSettings { + TString TiroleHost; + ui16 TirolePort = 0; + TString CacheDir; + NUtils::TProcInfo ProcInfo; + TTvmId SelfTvmId = 0; + TString IdmSystemSlug; + TDuration Timeout = TDuration::Seconds(30); + }; + + class TRolesFetcher { + public: + TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger); + + TInstant ReadFromDisk(); + + bool AreRolesOk() const; + static bool IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate); + static bool ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate); + + NUtils::TFetchResult FetchActualRoles(const TString& serviceTicket); + void Update(NUtils::TFetchResult&& fetchResult, TInstant now = TInstant::Now()); + + NTvmAuth::NRoles::TRolesPtr GetCurrentRoles() const; + + void ResetConnection(); + + public: + static std::pair<TString, TString> ParseDiskFormat(TStringBuf filebody); + static TString PrepareDiskFormat(TStringBuf roles, TStringBuf slug); + + struct TRequest { + TString Url; + TKeepAliveHttpClient::THeaders Headers; + }; + TRequest CreateTiroleRequest(const TString& serviceTicket) const; + + private: + const TRolesFetcherSettings Settings_; + const TLoggerPtr Logger_; + const TString CacheFilePath_; + const TString XYaServiceTicket_ = "X-Ya-Service-Ticket"; + const TString IfNoneMatch_ = "If-None-Match"; + + NUtils::TProtectedValue<NTvmAuth::NRoles::TRolesPtr> CurrentRoles_; + + std::unique_ptr<TKeepAliveHttpClient> Client_; + }; +} diff --git a/library/cpp/tvmauth/client/misc/api/settings.cpp b/library/cpp/tvmauth/client/misc/api/settings.cpp new file mode 100644 index 0000000000..71aad75998 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/settings.cpp @@ -0,0 +1,89 @@ +#include "settings.h" + +#include <util/datetime/base.h> +#include <util/stream/file.h> +#include <util/system/fs.h> + +#include <set> + +namespace NTvmAuth::NTvmApi { + void TClientSettings::CheckPermissions(const TString& dir) { + const TString name = dir + "/check.tmp"; + + try { + NFs::EnsureExists(dir); + + TFile file(name, CreateAlways | RdWr); + + NFs::Remove(name); + } catch (const std::exception& e) { + NFs::Remove(name); + ythrow TPermissionDenied() << "Permission denied to disk cache directory: " << e.what(); + } + } + + void TClientSettings::CheckValid() const { + if (DiskCacheDir) { + CheckPermissions(DiskCacheDir); + } + + if (TStringBuf(Secret)) { + Y_ENSURE_EX(NeedServiceTicketsFetching(), + TBrokenTvmClientSettings() << "Secret is present but destinations list is empty. It makes no sense"); + } + if (NeedServiceTicketsFetching()) { + Y_ENSURE_EX(SelfTvmId != 0, + TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if fetching of Service Tickets required"); + Y_ENSURE_EX((TStringBuf)Secret, + TBrokenTvmClientSettings() << "Secret is required for fetching of Service Tickets"); + } + + if (CheckServiceTickets) { + Y_ENSURE_EX(SelfTvmId != 0, + TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if checking of Service Tickets required"); + } + + if (FetchRolesForIdmSystemSlug) { + Y_ENSURE_EX(DiskCacheDir, + TBrokenTvmClientSettings() << "Disk cache must be enabled to use roles: " + "they can be heavy"); + } + + bool needSmth = NeedServiceTicketsFetching() || + IsServiceTicketCheckingRequired() || + IsUserTicketCheckingRequired(); + Y_ENSURE_EX(needSmth, TBrokenTvmClientSettings() << "Invalid settings: nothing to do"); + + // Useless now: keep it here to avoid forgetting check from TDst. TODO: PASSP-35377 + for (const auto& dst : FetchServiceTicketsForDsts) { + Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0"); + } + // TODO: check only FetchServiceTicketsForDsts_ + // Python binding checks settings before normalization + for (const auto& [alias, dst] : FetchServiceTicketsForDstsWithAliases) { + Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0"); + } + Y_ENSURE_EX(TiroleTvmId != 0, TBrokenTvmClientSettings() << "TiroleTvmId cannot be 0"); + } + + TClientSettings TClientSettings::CloneNormalized() const { + TClientSettings res = *this; + + std::set<TTvmId> allDsts; + for (const auto& tvmid : res.FetchServiceTicketsForDsts) { + allDsts.insert(tvmid.Id); + } + for (const auto& [alias, tvmid] : res.FetchServiceTicketsForDstsWithAliases) { + allDsts.insert(tvmid.Id); + } + if (FetchRolesForIdmSystemSlug) { + allDsts.insert(res.TiroleTvmId); + } + + res.FetchServiceTicketsForDsts = {allDsts.begin(), allDsts.end()}; + + res.CheckValid(); + + return res; + } +} diff --git a/library/cpp/tvmauth/client/misc/api/settings.h b/library/cpp/tvmauth/client/misc/api/settings.h new file mode 100644 index 0000000000..715ab3e02c --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/settings.h @@ -0,0 +1,302 @@ +#pragma once + +#include <library/cpp/tvmauth/client/misc/settings.h> + +#include <library/cpp/tvmauth/client/exception.h> + +#include <library/cpp/tvmauth/checked_user_ticket.h> +#include <library/cpp/tvmauth/type.h> + +#include <library/cpp/string_utils/secret_string/secret_string.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/maybe.h> + +namespace NTvmAuth::NTvmApi { + /** + * Settings for TVM client. Uses https://tvm-api.yandex.net to get state. + * At least one of them is required: + * FetchServiceTicketsForDsts_/FetchServiceTicketsForDstsWithAliases_ + * CheckServiceTickets_ + * CheckUserTicketsWithBbEnv_ + */ + class TClientSettings: public NTvmAuth::TClientSettings { + public: + class TDst; + + /** + * Alias is an internal name for destinations within your code. + * You can associate a name with an tvm_id once in your code and use the name as an alias for + * tvm_id to each calling point. Useful for several environments: prod/test/etc. + * @example: + * // init + * static const TString MY_BACKEND = "my backend"; + * TDstMap map = {{MY_BACKEND, TDst(config.get("my_back_tvm_id"))}}; + * ... + * // per request + * TString t = tvmClient.GetServiceTicket(MY_BACKEND); + */ + using TDstMap = THashMap<TAlias, TDst>; + using TDstVector = TVector<TDst>; + + public: + /*! + * NOTE: Please use this option: it provides the best reliability + * NOTE: Client requires read/write permissions + * WARNING: The same directory can be used only: + * - for TVM clients with the same settings + * OR + * - for new client replacing previous - with another config. + * System user must be the same for processes with these clients inside. + * Implementation doesn't provide other scenarios. + */ + TString DiskCacheDir; + + // Required for Service Ticket fetching or checking + TTvmId SelfTvmId = 0; + + // Options for Service Tickets fetching + NSecretString::TSecretString Secret; + /*! + * Client will process both attrs: + * FetchServiceTicketsForDsts_, FetchServiceTicketsForDstsWithAliases_ + * WARNING: It is not way to provide authorization for incoming ServiceTickets! + * It is way only to send your ServiceTickets to your backend! + */ + TDstVector FetchServiceTicketsForDsts; + TDstMap FetchServiceTicketsForDstsWithAliases; + bool IsIncompleteTicketsSetAnError = true; + + // Options for Service Tickets checking + bool CheckServiceTickets = false; + + // Options for User Tickets checking + TMaybe<EBlackboxEnv> CheckUserTicketsWithBbEnv; + + // Options for roles fetching + TString FetchRolesForIdmSystemSlug; + /*! + * By default client checks src from ServiceTicket or default uid from UserTicket - + * to prevent you from forgetting to check it yourself. + * It does binary checks only: + * ticket gets status NoRoles, if there is no role for src or default uid. + * You need to check roles on your own if you have a non-binary role system or + * you have disabled ShouldCheckSrc/ShouldCheckDefaultUid + * + * You may need to disable this check in the following cases: + * - You use GetRoles() to provide verbose message (with revision). + * Double check may be inconsistent: + * binary check inside client uses revision of roles X - i.e. src 100500 has no role, + * exact check in your code uses revision of roles Y - i.e. src 100500 has some roles. + */ + bool ShouldCheckSrc = true; + bool ShouldCheckDefaultUid = true; + + // Options for tests + TString TvmHost = "https://tvm-api.yandex.net"; + ui16 TvmPort = 443; + TString TiroleHost = "https://tirole-api.yandex.net"; + TDuration TvmSocketTimeout = TDuration::Seconds(5); + TDuration TvmConnectTimeout = TDuration::Seconds(30); + ui16 TirolePort = 443; + TTvmId TiroleTvmId = TIROLE_TVMID; + + // for debug purposes + TString LibVersionPrefix; + + void CheckValid() const; + TClientSettings CloneNormalized() const; + + static inline const TTvmId TIROLE_TVMID = 2028120; + static inline const TTvmId TIROLE_TVMID_TEST = 2026536; + + // DEPRECATED API + // TODO: get rid of it: PASSP-35377 + public: + // Deprecated: set attributes directly + void SetSelfTvmId(TTvmId selfTvmId) { + SelfTvmId = selfTvmId; + } + + // Deprecated: set attributes directly + void EnableServiceTicketChecking() { + CheckServiceTickets = true; + } + + // Deprecated: set attributes directly + void EnableUserTicketChecking(EBlackboxEnv env) { + CheckUserTicketsWithBbEnv = env; + } + + // Deprecated: set attributes directly + void SetTvmHostPort(const TString& host, ui16 port) { + TvmHost = host; + TvmPort = port; + } + + // Deprecated: set attributes directly + void SetTiroleHostPort(const TString& host, ui16 port) { + TiroleHost = host; + TirolePort = port; + } + + // Deprecated: set attributes directly + void EnableRolesFetching(const TString& systemSlug, TTvmId tiroleTvmId = TIROLE_TVMID) { + TiroleTvmId = tiroleTvmId; + FetchRolesForIdmSystemSlug = systemSlug; + } + + // Deprecated: set attributes directly + void DoNotCheckSrcByDefault() { + ShouldCheckSrc = false; + } + + // Deprecated: set attributes directly + void DoNotCheckDefaultUidByDefault() { + ShouldCheckDefaultUid = false; + } + + // Deprecated: set attributes directly + void SetDiskCacheDir(const TString& dir) { + DiskCacheDir = dir; + } + + // Deprecated: set attributes directly + void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret, + TDstMap&& dsts, + const bool considerIncompleteTicketsSetAsError = true) { + IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError; + Secret = selfSecret; + + FetchServiceTicketsForDsts = TDstVector{}; + FetchServiceTicketsForDsts.reserve(dsts.size()); + for (const auto& pair : dsts) { + FetchServiceTicketsForDsts.push_back(pair.second); + } + + FetchServiceTicketsForDstsWithAliases = std::move(dsts); + } + + // Deprecated: set attributes directly + void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret, + TDstVector&& dsts, + const bool considerIncompleteTicketsSetAsError = true) { + IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError; + Secret = selfSecret; + FetchServiceTicketsForDsts = std::move(dsts); + } + + public: + bool IsServiceTicketFetchingRequired() const { + return bool(Secret.Value()); + } + + const TStringBuf GetSelfSecret() const { + return Secret; + } + + bool HasDstAliases() const { + return !FetchServiceTicketsForDstsWithAliases.empty(); + } + + const TDstMap& GetDstAliases() const { + return FetchServiceTicketsForDstsWithAliases; + } + + const TDstVector& GetDestinations() const { + return FetchServiceTicketsForDsts; + } + + bool IsUserTicketCheckingRequired() const { + return bool(CheckUserTicketsWithBbEnv); + } + + EBlackboxEnv GetEnvForUserTickets() const { + return *CheckUserTicketsWithBbEnv; + } + + bool IsServiceTicketCheckingRequired() const { + return CheckServiceTickets; + } + + bool IsDiskCacheUsed() const { + return bool(DiskCacheDir); + } + + TString GetDiskCacheDir() const { + return DiskCacheDir; + } + + TTvmId GetSelfTvmId() const { + return SelfTvmId; + } + + const TString& GetLibVersionPrefix() const { + return LibVersionPrefix; + } + + const TString& GetTvmHost() const { + return TvmHost; + } + + ui16 GetTvmPort() const { + return TvmPort; + } + + bool IsRolesFetchingEnabled() const { + return bool(FetchRolesForIdmSystemSlug); + } + + TTvmId GetTiroleTvmId() const { + return TiroleTvmId; + } + + const TString& GetIdmSystemSlug() const { + return FetchRolesForIdmSystemSlug; + } + + const TString& GetTiroleHost() const { + return TiroleHost; + } + + ui16 GetTirolePort() const { + return TirolePort; + } + + bool NeedServiceTicketsFetching() const { + return !FetchServiceTicketsForDsts.empty() || + !FetchServiceTicketsForDstsWithAliases.empty() || + FetchRolesForIdmSystemSlug; + } + + // TODO: get rid of TDst: PASSP-35377 + class TDst { + public: + TDst(TTvmId id) + : Id(id) + { + Y_ENSURE_EX(id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0"); + } + + TTvmId Id; + + bool operator==(const TDst& o) const { + return Id == o.Id; + } + + bool operator<(const TDst& o) const { + return Id < o.Id; + } + + public: // for python binding + TDst() + : Id(0) + { + } + }; + + public: + static void CheckPermissions(const TString& dir); + }; +} diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp new file mode 100644 index 0000000000..a7df49c05d --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp @@ -0,0 +1,954 @@ +#include "threaded_updater.h" + +#include <library/cpp/tvmauth/client/misc/disk_cache.h> +#include <library/cpp/tvmauth/client/misc/utils.h> +#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h> + +#include <library/cpp/tvmauth/client/logger.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/stream/str.h> +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/system/thread.h> + +namespace NTvmAuth::NTvmApi { + static TString CreatePublicKeysUrl(const TClientSettings& settings, + const NUtils::TProcInfo& procInfo) { + TStringStream s; + s << "/2/keys"; + s << "?"; + procInfo.AddToRequest(s); + + s << "&get_retry_settings=yes"; + + if (settings.GetSelfTvmId() != 0) { + s << "&src=" << settings.GetSelfTvmId(); + } + + if (settings.IsUserTicketCheckingRequired()) { + s << "&env=" << static_cast<int>(settings.GetEnvForUserTickets()); + } + + return s.Str(); + } + + TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) { + Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required"); + THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger))); + p->Init(); + p->StartWorker(); + return p.Release(); + } + + TThreadedUpdater::~TThreadedUpdater() { + ExpBackoff_.SetEnabled(false); + ExpBackoff_.Interrupt(); + StopWorker(); // Required here to avoid using of deleted members + } + + TClientStatus TThreadedUpdater::GetStatus() const { + const TClientStatus::ECode state = GetState(); + return TClientStatus(state, GetLastError(state == TClientStatus::Ok || state == TClientStatus::IncompleteTicketsSet)); + } + + NRoles::TRolesPtr TThreadedUpdater::GetRoles() const { + Y_ENSURE_EX(RolesFetcher_, + TBrokenTvmClientSettings() << "Roles were not configured in settings"); + return RolesFetcher_->GetCurrentRoles(); + } + + TClientStatus::ECode TThreadedUpdater::GetState() const { + const TInstant now = TInstant::Now(); + + if (Settings_.IsServiceTicketFetchingRequired()) { + if (AreServiceTicketsInvalid(now)) { + return TClientStatus::Error; + } + auto tickets = GetCachedServiceTickets(); + if (!tickets) { + return TClientStatus::Error; + } + if (tickets->TicketsById.size() < Destinations_.size()) { + if (Settings_.IsIncompleteTicketsSetAnError) { + return TClientStatus::Error; + } else { + return TClientStatus::IncompleteTicketsSet; + } + } + } + if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && ArePublicKeysInvalid(now)) { + return TClientStatus::Error; + } + + const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys(); + const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets(); + const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles(); + + if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) { + return TClientStatus::Warning; + } + if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && + sincePublicKeysUpdate > PublicKeysDurations_.Expiring) + { + return TClientStatus::Warning; + } + if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) { + return TClientStatus::Warning; + } + + return TClientStatus::Ok; + } + + TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger) + : TThreadedUpdaterBase( + TRetrySettings{}.WorkerAwakingPeriod, + std::move(logger), + settings.GetTvmHost(), + settings.GetTvmPort(), + settings.TvmSocketTimeout, + settings.TvmConnectTimeout) + , ExpBackoff_(RetrySettings_.BackoffSettings) + , Settings_(settings.CloneNormalized()) + , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix())) + , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_)) + , DstAliases_(MakeAliasMap(Settings_)) + , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}}) + , Random_(TInstant::Now().MicroSeconds()) + { + if (Settings_.IsServiceTicketFetchingRequired()) { + SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret()); + } + + if (Settings_.IsServiceTicketFetchingRequired()) { + Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()}; + } + + PublicKeysDurations_.RefreshPeriod = TDuration::Days(1); + ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1); + + if (Settings_.IsUserTicketCheckingRequired()) { + SetBbEnv(Settings_.GetEnvForUserTickets()); + } + + if (Settings_.IsRolesFetchingEnabled()) { + RolesFetcher_ = std::make_unique<TRolesFetcher>( + TRolesFetcherSettings{ + Settings_.GetTiroleHost(), + Settings_.GetTirolePort(), + Settings_.GetDiskCacheDir(), + ProcInfo_, + Settings_.GetSelfTvmId(), + Settings_.GetIdmSystemSlug(), + }, + Logger_); + } + + if (Settings_.IsDiskCacheUsed()) { + TString path = Settings_.GetDiskCacheDir(); + if (path.back() != '/') { + path.push_back('/'); + } + + if (Settings_.IsServiceTicketFetchingRequired()) { + ServiceTicketsFilepath_ = path; + ServiceTicketsFilepath_.append("service_tickets"); + } + + if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) { + PublicKeysFilepath_ = path; + PublicKeysFilepath_.append("public_keys"); + } + + RetrySettingsFilepath_ = path + "retry_settings"; + } else { + LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability"); + } + } + + void TThreadedUpdater::Init() { + ReadStateFromDisk(); + ClearErrors(); + ExpBackoff_.SetEnabled(false); + + // First of all try to get tickets: there are a lot of reasons to fail this request. + // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call. + UpdateServiceTickets(); + if (!AreServicesTicketsOk()) { + ThrowLastError(); + } + + UpdatePublicKeys(); + if (!IsServiceContextOk() || !IsUserContextOk()) { + ThrowLastError(); + } + + UpdateRoles(); + if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) { + ThrowLastError(); + } + + Inited_ = true; + ExpBackoff_.SetEnabled(true); + } + + void TThreadedUpdater::UpdateServiceTickets() { + if (!Settings_.IsServiceTicketFetchingRequired()) { + return; + } + + TInstant stut = GetUpdateTimeOfServiceTickets(); + try { + if (IsTimeToUpdateServiceTickets(stut)) { + UpdateAllServiceTickets(); + NeedFetchMissingServiceTickets_ = false; + } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) { + UpdateMissingServiceTickets(Destinations_); + NeedFetchMissingServiceTickets_ = false; + } + if (AreServicesTicketsOk()) { + ClearError(EScope::ServiceTickets); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::ServiceTickets, e.what()); + LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what()); + if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) { + LogError("Service tickets have not been refreshed for too long period"); + } + } + } + + void TThreadedUpdater::UpdateAllServiceTickets() { + THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit); + + auto oldCache = GetCachedServiceTickets(); + if (oldCache) { + for (const auto& pair : oldCache->ErrorsById) { + st.TicketsWithErrors.Errors.insert(pair); + } + } + + UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now()); + if (ServiceTicketsFilepath_) { + DiskCacheServiceTickets_ = CreateJsonArray(st.Responses); + TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); + w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); + } + } + + TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) { + TServiceTicketsPtr cache = GetCachedServiceTickets(); + TClientSettings::TDstVector dsts = FindMissingDsts(cache, required); + + if (dsts.empty()) { + return cache; + } + + THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit); + + size_t gotTickets = st.TicketsWithErrors.Tickets.size(); + + for (const auto& pair : cache->TicketsById) { + st.TicketsWithErrors.Tickets.insert(pair); + } + for (const auto& pair : cache->ErrorsById) { + st.TicketsWithErrors.Errors.insert(pair); + } + for (const auto& pair : st.TicketsWithErrors.Tickets) { + st.TicketsWithErrors.Errors.erase(pair.first); + } + + TServiceTicketsPtr c = UpdateServiceTicketsCachePartly( + std::move(st.TicketsWithErrors), + gotTickets); + if (!c) { + LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?"); + c = cache; + } + + if (!ServiceTicketsFilepath_) { + return c; + } + + DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses); + + TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); + w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); + + return c; + } + + void TThreadedUpdater::UpdatePublicKeys() { + if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) { + return; + } + + TInstant pkut = GetUpdateTimeOfPublicKeys(); + if (!IsTimeToUpdatePublicKeys(pkut)) { + return; + } + + try { + TString publicKeys = GetPublicKeysFromHttp(); + + UpdatePublicKeysCache(publicKeys, TInstant::Now()); + if (PublicKeysFilepath_) { + TDiskWriter w(PublicKeysFilepath_, Logger_.Get()); + w.Write(publicKeys); + } + if (IsServiceContextOk() && IsUserContextOk()) { + ClearError(EScope::PublicKeys); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::PublicKeys, e.what()); + LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what()); + if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) { + LogError("Public keys have not been refreshed for too long period"); + } + } + } + + void TThreadedUpdater::UpdateRoles() { + if (!RolesFetcher_) { + return; + } + + TInstant rut = GetUpdateTimeOfRoles(); + if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) { + return; + } + + struct TCloser { + TRolesFetcher* Fetcher; + ~TCloser() { + Fetcher->ResetConnection(); + } + } closer{RolesFetcher_.get()}; + + try { + TServiceTicketsPtr st = GetCachedServiceTickets(); + Y_ENSURE(st, "No one service ticket in memory: how it possible?"); + auto it = st->TicketsById.find(Settings_.GetTiroleTvmId()); + Y_ENSURE(it != st->TicketsById.end(), + "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId()); + + RolesFetcher_->Update( + FetchWithRetries( + [&]() { return RolesFetcher_->FetchActualRoles(it->second); }, + EScope::Roles)); + SetUpdateTimeOfRoles(TInstant::Now()); + + if (RolesFetcher_->AreRolesOk()) { + ClearError(EScope::Roles); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::Roles, e.what()); + LogWarning(TStringBuilder() << "Failed to update roles: " << e.what()); + if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) { + LogError("Roles have not been refreshed for too long period"); + } + } + } + + TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly( + TAsyncUpdaterBase::TPairTicketsErrors&& tickets, + size_t got) { + size_t count = tickets.Tickets.size(); + TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), + std::move(tickets.Errors), + DstAliases_); + SetServiceTickets(c); + + LogInfo(TStringBuilder() + << "Cache was partly updated with " << got + << " service ticket(s). total: " << count); + + return c; + } + + void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) { + size_t count = tickets.Tickets.size(); + SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), + std::move(tickets.Errors), + DstAliases_)); + + SetUpdateTimeOfServiceTickets(time); + + if (count > 0) { + LogInfo(TStringBuilder() << "Cache was updated with " << count << " service ticket(s): " << time); + } + } + + void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) { + if (publicKeys.empty()) { + return; + } + + if (Settings_.IsServiceTicketCheckingRequired()) { + SetServiceContext(MakeIntrusiveConst<TServiceContext>( + TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(), + publicKeys))); + } + + if (Settings_.IsUserTicketCheckingRequired()) { + SetUserContext(publicKeys); + } + + SetUpdateTimeOfPublicKeys(time); + + LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time); + } + + void TThreadedUpdater::ReadStateFromDisk() { + try { + TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk(); + UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate); + DiskCacheServiceTickets_ = st.FileBody; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what()); + } + + try { + std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk(); + UpdatePublicKeysCache(pk.first, pk.second); + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what()); + } + + try { + TString rs = ReadRetrySettingsFromDisk(); + UpdateRetrySettings(rs); + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what()); + } + + try { + if (RolesFetcher_) { + SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk()); + } + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what()); + } + } + + TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const { + if (!ServiceTicketsFilepath_) { + return {}; + } + + TDiskReader r(ServiceTicketsFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data()); + if (data.second != Settings_.GetSelfTvmId()) { + TStringStream s; + s << "Disk cache is for another tvmId (" << data.second << "). "; + s << "Self=" << Settings_.GetSelfTvmId(); + LogWarning(s.Str()); + return {}; + } + + TPairTicketsErrors res; + ParseTicketsFromResponse(data.first, Destinations_, res); + if (IsInvalid(TServiceTickets::GetInvalidationTime(res.Tickets), TInstant::Now())) { + LogWarning("Disk cache (service tickets) is too old"); + return {}; + } + + LogInfo(TStringBuilder() << "Got " << res.Tickets.size() << " service ticket(s) from disk"); + return {std::move(res), r.Time(), TString(data.first)}; + } + + std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const { + if (!PublicKeysFilepath_) { + return {}; + } + + TDiskReader r(PublicKeysFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) { + LogWarning("Disk cache (public keys) is too old"); + return {}; + } + + return {r.Data(), r.Time()}; + } + + TString TThreadedUpdater::ReadRetrySettingsFromDisk() const { + if (!RetrySettingsFilepath_) { + return {}; + } + + TDiskReader r(RetrySettingsFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + return r.Data(); + } + + template <class Dsts> + TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const { + Y_ENSURE(SigningContext_, "Internal error"); + + TClientSettings::TDstVector part; + part.reserve(dstLimit); + THttpResult res; + res.TicketsWithErrors.Tickets.reserve(dsts.size()); + res.Responses.reserve(dsts.size() / dstLimit + 1); + + for (auto it = dsts.begin(); it != dsts.end();) { + part.clear(); + for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) { + part.push_back(*it); + } + + TString response = + FetchWithRetries( + [this, &part]() { + // create request here to keep 'ts' actual + return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets( + Settings_.GetSelfTvmId(), + *SigningContext_, + part, + ProcInfo_)); + }, + EScope::ServiceTickets) + .Response; + ParseTicketsFromResponse(response, part, res.TicketsWithErrors); + LogDebug(TStringBuilder() + << "Response with service tickets for " << part.size() + << " destination(s) was successfully fetched from " << TvmUrl_); + + res.Responses.push_back(response); + } + + LogDebug(TStringBuilder() + << "Got responses with service tickets with " << res.Responses.size() << " pages for " + << dsts.size() << " destination(s)"); + for (const auto& p : res.TicketsWithErrors.Errors) { + LogError(TStringBuilder() + << "Failed to get service ticket for dst=" << p.first << ": " << p.second); + } + + return res; + } + + TString TThreadedUpdater::GetPublicKeysFromHttp() const { + TString publicKeys = + FetchWithRetries( + [this]() { return FetchPublicKeysFromHttp(); }, + EScope::PublicKeys) + .Response; + + LogDebug("Public keys were successfully fetched from " + TvmUrl_); + + return publicKeys; + } + + NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const { + TStringStream s; + + THttpHeaders outHeaders; + TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders); + + const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + + return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""}; + } + + NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const { + TStringStream s; + + THttpHeaders outHeaders; + TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders); + + const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + + return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""}; + } + + bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const { + if (header.empty()) { + // Probably it is some kind of test? + return false; + } + + try { + TString raw = NUtils::Base64url2bin(header); + Y_ENSURE(raw, "Invalid base64url in settings"); + + retry_settings::v1::Settings proto; + Y_ENSURE(proto.ParseFromString(raw), "Invalid proto"); + + // This ugly hack helps to process these settings in any case + TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this); + TRetrySettings& res = this_.RetrySettings_; + + TStringStream diff; + auto update = [&diff](auto& l, const auto& r, TStringBuf desc) { + if (l != r) { + diff << desc << ":" << l << "->" << r << ";"; + l = r; + } + }; + + if (proto.has_exponential_backoff_min_sec()) { + update(res.BackoffSettings.Min, + TDuration::Seconds(proto.exponential_backoff_min_sec()), + "exponential_backoff_min"); + } + if (proto.has_exponential_backoff_max_sec()) { + update(res.BackoffSettings.Max, + TDuration::Seconds(proto.exponential_backoff_max_sec()), + "exponential_backoff_max"); + } + if (proto.has_exponential_backoff_factor()) { + update(res.BackoffSettings.Factor, + proto.exponential_backoff_factor(), + "exponential_backoff_factor"); + } + if (proto.has_exponential_backoff_jitter()) { + update(res.BackoffSettings.Jitter, + proto.exponential_backoff_jitter(), + "exponential_backoff_jitter"); + } + this_.ExpBackoff_.UpdateSettings(res.BackoffSettings); + + if (proto.has_max_random_sleep_default()) { + update(res.MaxRandomSleepDefault, + TDuration::MilliSeconds(proto.max_random_sleep_default()), + "max_random_sleep_default"); + } + if (proto.has_max_random_sleep_when_ok()) { + update(res.MaxRandomSleepWhenOk, + TDuration::MilliSeconds(proto.max_random_sleep_when_ok()), + "max_random_sleep_when_ok"); + } + if (proto.has_retries_on_start()) { + Y_ENSURE(proto.retries_on_start(), "retries_on_start==0"); + update(res.RetriesOnStart, + proto.retries_on_start(), + "retries_on_start"); + } + if (proto.has_retries_in_background()) { + Y_ENSURE(proto.retries_in_background(), "retries_in_background==0"); + update(res.RetriesInBackground, + proto.retries_in_background(), + "retries_in_background"); + } + if (proto.has_worker_awaking_period_sec()) { + update(res.WorkerAwakingPeriod, + TDuration::Seconds(proto.worker_awaking_period_sec()), + "worker_awaking_period"); + this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod; + } + if (proto.has_dsts_limit()) { + Y_ENSURE(proto.dsts_limit(), "dsts_limit==0"); + update(res.DstsLimit, + proto.dsts_limit(), + "dsts_limit"); + } + + if (proto.has_roles_update_period_sec()) { + Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0"); + update(res.RolesUpdatePeriod, + TDuration::Seconds(proto.roles_update_period_sec()), + "roles_update_period_sec"); + } + if (proto.has_roles_warn_period_sec()) { + Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0"); + update(res.RolesWarnPeriod, + TDuration::Seconds(proto.roles_warn_period_sec()), + "roles_warn_period_sec"); + } + + if (diff.empty()) { + return false; + } + + LogDebug("Retry settings were updated: " + diff.Str()); + return true; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() + << "Failed to update retry settings from server, header '" + << header << "': " + << e.what()); + } + + return false; + } + + template <typename Func> + NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const { + const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground + : RetrySettings_.RetriesOnStart; + + for (size_t idx = 1;; ++idx) { + RandomSleep(); + + try { + NUtils::TFetchResult result = func(); + + if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) { + TDiskWriter w(RetrySettingsFilepath_, Logger_.Get()); + w.Write(result.RetrySettings); + } + + if (400 <= result.Code && result.Code <= 499) { + throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response); + } + if (result.Code < 200 || result.Code >= 399) { + throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response); + } + + ExpBackoff_.Decrease(); + return result; + } catch (const TNonRetriableException& e) { + LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); + ExpBackoff_.Increase(); + throw; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); + ExpBackoff_.Increase(); + if (idx >= tries) { + throw; + } + } + } + + throw yexception() << "unreachable"; + } + + void TThreadedUpdater::RandomSleep() const { + const TDuration maxSleep = TClientStatus::ECode::Ok == GetState() + ? RetrySettings_.MaxRandomSleepWhenOk + : RetrySettings_.MaxRandomSleepDefault; + + if (maxSleep) { + ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds(); + ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep)); + } + } + + TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src, + const TServiceContext& ctx, + const TClientSettings::TDstVector& dsts, + const NUtils::TProcInfo& procInfo, + time_t now) { + TStringStream s; + + const TString ts = IntToString<10>(now); + TStringStream dst; + dst.Reserve(10 * dsts.size()); + for (const TClientSettings::TDst& d : dsts) { + if (dst.Str()) { + dst << ','; + } + dst << d.Id; + } + + s << "grant_type=client_credentials"; + s << "&src=" << src; + s << "&dst=" << dst.Str(); + s << "&ts=" << ts; + s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str()); + s << "&get_retry_settings=yes"; + + s << "&"; + procInfo.AddToRequest(s); + + return s.Str(); + } + + template <class Dsts> + void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp, + const Dsts& dsts, + TPairTicketsErrors& out) const { + NJson::TJsonValue doc; + Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp); + + const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr; + auto find = [¤tResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool { + const TString idStr = IntToString<10>(id); + if (currentResp && currentResp->GetValue(idStr, &obj)) { + return true; + } + + for (const NJson::TJsonValue& val : doc.GetArray()) { + currentResp = &val; + if (currentResp->GetValue(idStr, &obj)) { + return true; + } + } + + return false; + }; + + for (const TClientSettings::TDst& d : dsts) { + NJson::TJsonValue obj; + NJson::TJsonValue val; + + if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) { + TString err; + if (obj.GetValue("error", &val)) { + err = val.GetString(); + } else { + err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id); + } + + TStringStream s; + s << "Failed to get ServiceTicket for " << d.Id << ": " << err; + ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str()); + + out.Errors.insert({d.Id, std::move(err)}); + continue; + } + + out.Tickets.insert({d.Id, val.GetString()}); + } + } + + static const char DELIMETER = '\t'; + TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) { + TStringStream s; + s << tvmResponse << DELIMETER << selfId; + return s.Str(); + } + + std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) { + TStringBuf tvmId = data.RNextTok(DELIMETER); + return {data, IntFromString<TTvmId, 10>(tvmId)}; + } + + const TDstSet& TThreadedUpdater::GetDsts() const { + return Destinations_; + } + + void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) { + Destinations_.insert(dst); + } + + bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const { + return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod; + } + + bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const { + return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod; + } + + bool TThreadedUpdater::AreServicesTicketsOk() const { + if (!Settings_.IsServiceTicketFetchingRequired()) { + return true; + } + auto c = GetCachedServiceTickets(); + return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == Destinations_.size()); + } + + bool TThreadedUpdater::IsServiceContextOk() const { + if (!Settings_.IsServiceTicketCheckingRequired()) { + return true; + } + + return bool(GetCachedServiceContext()); + } + + bool TThreadedUpdater::IsUserContextOk() const { + if (!Settings_.IsUserTicketCheckingRequired()) { + return true; + } + return bool(GetCachedUserContext()); + } + + void TThreadedUpdater::Worker() { + UpdateServiceTickets(); + UpdatePublicKeys(); + UpdateRoles(); + } + + TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) { + TServiceTickets::TMapAliasId res; + + if (settings.HasDstAliases()) { + for (const auto& pair : settings.GetDstAliases()) { + res.insert({pair.first, pair.second.Id}); + } + } + + return res; + } + + TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) { + Y_ENSURE(available); + TDstSet set; + // available->TicketsById is not sorted + for (const auto& pair : available->TicketsById) { + set.insert(pair.first); + } + return FindMissingDsts(set, required); + } + + TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) { + TClientSettings::TDstVector res; + std::set_difference(required.begin(), required.end(), + available.begin(), available.end(), + std::inserter(res, res.begin())); + return res; + } + + TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) { + if (responses.empty()) { + return "[]"; + } + + size_t size = 0; + for (const TString& r : responses) { + size += r.size() + 1; + } + + TString res; + res.reserve(size + 2); + + res.push_back('['); + for (const TString& r : responses) { + res.append(r).push_back(','); + } + res.back() = ']'; + + return res; + } + + TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) { + Y_ENSURE(json, "previous body required"); + + size_t size = 0; + for (const TString& r : responses) { + size += r.size() + 1; + } + + TString res; + res.reserve(size + 2 + json.size()); + + res.push_back('['); + if (json.StartsWith('[')) { + Y_ENSURE(json.EndsWith(']'), "array is broken:" << json); + res.append(TStringBuf(json).Chop(1).Skip(1)); + } else { + res.append(json); + } + + res.push_back(','); + for (const TString& r : responses) { + res.append(r).push_back(','); + } + res.back() = ']'; + + return res; + } +} diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.h b/library/cpp/tvmauth/client/misc/api/threaded_updater.h new file mode 100644 index 0000000000..e546bbe030 --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.h @@ -0,0 +1,140 @@ +#pragma once + +#include "retry_settings.h" +#include "roles_fetcher.h" +#include "settings.h" + +#include <library/cpp/tvmauth/client/misc/async_updater.h> +#include <library/cpp/tvmauth/client/misc/threaded_updater.h> + +#include <util/generic/set.h> +#include <util/random/fast.h> + +namespace NTvmAuth::NTvmApi { + using TDstSet = TSet<TClientSettings::TDst>; + + class TThreadedUpdater: public TThreadedUpdaterBase { + public: + /*! + * Starts thread for updating of in-memory cache in background + * Reads cache from disk if specified + * @param settings + * @param logger is usefull for monitoring and debuging + */ + static TAsyncUpdaterPtr Create(const TClientSettings& settings, TLoggerPtr logger); + ~TThreadedUpdater(); + + TClientStatus GetStatus() const override; + NRoles::TRolesPtr GetRoles() const override; + + protected: // for tests + TClientStatus::ECode GetState() const; + + TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger); + void Init(); + + void UpdateServiceTickets(); + void UpdateAllServiceTickets(); + TServiceTicketsPtr UpdateMissingServiceTickets(const TDstSet& required); + void UpdatePublicKeys(); + void UpdateRoles(); + + TServiceTicketsPtr UpdateServiceTicketsCachePartly(TPairTicketsErrors&& tickets, size_t got); + void UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time); + void UpdatePublicKeysCache(const TString& publicKeys, TInstant time); + + void ReadStateFromDisk(); + + struct TServiceTicketsFromDisk { + TPairTicketsErrors TicketsWithErrors; + TInstant BornDate; + TString FileBody; + }; + + TServiceTicketsFromDisk ReadServiceTicketsFromDisk() const; + std::pair<TString, TInstant> ReadPublicKeysFromDisk() const; + TString ReadRetrySettingsFromDisk() const; + + struct THttpResult { + TPairTicketsErrors TicketsWithErrors; + TSmallVec<TString> Responses; + }; + + template <class Dsts> + THttpResult GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const; + TString GetPublicKeysFromHttp() const; + + virtual NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& body) const; + virtual NUtils::TFetchResult FetchPublicKeysFromHttp() const; + + bool UpdateRetrySettings(const TString& header) const; + + template <typename Func> + NUtils::TFetchResult FetchWithRetries(Func func, EScope scope) const; + void RandomSleep() const; + + static TString PrepareRequestForServiceTickets(TTvmId src, + const TServiceContext& ctx, + const TClientSettings::TDstVector& dsts, + const NUtils::TProcInfo& procInfo, + time_t now = time(nullptr)); + template <class Dsts> + void ParseTicketsFromResponse(TStringBuf resp, + const Dsts& dsts, + TPairTicketsErrors& out) const; + + static TString PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId); + static std::pair<TStringBuf, TTvmId> ParseTicketsFromDisk(TStringBuf data); + + const TDstSet& GetDsts() const; + void AddDstToSettings(const TClientSettings::TDst& dst); + + bool IsTimeToUpdateServiceTickets(TInstant lastUpdate) const; + bool IsTimeToUpdatePublicKeys(TInstant lastUpdate) const; + + bool AreServicesTicketsOk() const; + bool IsServiceContextOk() const; + bool IsUserContextOk() const; + + void Worker() override; + + static TServiceTickets::TMapAliasId MakeAliasMap(const TClientSettings& settings); + static TClientSettings::TDstVector FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required); + static TClientSettings::TDstVector FindMissingDsts(const TDstSet& available, const TDstSet& required); + + static TString CreateJsonArray(const TSmallVec<TString>& responses); + static TString AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses); + + private: + TRetrySettings RetrySettings_; + + protected: + mutable TExponentialBackoff ExpBackoff_; + + private: + const TClientSettings Settings_; + + const NUtils::TProcInfo ProcInfo_; + + const TString PublicKeysUrl_; + + const TServiceTickets::TMapAliasId DstAliases_; + + const TKeepAliveHttpClient::THeaders Headers_; + TMaybe<TServiceContext> SigningContext_; + + TDstSet Destinations_; + TString DiskCacheServiceTickets_; + bool NeedFetchMissingServiceTickets_ = true; + + TString PublicKeysFilepath_; + TString ServiceTicketsFilepath_; + TString RetrySettingsFilepath_; + + std::unique_ptr<TRolesFetcher> RolesFetcher_; + + mutable TReallyFastRng32 Random_; + + bool Inited_ = false; + }; +} |