path: root/library/cpp/tvmauth/client/misc
diff options
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/tvmauth/client/misc
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
validate canons without yatest_common
Diffstat (limited to 'library/cpp/tvmauth/client/misc')
48 files changed, 6121 insertions, 0 deletions
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp
new file mode 100644
index 0000000000..cd6ec45406
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.cpp
@@ -0,0 +1,166 @@
+#include "tvm_client.h"
+#include <util/string/builder.h>
+namespace NTvmAuth::NDynamicClient {
+ TIntrusivePtr<TTvmClient> TTvmClient::Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger) {
+ Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");
+ THolder<TTvmClient> p(new TTvmClient(settings, std::move(logger)));
+ p->Init();
+ p->StartWorker();
+ return p.Release();
+ }
+ NThreading::TFuture<TAddResponse> TTvmClient::Add(TDsts&& dsts) {
+ if (dsts.empty()) {
+ LogDebug("Adding dst: got empty task");
+ return NThreading::MakeFuture<TAddResponse>(TAddResponse{});
+ }
+ NThreading::TPromise<TAddResponse> promise = NThreading::NewPromise<TAddResponse>();
+ TServiceTickets::TMapIdStr requestedTicketsFromStartUpCache = GetRequestedTicketsFromStartUpCache(dsts);
+ if (requestedTicketsFromStartUpCache.size() == dsts.size() &&
+ !IsInvalid(TServiceTickets::GetInvalidationTime(requestedTicketsFromStartUpCache), TInstant::Now())) {
+ std::unique_lock lock(*ServiceTicketBatchUpdateMutex_);
+ TPairTicketsErrors newCache;
+ TServiceTicketsPtr cache = GetCachedServiceTickets();
+ NTvmApi::TDstSetPtr oldDsts = GetDsts();
+ std::shared_ptr<TDsts> newDsts = std::make_shared<TDsts>(oldDsts->begin(), oldDsts->end());
+ for (const auto& ticket : cache->TicketsById) {
+ newCache.Tickets.insert(ticket);
+ }
+ for (const auto& error : cache->ErrorsById) {
+ newCache.Errors.insert(error);
+ }
+ for (const auto& ticket : requestedTicketsFromStartUpCache) {
+ newCache.Tickets.insert(ticket);
+ newDsts->insert(ticket.first);
+ }
+ UpdateServiceTicketsCache(std::move(newCache), GetStartUpCacheBornDate());
+ SetDsts(std::move(newDsts));
+ lock.unlock();
+ TAddResponse response;
+ for (const auto& dst : dsts) {
+ response.emplace(dst, TDstResponse{EDstStatus::Success, TString()});
+ LogDebug(TStringBuilder() << "Got ticket from disk cache"
+ << ": dst=" << dst.Id << " got ticket");
+ }
+ promise.SetValue(std::move(response));
+ return promise.GetFuture();
+ }
+ const size_t size = dsts.size();
+ const ui64 id = ++TaskIds_;
+ TaskQueue_.Enqueue(TTask{id, promise, std::move(dsts)});
+ LogDebug(TStringBuilder() << "Adding dst: got task #" << id << " with " << size << " dsts");
+ return promise.GetFuture();
+ }
+ std::optional<TString> TTvmClient::GetOptionalServiceTicketFor(const TTvmId dst) {
+ TServiceTicketsPtr tickets = GetCachedServiceTickets();
+ Y_ENSURE_EX(tickets,
+ TBrokenTvmClientSettings()
+ << "Need to enable fetching of service tickets in settings");
+ auto it = tickets->TicketsById.find(dst);
+ if (it != tickets->TicketsById.end()) {
+ return it->second;
+ }
+ it = tickets->ErrorsById.find(dst);
+ if (it != tickets->ErrorsById.end()) {
+ ythrow TMissingServiceTicket()
+ << "Failed to get ticket for '" << dst << "': "
+ << it->second;
+ }
+ return {};
+ }
+ TTvmClient::TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger)
+ : TBase(settings, logger)
+ {
+ }
+ TTvmClient::~TTvmClient() {
+ TBase::StopWorker();
+ }
+ void TTvmClient::Worker() {
+ TBase::Worker();
+ ProcessTasks();
+ }
+ void TTvmClient::ProcessTasks() {
+ TaskQueue_.DequeueAll(&Tasks_);
+ if (Tasks_.empty()) {
+ return;
+ }
+ TDsts required;
+ for (const TTask& task : Tasks_) {
+ for (const auto& dst : task.Dsts) {
+ required.insert(dst);
+ }
+ }
+ TServiceTicketsPtr cache = UpdateMissingServiceTickets(required);
+ for (TTask& task : Tasks_) {
+ try {
+ SetResponseForTask(task, *cache);
+ } catch (const std::exception& e) {
+ LogError(TStringBuilder()
+ << "Adding dst: task #" << task.Id << ": exception: " << e.what());
+ } catch (...) {
+ LogError(TStringBuilder()
+ << "Adding dst: task #" << task.Id << ": exception: " << CurrentExceptionMessage());
+ }
+ }
+ Tasks_.clear();
+ }
+ static const TString UNKNOWN = "Unknown reason";
+ void TTvmClient::SetResponseForTask(TTvmClient::TTask& task, const TServiceTickets& cache) {
+ if (task.Promise.HasValue()) {
+ LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id << " already has value");
+ return;
+ }
+ TAddResponse response;
+ for (const auto& dst : task.Dsts) {
+ if (cache.TicketsById.contains(dst.Id)) {
+ response.emplace(dst, TDstResponse{EDstStatus::Success, TString()});
+ LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id
+ << ": dst=" << dst.Id << " got ticket");
+ continue;
+ }
+ auto it = cache.ErrorsById.find(dst.Id);
+ const TString& error = it == cache.ErrorsById.end() ? UNKNOWN : it->second;
+ response.emplace(dst, TDstResponse{EDstStatus::Fail, error});
+ LogWarning(TStringBuilder() << "Adding dst: task #" << task.Id
+ << ": dst=" << dst.Id
+ << " failed to get ticket: " << error);
+ }
+ LogDebug(TStringBuilder() << "Adding dst: task #" << task.Id << ": set value");
+ task.Promise.SetValue(std::move(response));
+ }
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h
new file mode 100644
index 0000000000..67eeb2618a
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h
@@ -0,0 +1,60 @@
+#pragma once
+#include <library/cpp/tvmauth/client/misc/api/threaded_updater.h>
+#include <library/cpp/threading/future/future.h>
+#include <util/generic/map.h>
+#include <util/thread/lfqueue.h>
+#include <optional>
+namespace NTvmAuth::NDynamicClient {
+ enum class EDstStatus {
+ Success,
+ Fail,
+ };
+ struct TDstResponse {
+ EDstStatus Status = EDstStatus::Fail;
+ TString Error;
+ bool operator==(const TDstResponse& o) const {
+ return Status == o.Status && Error == o.Error;
+ }
+ };
+ using TDsts = NTvmApi::TDstSet;
+ using TAddResponse = TMap<NTvmApi::TClientSettings::TDst, TDstResponse>;
+ class TTvmClient: public NTvmApi::TThreadedUpdater {
+ public:
+ static TIntrusivePtr<TTvmClient> Create(const NTvmApi::TClientSettings& settings, TLoggerPtr logger);
+ virtual ~TTvmClient();
+ NThreading::TFuture<TAddResponse> Add(TDsts&& dsts);
+ std::optional<TString> GetOptionalServiceTicketFor(const TTvmId dst);
+ protected: // for tests
+ struct TTask {
+ ui64 Id = 0;
+ NThreading::TPromise<TAddResponse> Promise;
+ TDsts Dsts;
+ };
+ using TBase = NTvmApi::TThreadedUpdater;
+ protected: // for tests
+ TTvmClient(const NTvmApi::TClientSettings& settings, TLoggerPtr logger);
+ void Worker() override;
+ void ProcessTasks();
+ void SetResponseForTask(TTask& task, const TServiceTickets& cache);
+ private:
+ std::atomic<ui64> TaskIds_ = {0};
+ TLockFreeQueue<TTask> TaskQueue_;
+ TVector<TTask> Tasks_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp
new file mode 100644
index 0000000000..c633b2457a
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/dynamic_dst/ut/tvm_client_ut.cpp
@@ -0,0 +1,760 @@
+#include <library/cpp/tvmauth/client/misc/api/dynamic_dst/tvm_client.h>
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+#include <library/cpp/tvmauth/unittest.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/stream/file.h>
+#include <util/system/fs.h>
+#include <regex>
+using namespace NTvmAuth;
+using namespace NTvmAuth::NDynamicClient;
+Y_UNIT_TEST_SUITE(DynamicClient) {
+ static const std::regex TIME_REGEX(R"(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d.\d{6}Z)");
+ static const TString CACHE_DIR = "./tmp/";
+ static void WriteFile(TString name, TStringBuf body, TInstant time) {
+ NFs::Remove(CACHE_DIR + name);
+ TFileOutput f(CACHE_DIR + name);
+ f << TDiskWriter::PrepareData(time, body);
+ }
+ static void CleanCache() {
+ NFs::RemoveRecursive(CACHE_DIR);
+ NFs::MakeDirectoryRecursive(CACHE_DIR);
+ }
+ class TLogger: public NTvmAuth::ILogger {
+ public:
+ void Log(int lvl, const TString& msg) override {
+ Cout << TInstant::Now() << " lvl=" << lvl << " msg: " << msg << "\n";
+ Stream << lvl << ": " << msg << Endl;
+ }
+ TStringStream Stream;
+ };
+ class TOfflineUpdater: public NDynamicClient::TTvmClient {
+ public:
+ TOfflineUpdater(const NTvmApi::TClientSettings& settings,
+ TIntrusivePtr<TLogger> l,
+ bool fail = true,
+ std::vector<TString> tickets = {})
+ : TTvmClient(settings, l)
+ , Fail(fail)
+ , Tickets(std::move(tickets))
+ {
+ Init();
+ ExpBackoff_.SetEnabled(false);
+ }
+ NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& req) const override {
+ if (Fail) {
+ throw yexception() << "tickets: alarm";
+ }
+ TString response;
+ if (!Tickets.empty()) {
+ response = Tickets.front();
+ Tickets.erase(Tickets.begin());
+ }
+ Cout << "*** FetchServiceTicketsFromHttp. request: " << req << ". response: " << response << Endl;
+ return {200, {}, "/2/ticket", response, ""};
+ }
+ NUtils::TFetchResult FetchPublicKeysFromHttp() const override {
+ if (Fail) {
+ throw yexception() << "keysalarm";
+ }
+ Cout << "*** FetchPublicKeysFromHttp" << Endl;
+ return {200, {}, "/2/keys", PublicKeys, ""};
+ }
+ using TTvmClient::GetDsts;
+ using TTvmClient::ProcessTasks;
+ using TTvmClient::SetResponseForTask;
+ using TTvmClient::Worker;
+ bool Fail = true;
+ TString PublicKeys = NUnittest::TVMKNIFE_PUBLIC_KEYS;
+ mutable std::vector<TString> Tickets;
+ };
+ Y_UNIT_TEST(StartWithIncompleteTicketsSet) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}, {"kolmo", 213}},
+ .IsIncompleteTicketsSetAnError = false,
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "error" : "some error"}})",
+ R"({"123" : { "ticket" : "service_ticket_3"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+ }
+ }
+ Y_UNIT_TEST(StartWithEmptyTicketsSet) {
+ CleanCache();
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"kolmo", 213}},
+ .IsIncompleteTicketsSetAnError = false,
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "error" : "some error"}})",
+ R"({"123" : { "ticket" : "3:serv:CBAQ__________9_IgYIlJEGEHs:CcafYQH-FF5XaXMuJrgLZj98bIC54cs1ZkcFS9VV_9YM9iOM_0PXCtMkdg85rFjxE_BMpg7bE8ZuoqNfdw0FPt0BAKNeISwlydj4o0IjY82--LZBpP8CRn-EpAnkRaDShdlfrcF2pk1SSmEX8xdyZVQEnkUPY0cHGlFnu231vnE"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::IncompleteTicketsSet, client.GetStatus());
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(213), TMissingServiceTicket, "some error");
+ }
+ };
+ Y_UNIT_TEST(StartWithIncompleteCacheAndAdd) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}, {"kolmo", 213}},
+ };
+ auto l = MakeIntrusive<TLogger>();
+ TRetriableException,
+ "Failed to start TvmClient. You can retry: ServiceTickets: tickets: alarm");
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): XXXXXXXXXXX\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to get ServiceTickets: tickets: alarm\n"
+ << "4: Failed to update service tickets: tickets: alarm\n",
+ std::regex_replace(std::string(l->Stream.Str()), TIME_REGEX, "XXXXXXXXXXX"));
+ l->Stream.Str().clear();
+ {
+ TOfflineUpdater client(s,
+ l,
+ false,
+ {
+ R"({"213" : { "ticket" : "service_ticket_2"}})",
+ R"({"123" : { "ticket" : "service_ticket_3"}})",
+ });
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ NThreading::TFuture<TAddResponse> fut = client.Add({123});
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n"
+ << "6: Cache was partly updated with 1 service ticket(s). total: 2\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: got task #1 with 1 dsts\n"
+ << "7: Response with service tickets for 1 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 1 destination(s)\n"
+ << "6: Cache was partly updated with 1 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: set value\n",
+ l->Stream.Str());
+ }
+ Y_UNIT_TEST(StartWithCacheAndAdd) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}},
+ .IsIncompleteTicketsSetAnError = false,
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ client.Fail = false;
+ client.Tickets = {
+ R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})",
+ };
+ NThreading::TFuture<TAddResponse> fut = client.Add({123, 213});
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n",
+ l->Stream.Str());
+ }
+ Y_UNIT_TEST(StartWithCacheAndAddSeveral) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}},
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ client.Fail = false;
+ client.Tickets = {
+ R"({"123" : { "ticket" : "service_ticket_3"}, "213" : { "ticket" : "service_ticket_2"}})",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213});
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 1 dsts\n"
+ << "7: Adding dst: got task #2 with 1 dsts\n"
+ << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "7: Adding dst: task #2: set value\n",
+ l->Stream.Str());
+ }
+ Y_UNIT_TEST(StartWithCacheAndAddSeveralWithErrors) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}},
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(client.GetOptionalServiceTicketFor(19));
+ *client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT(!client.GetOptionalServiceTicketFor(456));
+ client.Fail = false;
+ client.Tickets = {
+ R"({
+ "123" : { "ticket" : "service_ticket_3"},
+ "213" : { "ticket" : "service_ticket_2"},
+ "456" : { "error" : "error_3"}
+ })",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456});
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456));
+ UNIT_ASSERT(client.GetOptionalServiceTicketFor(19));
+ *client.GetOptionalServiceTicketFor(19));
+ UNIT_ASSERT_EXCEPTION_CONTAINS(client.GetOptionalServiceTicketFor(456),
+ TMissingServiceTicket,
+ "Failed to get ticket for '456': error_3");
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ {456, {EDstStatus::Fail, "error_3"}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Adding dst: got task #2 with 2 dsts\n"
+ << "7: Response with service tickets for 3 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 3 destination(s)\n"
+ << "3: Failed to get service ticket for dst=456: error_3\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n"
+ << "7: Adding dst: task #2: set value\n",
+ l->Stream.Str());
+ }
+ Y_UNIT_TEST(WithException) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}},
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ client.Fail = false;
+ client.Tickets = {
+ R"({
+ "123" : { "ticket" : "service_ticket_3"},
+ "213" : { "ticket" : "service_ticket_2"},
+ "456" : { "error" : "error_3"},
+ "789" : { "ticket" : "service_ticket_4"}
+ })",
+ };
+ NThreading::TFuture<TAddResponse> fut1 = client.Add({123, 213});
+ NThreading::TFuture<TAddResponse> fut2 = client.Add({213, 456});
+ NThreading::TFuture<TAddResponse> fut3 = client.Add({789});
+ fut2.Subscribe([](const auto&) {
+ throw yexception() << "planed exc";
+ });
+ fut3.Subscribe([](const auto&) {
+ throw 5;
+ });
+ UNIT_ASSERT_NO_EXCEPTION(client.Worker());
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(456));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->ErrorsById.contains(456));
+ UNIT_ASSERT(fut1.HasValue());
+ TAddResponse resp1{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp1, fut1.GetValue());
+ UNIT_ASSERT(fut2.HasValue());
+ TAddResponse resp2{
+ {213, {EDstStatus::Success, ""}},
+ {456, {EDstStatus::Fail, "error_3"}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp2, fut2.GetValue());
+ UNIT_ASSERT(fut3.HasValue());
+ TAddResponse resp3{
+ {789, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp3, fut3.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213, 789};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Adding dst: got task #2 with 2 dsts\n"
+ << "7: Adding dst: got task #3 with 1 dsts\n"
+ << "7: Response with service tickets for 4 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 4 destination(s)\n"
+ << "3: Failed to get service ticket for dst=456: error_3\n"
+ << "6: Cache was partly updated with 3 service ticket(s). total: 4\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "7: Adding dst: task #2: dst=213 got ticket\n"
+ << "4: Adding dst: task #2: dst=456 failed to get ticket: error_3\n"
+ << "7: Adding dst: task #2: set value\n"
+ << "3: Adding dst: task #2: exception: planed exc\n"
+ << "7: Adding dst: task #3: dst=789 got ticket\n"
+ << "7: Adding dst: task #3: set value\n"
+ << "3: Adding dst: task #3: exception: unknown error\n",
+ l->Stream.Str());
+ }
+ Y_UNIT_TEST(UseCacheOnAddAfterRestart) {
+ TInstant now = TInstant::Now();
+ CleanCache();
+ WriteFile("./service_tickets",
+ R"({"19" : { "ticket" : "3:serv:CBAQ__________9_IgYIKhCUkQY:CX"}})"
+ "\t100500",
+ now);
+ NTvmApi::TClientSettings s{
+ .DiskCacheDir = CACHE_DIR,
+ .SelfTvmId = 100500,
+ .Secret = (TStringBuf) "qwerty",
+ .FetchServiceTicketsForDstsWithAliases = {{"blackbox", 19}},
+ .IsIncompleteTicketsSetAnError = false,
+ };
+ auto l = MakeIntrusive<TLogger>();
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ client.Fail = false;
+ client.Tickets = {
+ R"({"123" : { "ticket" : "3:serv:CBAQ__________9_IgQIExB7:V9zPxvEi-VYK2FffYkkT0lF_Ol2XXGzZDH-dVpx0Vo962KDh6n3AtPiCGpYZsLFZkJ5BcujVOUVfONprbm9WqZaPk6qDRuBd-OfYjGRDNUGAP_v5a1bZEBZhaRV8pvceh28E5QsgpOz1SEN5viKgPt1Tz9EmN8abQZiCPj2mqY8"},
+ "213" : { "ticket" : "3:serv:CBAQ__________9_IgUIExDVAQ:Ten8XGEC1rupV9GsKvEG9IbN5DazwS3rmpEuFbgk34RaNeLO9g1mAvxKhL76hQnrgi_KjXv_xEtyQ4awBTXeQuhSlC0wkgargsc96AShWBFfHuTM-ftDtn5VdF9d8k1TEsTdJfZsW3_E_wy_3FQhs_rnoIowys6_SAh8YLpqaQk"}})",
+ };
+ NThreading::TFuture<TAddResponse> fut = client.Add({123, 213});
+ client.Worker();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ {
+ TOfflineUpdater client(s, l);
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->TicketsById.contains(213));
+ NThreading::TFuture<TAddResponse> fut = client.Add({123, 213});
+ fut.Wait();
+ UNIT_ASSERT_VALUES_EQUAL(TClientStatus::Ok, client.GetStatus());
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(19));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(213));
+ UNIT_ASSERT(client.GetCachedServiceTickets()->TicketsById.contains(123));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(19));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(213));
+ UNIT_ASSERT(!client.GetCachedServiceTickets()->ErrorsById.contains(123));
+ UNIT_ASSERT(fut.HasValue());
+ TAddResponse resp{
+ {123, {EDstStatus::Success, ""}},
+ {213, {EDstStatus::Success, ""}},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(resp, fut.GetValue());
+ UNIT_ASSERT(client.Tickets.empty());
+ TDsts dsts{19, 123, 213};
+ UNIT_ASSERT_VALUES_EQUAL(dsts, *client.GetDsts());
+ }
+ TStringBuilder()
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 1 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "7: Adding dst: got task #1 with 2 dsts\n"
+ << "7: Response with service tickets for 2 destination(s) was successfully fetched from https://tvm-api.yandex.net\n"
+ << "7: Got responses with service tickets with 1 pages for 2 destination(s)\n"
+ << "6: Cache was partly updated with 2 service ticket(s). total: 3\n"
+ << "6: File './tmp/service_tickets' was successfully written\n"
+ << "7: Adding dst: task #1: dst=123 got ticket\n"
+ << "7: Adding dst: task #1: dst=213 got ticket\n"
+ << "7: Adding dst: task #1: set value\n"
+ << "6: File './tmp/service_tickets' was successfully read\n"
+ << "6: Got 3 service ticket(s) from disk\n"
+ << "6: Cache was updated with 1 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: File './tmp/retry_settings' does not exist\n"
+ << "6: Cache was updated with 3 service ticket(s): " << TInstant::Seconds(now.Seconds()) << "\n"
+ << "7: Got ticket from disk cache: dst=123 got ticket\n"
+ << "7: Got ticket from disk cache: dst=213 got ticket\n",
+ l->Stream.Str());
+ }
+template <>
+void Out<NTvmAuth::NDynamicClient::TDstResponse>(IOutputStream& out, const NTvmAuth::NDynamicClient::TDstResponse& m) {
+ out << m.Status << " (" << m.Error << ")";
+template <>
+void Out<NTvmAuth::NTvmApi::TClientSettings::TDst>(IOutputStream& out, const NTvmAuth::NTvmApi::TClientSettings::TDst& m) {
+ out << m.Id;
diff --git a/library/cpp/tvmauth/client/misc/api/retry_settings.h b/library/cpp/tvmauth/client/misc/api/retry_settings.h
new file mode 100644
index 0000000000..607b230811
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/retry_settings.h
@@ -0,0 +1,33 @@
+#pragma once
+#include <library/cpp/tvmauth/client/misc/exponential_backoff.h>
+namespace NTvmAuth::NTvmApi {
+ struct TRetrySettings {
+ TExponentialBackoff::TSettings BackoffSettings = {
+ TDuration::Seconds(0),
+ TDuration::Minutes(1),
+ 2,
+ 0.5,
+ };
+ TDuration MaxRandomSleepDefault = TDuration::Seconds(5);
+ TDuration MaxRandomSleepWhenOk = TDuration::Minutes(1);
+ ui32 RetriesOnStart = 3;
+ ui32 RetriesInBackground = 2;
+ TDuration WorkerAwakingPeriod = TDuration::Seconds(10);
+ ui32 DstsLimit = 300;
+ TDuration RolesUpdatePeriod = TDuration::Minutes(10);
+ TDuration RolesWarnPeriod = TDuration::Minutes(20);
+ bool operator==(const TRetrySettings& o) const {
+ return BackoffSettings == o.BackoffSettings &&
+ MaxRandomSleepDefault == o.MaxRandomSleepDefault &&
+ MaxRandomSleepWhenOk == o.MaxRandomSleepWhenOk &&
+ RetriesOnStart == o.RetriesOnStart &&
+ WorkerAwakingPeriod == o.WorkerAwakingPeriod &&
+ DstsLimit == o.DstsLimit &&
+ RolesUpdatePeriod == o.RolesUpdatePeriod &&
+ RolesWarnPeriod == o.RolesWarnPeriod;
+ }
+ };
diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp
new file mode 100644
index 0000000000..8f4b359e8c
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.cpp
@@ -0,0 +1,164 @@
+#include "roles_fetcher.h"
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+#include <library/cpp/tvmauth/client/misc/roles/decoder.h>
+#include <library/cpp/tvmauth/client/misc/roles/parser.h>
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/string_utils/quote/quote.h>
+#include <util/string/builder.h>
+#include <util/string/join.h>
+namespace NTvmAuth::NTvmApi {
+ static TString CreatePath(const TString& dir, const TString& file) {
+ return dir.EndsWith("/")
+ ? dir + file
+ : dir + "/" + file;
+ }
+ TRolesFetcher::TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger)
+ : Settings_(settings)
+ , Logger_(logger)
+ , CacheFilePath_(CreatePath(Settings_.CacheDir, "roles"))
+ {
+ Client_ = std::make_unique<TKeepAliveHttpClient>(
+ Settings_.TiroleHost,
+ Settings_.TirolePort,
+ Settings_.Timeout,
+ Settings_.Timeout);
+ }
+ TInstant TRolesFetcher::ReadFromDisk() {
+ TDiskReader dr(CacheFilePath_, Logger_.Get());
+ if (!dr.Read()) {
+ return {};
+ }
+ std::pair<TString, TString> data = ParseDiskFormat(dr.Data());
+ if (data.second != Settings_.IdmSystemSlug) {
+ Logger_->Warning(
+ TStringBuilder() << "Roles in disk cache are for another slug (" << data.second
+ << "). Self=" << Settings_.IdmSystemSlug);
+ return {};
+ }
+ CurrentRoles_.Set(NRoles::TParser::Parse(std::make_shared<TString>(std::move(data.first))));
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to read roles with revision "
+ << CurrentRoles_.Get()->GetMeta().Revision
+ << " from " << CacheFilePath_);
+ return dr.Time();
+ }
+ bool TRolesFetcher::AreRolesOk() const {
+ return bool(GetCurrentRoles());
+ }
+ bool TRolesFetcher::IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate) {
+ return settings.RolesUpdatePeriod < sinceUpdate;
+ }
+ bool TRolesFetcher::ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate) {
+ return settings.RolesWarnPeriod < sinceUpdate;
+ }
+ NUtils::TFetchResult TRolesFetcher::FetchActualRoles(const TString& serviceTicket) {
+ TStringStream out;
+ THttpHeaders outHeaders;
+ TRequest req = CreateTiroleRequest(serviceTicket);
+ TKeepAliveHttpClient::THttpCode code = Client_->DoGet(
+ req.Url,
+ &out,
+ req.Headers,
+ &outHeaders);
+ const THttpInputHeader* reqId = outHeaders.FindHeader("X-Request-Id");
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to perform request for roles to " << Settings_.TiroleHost
+ << " (request_id=" << (reqId ? reqId->Value() : "")
+ << "). code=" << code);
+ return {code, std::move(outHeaders), "/v1/get_actual_roles", out.Str(), {}};
+ }
+ void TRolesFetcher::Update(NUtils::TFetchResult&& fetchResult, TInstant now) {
+ if (fetchResult.Code == HTTP_NOT_MODIFIED) {
+ Y_ENSURE(CurrentRoles_.Get(),
+ "tirole did not return any roles because current roles are actual,"
+ " but there are no roles in memory - this should never happen");
+ return;
+ }
+ Y_ENSURE(fetchResult.Code == HTTP_OK,
+ "Unexpected code from tirole: " << fetchResult.Code << ". " << fetchResult.Response);
+ const THttpInputHeader* codec = fetchResult.Headers.FindHeader("X-Tirole-Compression");
+ const TStringBuf codecBuf = codec ? codec->Value() : "";
+ NRoles::TRawPtr blob;
+ try {
+ blob = std::make_shared<TString>(NRoles::TDecoder::Decode(
+ codecBuf,
+ std::move(fetchResult.Response)));
+ } catch (const std::exception& e) {
+ throw yexception() << "Failed to decode blob with codec '" << codecBuf
+ << "': " << e.what();
+ }
+ CurrentRoles_.Set(NRoles::TParser::Parse(blob));
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to update roles with revision "
+ << CurrentRoles_.Get()->GetMeta().Revision);
+ TDiskWriter dw(CacheFilePath_, Logger_.Get());
+ dw.Write(PrepareDiskFormat(*blob, Settings_.IdmSystemSlug), now);
+ }
+ NTvmAuth::NRoles::TRolesPtr TRolesFetcher::GetCurrentRoles() const {
+ return CurrentRoles_.Get();
+ }
+ void TRolesFetcher::ResetConnection() {
+ Client_->ResetConnection();
+ }
+ static const char DELIMETER = '\t';
+ std::pair<TString, TString> TRolesFetcher::ParseDiskFormat(TStringBuf filebody) {
+ TStringBuf slug = filebody.RNextTok(DELIMETER);
+ return {TString(filebody), CGIUnescapeRet(slug)};
+ }
+ TString TRolesFetcher::PrepareDiskFormat(TStringBuf roles, TStringBuf slug) {
+ TStringStream res;
+ res.Reserve(roles.size() + 1 + slug.size());
+ res << roles << DELIMETER << CGIEscapeRet(slug);
+ return res.Str();
+ }
+ TRolesFetcher::TRequest TRolesFetcher::CreateTiroleRequest(const TString& serviceTicket) const {
+ TRolesFetcher::TRequest res;
+ TStringStream url;
+ url.Reserve(512);
+ url << "/v1/get_actual_roles?";
+ url << "system_slug=" << CGIEscapeRet(Settings_.IdmSystemSlug) << "&";
+ Settings_.ProcInfo.AddToRequest(url);
+ res.Url = std::move(url.Str());
+ res.Headers.reserve(2);
+ res.Headers.emplace(XYaServiceTicket_, serviceTicket);
+ NRoles::TRolesPtr roles = CurrentRoles_.Get();
+ if (roles) {
+ res.Headers.emplace(IfNoneMatch_, Join("", "\"", roles->GetMeta().Revision, "\""));
+ }
+ return res;
+ }
diff --git a/library/cpp/tvmauth/client/misc/api/roles_fetcher.h b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h
new file mode 100644
index 0000000000..63691223b5
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/roles_fetcher.h
@@ -0,0 +1,63 @@
+#pragma once
+#include "retry_settings.h"
+#include <library/cpp/tvmauth/client/misc/fetch_result.h>
+#include <library/cpp/tvmauth/client/misc/proc_info.h>
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/misc/roles/roles.h>
+#include <library/cpp/tvmauth/client/logger.h>
+#include <library/cpp/http/simple/http_client.h>
+namespace NTvmAuth::NTvmApi {
+ struct TRolesFetcherSettings {
+ TString TiroleHost;
+ ui16 TirolePort = 0;
+ TString CacheDir;
+ NUtils::TProcInfo ProcInfo;
+ TTvmId SelfTvmId = 0;
+ TString IdmSystemSlug;
+ TDuration Timeout = TDuration::Seconds(30);
+ };
+ class TRolesFetcher {
+ public:
+ TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger);
+ TInstant ReadFromDisk();
+ bool AreRolesOk() const;
+ static bool IsTimeToUpdate(const TRetrySettings& settings, TDuration sinceUpdate);
+ static bool ShouldWarn(const TRetrySettings& settings, TDuration sinceUpdate);
+ NUtils::TFetchResult FetchActualRoles(const TString& serviceTicket);
+ void Update(NUtils::TFetchResult&& fetchResult, TInstant now = TInstant::Now());
+ NTvmAuth::NRoles::TRolesPtr GetCurrentRoles() const;
+ void ResetConnection();
+ public:
+ static std::pair<TString, TString> ParseDiskFormat(TStringBuf filebody);
+ static TString PrepareDiskFormat(TStringBuf roles, TStringBuf slug);
+ struct TRequest {
+ TString Url;
+ TKeepAliveHttpClient::THeaders Headers;
+ };
+ TRequest CreateTiroleRequest(const TString& serviceTicket) const;
+ private:
+ const TRolesFetcherSettings Settings_;
+ const TLoggerPtr Logger_;
+ const TString CacheFilePath_;
+ const TString XYaServiceTicket_ = "X-Ya-Service-Ticket";
+ const TString IfNoneMatch_ = "If-None-Match";
+ NUtils::TProtectedValue<NTvmAuth::NRoles::TRolesPtr> CurrentRoles_;
+ std::unique_ptr<TKeepAliveHttpClient> Client_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/api/settings.cpp b/library/cpp/tvmauth/client/misc/api/settings.cpp
new file mode 100644
index 0000000000..eddad21869
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/settings.cpp
@@ -0,0 +1,89 @@
+#include "settings.h"
+#include <util/datetime/base.h>
+#include <util/stream/file.h>
+#include <util/system/fs.h>
+#include <set>
+namespace NTvmAuth::NTvmApi {
+ void TClientSettings::CheckPermissions(const TString& dir) {
+ const TString name = dir + "/check.tmp";
+ try {
+ NFs::EnsureExists(dir);
+ TFile file(name, CreateAlways | RdWr);
+ NFs::Remove(name);
+ } catch (const std::exception& e) {
+ NFs::Remove(name);
+ ythrow TPermissionDenied() << "Permission denied to disk cache directory: " << e.what();
+ }
+ }
+ void TClientSettings::CheckValid() const {
+ if (DiskCacheDir) {
+ CheckPermissions(DiskCacheDir);
+ }
+ if (TStringBuf(Secret)) {
+ Y_ENSURE_EX(NeedServiceTicketsFetching(),
+ TBrokenTvmClientSettings() << "Secret is present but destinations list is empty. It makes no sense");
+ }
+ if (NeedServiceTicketsFetching()) {
+ Y_ENSURE_EX(SelfTvmId != 0,
+ TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if fetching of Service Tickets required");
+ Y_ENSURE_EX((TStringBuf)Secret,
+ TBrokenTvmClientSettings() << "Secret is required for fetching of Service Tickets");
+ }
+ if (CheckServiceTickets) {
+ Y_ENSURE_EX(SelfTvmId != 0,
+ TBrokenTvmClientSettings() << "SelfTvmId cannot be 0 if checking of Service Tickets required");
+ }
+ if (FetchRolesForIdmSystemSlug) {
+ Y_ENSURE_EX(DiskCacheDir,
+ TBrokenTvmClientSettings() << "Disk cache must be enabled to use roles: "
+ "they can be heavy");
+ }
+ bool needSmth = NeedServiceTicketsFetching() ||
+ CheckServiceTickets ||
+ CheckUserTicketsWithBbEnv;
+ Y_ENSURE_EX(needSmth, TBrokenTvmClientSettings() << "Invalid settings: nothing to do");
+ // Useless now: keep it here to avoid forgetting check from TDst. TODO: PASSP-35377
+ for (const auto& dst : FetchServiceTicketsForDsts) {
+ Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+ // TODO: check only FetchServiceTicketsForDsts_
+ // Python binding checks settings before normalization
+ for (const auto& [alias, dst] : FetchServiceTicketsForDstsWithAliases) {
+ Y_ENSURE_EX(dst.Id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+ Y_ENSURE_EX(TiroleTvmId != 0, TBrokenTvmClientSettings() << "TiroleTvmId cannot be 0");
+ }
+ TClientSettings TClientSettings::CloneNormalized() const {
+ TClientSettings res = *this;
+ std::set<TTvmId> allDsts;
+ for (const auto& tvmid : res.FetchServiceTicketsForDsts) {
+ allDsts.insert(tvmid.Id);
+ }
+ for (const auto& [alias, tvmid] : res.FetchServiceTicketsForDstsWithAliases) {
+ allDsts.insert(tvmid.Id);
+ }
+ if (FetchRolesForIdmSystemSlug) {
+ allDsts.insert(res.TiroleTvmId);
+ }
+ res.FetchServiceTicketsForDsts = {allDsts.begin(), allDsts.end()};
+ res.CheckValid();
+ return res;
+ }
diff --git a/library/cpp/tvmauth/client/misc/api/settings.h b/library/cpp/tvmauth/client/misc/api/settings.h
new file mode 100644
index 0000000000..4f901dda57
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/settings.h
@@ -0,0 +1,236 @@
+#pragma once
+#include <library/cpp/tvmauth/client/misc/settings.h>
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+#include <library/cpp/tvmauth/type.h>
+#include <library/cpp/string_utils/secret_string/secret_string.h>
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+namespace NTvmAuth::NTvmApi {
+ /**
+ * Settings for TVM client. Uses https://tvm-api.yandex.net to get state.
+ * At least one of them is required:
+ * FetchServiceTicketsForDsts_/FetchServiceTicketsForDstsWithAliases_
+ * CheckServiceTickets_
+ * CheckUserTicketsWithBbEnv_
+ */
+ class TClientSettings: public NTvmAuth::TClientSettings {
+ public:
+ class TDst;
+ /**
+ * Alias is an internal name for destinations within your code.
+ * You can associate a name with an tvm_id once in your code and use the name as an alias for
+ * tvm_id to each calling point. Useful for several environments: prod/test/etc.
+ * @example:
+ * // init
+ * static const TString MY_BACKEND = "my backend";
+ * TDstMap map = {{MY_BACKEND, TDst(config.get("my_back_tvm_id"))}};
+ * ...
+ * // per request
+ * TString t = tvmClient.GetServiceTicket(MY_BACKEND);
+ */
+ using TDstMap = THashMap<TAlias, TDst>;
+ using TDstVector = TVector<TDst>;
+ public:
+ /*!
+ * NOTE: Please use this option: it provides the best reliability
+ * NOTE: Client requires read/write permissions
+ * WARNING: The same directory can be used only:
+ * - for TVM clients with the same settings
+ * OR
+ * - for new client replacing previous - with another config.
+ * System user must be the same for processes with these clients inside.
+ * Implementation doesn't provide other scenarios.
+ */
+ TString DiskCacheDir;
+ // Required for Service Ticket fetching or checking
+ TTvmId SelfTvmId = 0;
+ // Options for Service Tickets fetching
+ NSecretString::TSecretString Secret;
+ /*!
+ * Client will process both attrs:
+ * FetchServiceTicketsForDsts_, FetchServiceTicketsForDstsWithAliases_
+ * WARNING: It is not way to provide authorization for incoming ServiceTickets!
+ * It is way only to send your ServiceTickets to your backend!
+ */
+ TDstVector FetchServiceTicketsForDsts;
+ TDstMap FetchServiceTicketsForDstsWithAliases;
+ bool IsIncompleteTicketsSetAnError = true;
+ // Options for Service Tickets checking
+ bool CheckServiceTickets = false;
+ // Options for User Tickets checking
+ TMaybe<EBlackboxEnv> CheckUserTicketsWithBbEnv;
+ // Options for roles fetching
+ TString FetchRolesForIdmSystemSlug;
+ /*!
+ * By default client checks src from ServiceTicket or default uid from UserTicket -
+ * to prevent you from forgetting to check it yourself.
+ * It does binary checks only:
+ * ticket gets status NoRoles, if there is no role for src or default uid.
+ * You need to check roles on your own if you have a non-binary role system or
+ * you have disabled ShouldCheckSrc/ShouldCheckDefaultUid
+ *
+ * You may need to disable this check in the following cases:
+ * - You use GetRoles() to provide verbose message (with revision).
+ * Double check may be inconsistent:
+ * binary check inside client uses revision of roles X - i.e. src 100500 has no role,
+ * exact check in your code uses revision of roles Y - i.e. src 100500 has some roles.
+ */
+ bool ShouldCheckSrc = true;
+ bool ShouldCheckDefaultUid = true;
+ /*!
+ * By default client checks dst from ServiceTicket. If this check is switched off
+ * incorrect dst does not result in error of checked ticket status
+ * DANGEROUS: This case you must check dst manualy using @link TCheckedServiceTicket::GetDst()
+ */
+ bool ShouldCheckDst = true;
+ // Options for tests
+ TString TvmHost = "https://tvm-api.yandex.net";
+ ui16 TvmPort = 443;
+ TString TiroleHost = "https://tirole-api.yandex.net";
+ TDuration TvmSocketTimeout = TDuration::Seconds(5);
+ TDuration TvmConnectTimeout = TDuration::Seconds(30);
+ ui16 TirolePort = 443;
+ TTvmId TiroleTvmId = TIROLE_TVMID;
+ // for debug purposes
+ TString LibVersionPrefix;
+ void CheckValid() const;
+ TClientSettings CloneNormalized() const;
+ static inline const TTvmId TIROLE_TVMID = 2028120;
+ static inline const TTvmId TIROLE_TVMID_TEST = 2026536;
+ // TODO: get rid of it: PASSP-35377
+ public:
+ // Deprecated: set attributes directly
+ void SetSelfTvmId(TTvmId selfTvmId) {
+ SelfTvmId = selfTvmId;
+ }
+ // Deprecated: set attributes directly
+ void EnableServiceTicketChecking() {
+ CheckServiceTickets = true;
+ }
+ // Deprecated: set attributes directly
+ void EnableUserTicketChecking(EBlackboxEnv env) {
+ CheckUserTicketsWithBbEnv = env;
+ }
+ // Deprecated: set attributes directly
+ void SetTvmHostPort(const TString& host, ui16 port) {
+ TvmHost = host;
+ TvmPort = port;
+ }
+ // Deprecated: set attributes directly
+ void SetTiroleHostPort(const TString& host, ui16 port) {
+ TiroleHost = host;
+ TirolePort = port;
+ }
+ // Deprecated: set attributes directly
+ void EnableRolesFetching(const TString& systemSlug, TTvmId tiroleTvmId = TIROLE_TVMID) {
+ TiroleTvmId = tiroleTvmId;
+ FetchRolesForIdmSystemSlug = systemSlug;
+ }
+ // Deprecated: set attributes directly
+ void DoNotCheckSrcByDefault() {
+ ShouldCheckSrc = false;
+ }
+ // Deprecated: set attributes directly
+ void DoNotCheckDefaultUidByDefault() {
+ ShouldCheckDefaultUid = false;
+ }
+ // Deprecated: set attributes directly
+ void SetDiskCacheDir(const TString& dir) {
+ DiskCacheDir = dir;
+ }
+ // Deprecated: set attributes directly
+ void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret,
+ TDstMap&& dsts,
+ const bool considerIncompleteTicketsSetAsError = true) {
+ IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError;
+ Secret = selfSecret;
+ FetchServiceTicketsForDsts = TDstVector{};
+ FetchServiceTicketsForDsts.reserve(dsts.size());
+ for (const auto& pair : dsts) {
+ FetchServiceTicketsForDsts.push_back(pair.second);
+ }
+ FetchServiceTicketsForDstsWithAliases = std::move(dsts);
+ }
+ // Deprecated: set attributes directly
+ void EnableServiceTicketsFetchOptions(const TStringBuf selfSecret,
+ TDstVector&& dsts,
+ const bool considerIncompleteTicketsSetAsError = true) {
+ IsIncompleteTicketsSetAnError = considerIncompleteTicketsSetAsError;
+ Secret = selfSecret;
+ FetchServiceTicketsForDsts = std::move(dsts);
+ }
+ public:
+ bool IsServiceTicketFetchingRequired() const {
+ return bool(Secret.Value());
+ }
+ bool NeedServiceTicketsFetching() const {
+ return !FetchServiceTicketsForDsts.empty() ||
+ !FetchServiceTicketsForDstsWithAliases.empty() ||
+ FetchRolesForIdmSystemSlug;
+ }
+ // TODO: get rid of TDst: PASSP-35377
+ class TDst {
+ public:
+ TDst(TTvmId id)
+ : Id(id)
+ {
+ Y_ENSURE_EX(id != 0, TBrokenTvmClientSettings() << "TvmId cannot be 0");
+ }
+ TTvmId Id;
+ bool operator==(const TDst& o) const {
+ return Id == o.Id;
+ }
+ bool operator<(const TDst& o) const {
+ return Id < o.Id;
+ }
+ public: // for python binding
+ TDst()
+ : Id(0)
+ {
+ }
+ };
+ public:
+ static void CheckPermissions(const TString& dir);
+ };
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp
new file mode 100644
index 0000000000..c4a6415be8
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp
@@ -0,0 +1,1049 @@
+#include "threaded_updater.h"
+#include <library/cpp/tvmauth/client/misc/disk_cache.h>
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h>
+#include <library/cpp/tvmauth/client/logger.h>
+#include <library/cpp/json/json_reader.h>
+#include <util/stream/str.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+#include <util/system/thread.h>
+namespace NTvmAuth::NTvmApi {
+ static TString CreatePublicKeysUrl(const TClientSettings& settings,
+ const NUtils::TProcInfo& procInfo) {
+ TStringStream s;
+ s << "/2/keys";
+ s << "?";
+ procInfo.AddToRequest(s);
+ s << "&get_retry_settings=yes";
+ if (settings.SelfTvmId != 0) {
+ s << "&src=" << settings.SelfTvmId;
+ }
+ if (settings.CheckUserTicketsWithBbEnv) {
+ s << "&env=" << static_cast<int>(*settings.CheckUserTicketsWithBbEnv);
+ }
+ return s.Str();
+ }
+ TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) {
+ Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");
+ THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger)));
+ p->Init();
+ p->StartWorker();
+ return p.Release();
+ }
+ TThreadedUpdater::~TThreadedUpdater() {
+ ExpBackoff_.SetEnabled(false);
+ ExpBackoff_.Interrupt();
+ StopWorker(); // Required here to avoid using of deleted members
+ }
+ TClientStatus TThreadedUpdater::GetStatus() const {
+ const TClientStatus::ECode state = GetState();
+ return TClientStatus(state, GetLastError(state == TClientStatus::Ok || state == TClientStatus::IncompleteTicketsSet));
+ }
+ NRoles::TRolesPtr TThreadedUpdater::GetRoles() const {
+ Y_ENSURE_EX(RolesFetcher_,
+ TBrokenTvmClientSettings() << "Roles were not configured in settings");
+ return RolesFetcher_->GetCurrentRoles();
+ }
+ TClientStatus::ECode TThreadedUpdater::GetState() const {
+ const TInstant now = TInstant::Now();
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ if (AreServiceTicketsInvalid(now)) {
+ return TClientStatus::Error;
+ }
+ auto tickets = GetCachedServiceTickets();
+ if (!tickets) {
+ return TClientStatus::Error;
+ }
+ if (tickets->TicketsById.size() < GetDsts()->size()) {
+ if (Settings_.IsIncompleteTicketsSetAnError) {
+ return TClientStatus::Error;
+ } else {
+ return TClientStatus::IncompleteTicketsSet;
+ }
+ }
+ }
+ if ((Settings_.CheckServiceTickets || Settings_.CheckUserTicketsWithBbEnv) && ArePublicKeysInvalid(now)) {
+ return TClientStatus::Error;
+ }
+ const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys();
+ const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets();
+ const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles();
+ if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) {
+ return TClientStatus::Warning;
+ }
+ if ((Settings_.CheckServiceTickets || Settings_.CheckUserTicketsWithBbEnv) &&
+ sincePublicKeysUpdate > PublicKeysDurations_.Expiring)
+ {
+ return TClientStatus::Warning;
+ }
+ if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) {
+ return TClientStatus::Warning;
+ }
+ return TClientStatus::Ok;
+ }
+ TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger)
+ : TThreadedUpdaterBase(
+ TRetrySettings{}.WorkerAwakingPeriod,
+ std::move(logger),
+ settings.TvmHost,
+ settings.TvmPort,
+ settings.TvmSocketTimeout,
+ settings.TvmConnectTimeout)
+ , ExpBackoff_(RetrySettings_.BackoffSettings)
+ , ServiceTicketBatchUpdateMutex_(std::make_unique<std::mutex>())
+ , Settings_(settings.CloneNormalized())
+ , ProcInfo_(NUtils::TProcInfo::Create(Settings_.LibVersionPrefix))
+ , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_))
+ , DstAliases_(MakeAliasMap(Settings_))
+ , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}})
+ , Random_(TInstant::Now().MicroSeconds())
+ {
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ SigningContext_ = TServiceContext::SigningFactory(Settings_.Secret);
+ }
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ Destinations_ = std::make_shared<const TDstSet>(Settings_.FetchServiceTicketsForDsts.begin(), Settings_.FetchServiceTicketsForDsts.end());
+ }
+ PublicKeysDurations_.RefreshPeriod = TDuration::Days(1);
+ ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1);
+ if (Settings_.CheckUserTicketsWithBbEnv) {
+ SetBbEnv(*Settings_.CheckUserTicketsWithBbEnv);
+ }
+ if (Settings_.FetchRolesForIdmSystemSlug) {
+ RolesFetcher_ = std::make_unique<TRolesFetcher>(
+ TRolesFetcherSettings{
+ Settings_.TiroleHost,
+ Settings_.TirolePort,
+ Settings_.DiskCacheDir,
+ ProcInfo_,
+ Settings_.SelfTvmId,
+ Settings_.FetchRolesForIdmSystemSlug,
+ },
+ Logger_);
+ }
+ if (Settings_.DiskCacheDir) {
+ TString path = Settings_.DiskCacheDir;
+ if (path.back() != '/') {
+ path.push_back('/');
+ }
+ if (Settings_.IsServiceTicketFetchingRequired()) {
+ ServiceTicketsFilepath_ = path;
+ ServiceTicketsFilepath_.append("service_tickets");
+ }
+ if (Settings_.CheckServiceTickets || Settings_.CheckUserTicketsWithBbEnv) {
+ PublicKeysFilepath_ = path;
+ PublicKeysFilepath_.append("public_keys");
+ }
+ RetrySettingsFilepath_ = path + "retry_settings";
+ } else {
+ LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability");
+ }
+ }
+ void TThreadedUpdater::Init() {
+ ReadStateFromDisk();
+ ClearErrors();
+ ExpBackoff_.SetEnabled(false);
+ // First of all try to get tickets: there are a lot of reasons to fail this request.
+ // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call.
+ UpdateServiceTickets();
+ if (!AreServicesTicketsOk()) {
+ ThrowLastError();
+ }
+ UpdatePublicKeys();
+ if (!IsServiceContextOk() || !IsUserContextOk()) {
+ ThrowLastError();
+ }
+ UpdateRoles();
+ if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) {
+ ThrowLastError();
+ }
+ Inited_ = true;
+ ExpBackoff_.SetEnabled(true);
+ }
+ void TThreadedUpdater::UpdateServiceTickets() {
+ if (!Settings_.IsServiceTicketFetchingRequired()) {
+ return;
+ }
+ TInstant stut = GetUpdateTimeOfServiceTickets();
+ try {
+ if (IsTimeToUpdateServiceTickets(stut)) {
+ UpdateAllServiceTickets();
+ NeedFetchMissingServiceTickets_ = false;
+ } else if (NeedFetchMissingServiceTickets_) {
+ TDstSetPtr dsts = GetDsts();
+ if (GetCachedServiceTickets()->TicketsById.size() < dsts->size()) {
+ UpdateMissingServiceTickets(*dsts);
+ NeedFetchMissingServiceTickets_ = false;
+ }
+ }
+ if (AreServicesTicketsOk()) {
+ ClearError(EScope::ServiceTickets);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::ServiceTickets, e.what());
+ LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what());
+ if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) {
+ LogError("Service tickets have not been refreshed for too long period");
+ }
+ }
+ }
+ void TThreadedUpdater::UpdateAllServiceTickets() {
+ TDstSetPtr dsts = GetDsts();
+ THttpResult st = GetServiceTicketsFromHttp(*dsts, RetrySettings_.DstsLimit);
+ std::unique_lock lock(*ServiceTicketBatchUpdateMutex_);
+ auto oldCache = GetCachedServiceTickets();
+ if (oldCache) {
+ if (dsts->size() < GetDsts()->size()) {
+ for (const auto& pair : oldCache->TicketsById) {
+ st.TicketsWithErrors.Tickets.insert(pair);
+ }
+ }
+ for (const auto& pair : oldCache->ErrorsById) {
+ st.TicketsWithErrors.Errors.insert(pair);
+ }
+ }
+ UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now());
+ lock.unlock();
+ if (ServiceTicketsFilepath_) {
+ DiskCacheServiceTickets_ = CreateJsonArray(st.Responses);
+ TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
+ w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.SelfTvmId));
+ }
+ }
+ TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) {
+ TServiceTicketsPtr cache = GetCachedServiceTickets();
+ TClientSettings::TDstVector missingDsts = FindMissingDsts(cache, required);
+ if (missingDsts.empty()) {
+ return cache;
+ }
+ THttpResult st = GetServiceTicketsFromHttp(missingDsts, RetrySettings_.DstsLimit);
+ std::unique_lock lock(*ServiceTicketBatchUpdateMutex_);
+ cache = GetCachedServiceTickets();
+ size_t gotTickets = st.TicketsWithErrors.Tickets.size();
+ TDstSetPtr oldDsts = GetDsts();
+ std::shared_ptr<TDstSet> newDsts = std::make_shared<TDstSet>(oldDsts->begin(), oldDsts->end());
+ for (const auto& pair : cache->TicketsById) {
+ st.TicketsWithErrors.Tickets.insert(pair);
+ }
+ for (const auto& pair : cache->ErrorsById) {
+ st.TicketsWithErrors.Errors.insert(pair);
+ }
+ for (const auto& pair : st.TicketsWithErrors.Tickets) {
+ st.TicketsWithErrors.Errors.erase(pair.first);
+ newDsts->insert(pair.first);
+ }
+ TServiceTicketsPtr c = UpdateServiceTicketsCachePartly(
+ std::move(st.TicketsWithErrors),
+ gotTickets);
+ SetDsts(std::move(newDsts));
+ lock.unlock();
+ if (!c) {
+ LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?");
+ c = cache;
+ }
+ if (!ServiceTicketsFilepath_) {
+ return c;
+ }
+ DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses);
+ TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
+ w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.SelfTvmId));
+ return c;
+ }
+ void TThreadedUpdater::UpdatePublicKeys() {
+ if (!Settings_.CheckServiceTickets && !Settings_.CheckUserTicketsWithBbEnv) {
+ return;
+ }
+ TInstant pkut = GetUpdateTimeOfPublicKeys();
+ if (!IsTimeToUpdatePublicKeys(pkut)) {
+ return;
+ }
+ try {
+ TString publicKeys = GetPublicKeysFromHttp();
+ UpdatePublicKeysCache(publicKeys, TInstant::Now());
+ if (PublicKeysFilepath_) {
+ TDiskWriter w(PublicKeysFilepath_, Logger_.Get());
+ w.Write(publicKeys);
+ }
+ if (IsServiceContextOk() && IsUserContextOk()) {
+ ClearError(EScope::PublicKeys);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::PublicKeys, e.what());
+ LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what());
+ if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) {
+ LogError("Public keys have not been refreshed for too long period");
+ }
+ }
+ }
+ void TThreadedUpdater::UpdateRoles() {
+ if (!RolesFetcher_) {
+ return;
+ }
+ TInstant rut = GetUpdateTimeOfRoles();
+ if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) {
+ return;
+ }
+ struct TCloser {
+ TRolesFetcher* Fetcher;
+ ~TCloser() {
+ Fetcher->ResetConnection();
+ }
+ } closer{RolesFetcher_.get()};
+ try {
+ TServiceTicketsPtr st = GetCachedServiceTickets();
+ Y_ENSURE(st, "No one service ticket in memory: how it possible?");
+ auto it = st->TicketsById.find(Settings_.TiroleTvmId);
+ Y_ENSURE(it != st->TicketsById.end(),
+ "Missing tvmid for tirole in cache: " << Settings_.TiroleTvmId);
+ RolesFetcher_->Update(
+ FetchWithRetries(
+ [&]() { return RolesFetcher_->FetchActualRoles(it->second); },
+ EScope::Roles));
+ SetUpdateTimeOfRoles(TInstant::Now());
+ if (RolesFetcher_->AreRolesOk()) {
+ ClearError(EScope::Roles);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::Roles, e.what());
+ LogWarning(TStringBuilder() << "Failed to update roles: " << e.what());
+ if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) {
+ LogError("Roles have not been refreshed for too long period");
+ }
+ }
+ }
+ TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly(
+ TAsyncUpdaterBase::TPairTicketsErrors&& tickets,
+ size_t got) {
+ size_t count = tickets.Tickets.size();
+ TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets),
+ std::move(tickets.Errors),
+ DstAliases_);
+ SetServiceTickets(c);
+ LogInfo(TStringBuilder()
+ << "Cache was partly updated with " << got
+ << " service ticket(s). total: " << count);
+ return c;
+ }
+ void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) {
+ size_t count = tickets.Tickets.size();
+ SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets),
+ std::move(tickets.Errors),
+ DstAliases_));
+ SetUpdateTimeOfServiceTickets(time);
+ if (count > 0) {
+ LogInfo(TStringBuilder() << "Cache was updated with " << count << " service ticket(s): " << time);
+ }
+ }
+ void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) {
+ if (publicKeys.empty()) {
+ return;
+ }
+ if (Settings_.CheckServiceTickets) {
+ SetServiceContext(MakeIntrusiveConst<TServiceContext>(
+ TServiceContext::CheckingFactory(Settings_.SelfTvmId,
+ publicKeys)));
+ }
+ if (Settings_.CheckUserTicketsWithBbEnv) {
+ SetUserContext(publicKeys);
+ }
+ SetUpdateTimeOfPublicKeys(time);
+ LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time);
+ }
+ void TThreadedUpdater::ReadStateFromDisk() {
+ try {
+ TServiceTicketsFromDisk st;
+ std::tie(st, StartUpCache_) = ReadServiceTicketsFromDisk();
+ UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate);
+ DiskCacheServiceTickets_ = st.FileBody;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what());
+ }
+ try {
+ std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk();
+ UpdatePublicKeysCache(pk.first, pk.second);
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what());
+ }
+ try {
+ TString rs = ReadRetrySettingsFromDisk();
+ UpdateRetrySettings(rs);
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what());
+ }
+ try {
+ if (RolesFetcher_) {
+ SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk());
+ }
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what());
+ }
+ }
+ std::pair<TThreadedUpdater::TServiceTicketsFromDisk, TThreadedUpdater::TServiceTicketsFromDisk> TThreadedUpdater::ReadServiceTicketsFromDisk() const {
+ if (!ServiceTicketsFilepath_) {
+ return {};
+ }
+ TDiskReader r(ServiceTicketsFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+ std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data());
+ if (data.second != Settings_.SelfTvmId) {
+ TStringStream s;
+ s << "Disk cache is for another tvmId (" << data.second << "). ";
+ s << "Self=" << Settings_.SelfTvmId;
+ LogWarning(s.Str());
+ return {};
+ }
+ TThreadedUpdater::TServiceTicketsFromDisk resDst{
+ .BornDate = r.Time(),
+ .FileBody = TString(data.first),
+ };
+ TThreadedUpdater::TServiceTicketsFromDisk resAll{
+ .BornDate = r.Time(),
+ .FileBody = TString(data.first),
+ };
+ TDstSetPtr dsts = GetDsts();
+ ParseTicketsFromResponse(data.first, *dsts, resDst.TicketsWithErrors);
+ if (IsInvalid(TServiceTickets::GetInvalidationTime(resDst.TicketsWithErrors.Tickets), TInstant::Now())) {
+ LogWarning("Disk cache (service tickets) is too old");
+ return {};
+ }
+ try {
+ ParseTicketsFromDiskCache(data.first, resAll.TicketsWithErrors);
+ } catch (std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to parse all service tickets from disk cache: " << e.what());
+ LogInfo(TStringBuilder() << "Got " << resDst.TicketsWithErrors.Tickets.size() << " service ticket(s) from disk");
+ return {std::move(resDst), {}};
+ }
+ if (resAll.TicketsWithErrors.Tickets.empty()) {
+ LogInfo(TStringBuilder() << "Got " << resDst.TicketsWithErrors.Tickets.size() << " service ticket(s) from disk");
+ return {std::move(resDst), std::move(resAll)};
+ }
+ if (IsInvalid(TServiceTickets::GetInvalidationTime(resAll.TicketsWithErrors.Tickets), TInstant::Now())) {
+ LogWarning("Disk cache (service tickets) is too old");
+ LogInfo(TStringBuilder() << "Got " << resDst.TicketsWithErrors.Tickets.size() << " service ticket(s) from disk");
+ return {std::move(resDst), {}};
+ }
+ LogInfo(TStringBuilder() << "Got " << resAll.TicketsWithErrors.Tickets.size() << " service ticket(s) from disk");
+ return {std::move(resDst), std::move(resAll)};
+ }
+ std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const {
+ if (!PublicKeysFilepath_) {
+ return {};
+ }
+ TDiskReader r(PublicKeysFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+ if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) {
+ LogWarning("Disk cache (public keys) is too old");
+ return {};
+ }
+ return {r.Data(), r.Time()};
+ }
+ TString TThreadedUpdater::ReadRetrySettingsFromDisk() const {
+ if (!RetrySettingsFilepath_) {
+ return {};
+ }
+ TDiskReader r(RetrySettingsFilepath_, Logger_.Get());
+ if (!r.Read()) {
+ return {};
+ }
+ return r.Data();
+ }
+ template <class Dsts>
+ TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const {
+ Y_ENSURE(SigningContext_, "Internal error");
+ TClientSettings::TDstVector part;
+ part.reserve(dstLimit);
+ THttpResult res;
+ res.TicketsWithErrors.Tickets.reserve(dsts.size());
+ res.Responses.reserve(dsts.size() / dstLimit + 1);
+ for (auto it = dsts.begin(); it != dsts.end();) {
+ part.clear();
+ for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) {
+ part.push_back(*it);
+ }
+ TString response =
+ FetchWithRetries(
+ [this, &part]() {
+ // create request here to keep 'ts' actual
+ return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets(
+ Settings_.SelfTvmId,
+ *SigningContext_,
+ part,
+ ProcInfo_));
+ },
+ EScope::ServiceTickets)
+ .Response;
+ ParseTicketsFromResponse(response, part, res.TicketsWithErrors);
+ LogDebug(TStringBuilder()
+ << "Response with service tickets for " << part.size()
+ << " destination(s) was successfully fetched from " << TvmUrl_);
+ res.Responses.push_back(response);
+ }
+ LogDebug(TStringBuilder()
+ << "Got responses with service tickets with " << res.Responses.size() << " pages for "
+ << dsts.size() << " destination(s)");
+ for (const auto& p : res.TicketsWithErrors.Errors) {
+ LogError(TStringBuilder()
+ << "Failed to get service ticket for dst=" << p.first << ": " << p.second);
+ }
+ return res;
+ }
+ TString TThreadedUpdater::GetPublicKeysFromHttp() const {
+ TString publicKeys =
+ FetchWithRetries(
+ [this]() { return FetchPublicKeysFromHttp(); },
+ EScope::PublicKeys)
+ .Response;
+ LogDebug("Public keys were successfully fetched from " + TvmUrl_);
+ return publicKeys;
+ }
+ TServiceTickets::TMapIdStr TThreadedUpdater::GetRequestedTicketsFromStartUpCache(const TDstSet& dsts) const {
+ TServiceTickets::TMapIdStr res;
+ for (const TClientSettings::TDst& dst : dsts) {
+ auto it = StartUpCache_.TicketsWithErrors.Tickets.find(dst.Id);
+ if (it != StartUpCache_.TicketsWithErrors.Tickets.end()) {
+ res[dst.Id] = it->second;
+ }
+ }
+ return res;
+ }
+ TInstant TThreadedUpdater::GetStartUpCacheBornDate() const {
+ return StartUpCache_.BornDate;
+ }
+ NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const {
+ TStringStream s;
+ THttpHeaders outHeaders;
+ TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders);
+ const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings");
+ return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""};
+ }
+ NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const {
+ TStringStream s;
+ THttpHeaders outHeaders;
+ TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders);
+ const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings");
+ return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""};
+ }
+ bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const {
+ if (header.empty()) {
+ // Probably it is some kind of test?
+ return false;
+ }
+ try {
+ TString raw = NUtils::Base64url2bin(header);
+ Y_ENSURE(raw, "Invalid base64url in settings");
+ retry_settings::v1::Settings proto;
+ Y_ENSURE(proto.ParseFromString(raw), "Invalid proto");
+ // This ugly hack helps to process these settings in any case
+ TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this);
+ TRetrySettings& res = this_.RetrySettings_;
+ TStringStream diff;
+ auto update = [&diff](auto& l, const auto& r, TStringBuf desc) {
+ if (l != r) {
+ diff << desc << ":" << l << "->" << r << ";";
+ l = r;
+ }
+ };
+ if (proto.has_exponential_backoff_min_sec()) {
+ update(res.BackoffSettings.Min,
+ TDuration::Seconds(proto.exponential_backoff_min_sec()),
+ "exponential_backoff_min");
+ }
+ if (proto.has_exponential_backoff_max_sec()) {
+ update(res.BackoffSettings.Max,
+ TDuration::Seconds(proto.exponential_backoff_max_sec()),
+ "exponential_backoff_max");
+ }
+ if (proto.has_exponential_backoff_factor()) {
+ update(res.BackoffSettings.Factor,
+ proto.exponential_backoff_factor(),
+ "exponential_backoff_factor");
+ }
+ if (proto.has_exponential_backoff_jitter()) {
+ update(res.BackoffSettings.Jitter,
+ proto.exponential_backoff_jitter(),
+ "exponential_backoff_jitter");
+ }
+ this_.ExpBackoff_.UpdateSettings(res.BackoffSettings);
+ if (proto.has_max_random_sleep_default()) {
+ update(res.MaxRandomSleepDefault,
+ TDuration::MilliSeconds(proto.max_random_sleep_default()),
+ "max_random_sleep_default");
+ }
+ if (proto.has_max_random_sleep_when_ok()) {
+ update(res.MaxRandomSleepWhenOk,
+ TDuration::MilliSeconds(proto.max_random_sleep_when_ok()),
+ "max_random_sleep_when_ok");
+ }
+ if (proto.has_retries_on_start()) {
+ Y_ENSURE(proto.retries_on_start(), "retries_on_start==0");
+ update(res.RetriesOnStart,
+ proto.retries_on_start(),
+ "retries_on_start");
+ }
+ if (proto.has_retries_in_background()) {
+ Y_ENSURE(proto.retries_in_background(), "retries_in_background==0");
+ update(res.RetriesInBackground,
+ proto.retries_in_background(),
+ "retries_in_background");
+ }
+ if (proto.has_worker_awaking_period_sec()) {
+ update(res.WorkerAwakingPeriod,
+ TDuration::Seconds(proto.worker_awaking_period_sec()),
+ "worker_awaking_period");
+ this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod;
+ }
+ if (proto.has_dsts_limit()) {
+ Y_ENSURE(proto.dsts_limit(), "dsts_limit==0");
+ update(res.DstsLimit,
+ proto.dsts_limit(),
+ "dsts_limit");
+ }
+ if (proto.has_roles_update_period_sec()) {
+ Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0");
+ update(res.RolesUpdatePeriod,
+ TDuration::Seconds(proto.roles_update_period_sec()),
+ "roles_update_period_sec");
+ }
+ if (proto.has_roles_warn_period_sec()) {
+ Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0");
+ update(res.RolesWarnPeriod,
+ TDuration::Seconds(proto.roles_warn_period_sec()),
+ "roles_warn_period_sec");
+ }
+ if (diff.empty()) {
+ return false;
+ }
+ LogDebug("Retry settings were updated: " + diff.Str());
+ return true;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder()
+ << "Failed to update retry settings from server, header '"
+ << header << "': "
+ << e.what());
+ }
+ return false;
+ }
+ template <typename Func>
+ NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const {
+ const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground
+ : RetrySettings_.RetriesOnStart;
+ for (size_t idx = 1;; ++idx) {
+ RandomSleep();
+ try {
+ NUtils::TFetchResult result = func();
+ if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) {
+ TDiskWriter w(RetrySettingsFilepath_, Logger_.Get());
+ w.Write(result.RetrySettings);
+ }
+ if (400 <= result.Code && result.Code <= 499) {
+ throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
+ }
+ if (result.Code < 200 || result.Code >= 399) {
+ throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
+ }
+ ExpBackoff_.Decrease();
+ return result;
+ } catch (const TNonRetriableException& e) {
+ LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
+ ExpBackoff_.Increase();
+ throw;
+ } catch (const std::exception& e) {
+ LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
+ ExpBackoff_.Increase();
+ if (idx >= tries) {
+ throw;
+ }
+ }
+ }
+ throw yexception() << "unreachable";
+ }
+ void TThreadedUpdater::RandomSleep() const {
+ const TDuration maxSleep = TClientStatus::ECode::Ok == GetState()
+ ? RetrySettings_.MaxRandomSleepWhenOk
+ : RetrySettings_.MaxRandomSleepDefault;
+ if (maxSleep) {
+ ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds();
+ ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep));
+ }
+ }
+ TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src,
+ const TServiceContext& ctx,
+ const TClientSettings::TDstVector& dsts,
+ const NUtils::TProcInfo& procInfo,
+ time_t now) {
+ TStringStream s;
+ const TString ts = IntToString<10>(now);
+ TStringStream dst;
+ dst.Reserve(10 * dsts.size());
+ for (const TClientSettings::TDst& d : dsts) {
+ if (dst.Str()) {
+ dst << ',';
+ }
+ dst << d.Id;
+ }
+ s << "grant_type=client_credentials";
+ s << "&src=" << src;
+ s << "&dst=" << dst.Str();
+ s << "&ts=" << ts;
+ s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str());
+ s << "&get_retry_settings=yes";
+ s << "&";
+ procInfo.AddToRequest(s);
+ return s.Str();
+ }
+ template <class Dsts>
+ void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp,
+ const Dsts& dsts,
+ TPairTicketsErrors& out) const {
+ NJson::TJsonValue doc;
+ Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp);
+ const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr;
+ auto find = [&currentResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool {
+ const TString idStr = IntToString<10>(id);
+ if (currentResp && currentResp->GetValue(idStr, &obj)) {
+ return true;
+ }
+ for (const NJson::TJsonValue& val : doc.GetArray()) {
+ currentResp = &val;
+ if (currentResp->GetValue(idStr, &obj)) {
+ return true;
+ }
+ }
+ return false;
+ };
+ for (const TClientSettings::TDst& d : dsts) {
+ NJson::TJsonValue obj;
+ NJson::TJsonValue val;
+ if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) {
+ TString err;
+ if (obj.GetValue("error", &val)) {
+ err = val.GetString();
+ } else {
+ err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id);
+ }
+ TStringStream s;
+ s << "Failed to get ServiceTicket for " << d.Id << ": " << err;
+ ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str());
+ out.Errors.insert({d.Id, std::move(err)});
+ continue;
+ }
+ out.Tickets.insert({d.Id, val.GetString()});
+ }
+ }
+ void TThreadedUpdater::ParseTicketsFromDiskCache(TStringBuf cache,
+ TPairTicketsErrors& out) const {
+ NJson::TJsonValue doc;
+ Y_ENSURE(NJson::ReadJsonTree(cache, &doc), "Invalid json from disk: " << cache);
+ for (const NJson::TJsonValue& cacheItem : doc.GetArray()) {
+ for (const auto& [idStr, resp] : cacheItem.GetMap()) {
+ NJson::TJsonValue val;
+ TTvmId id;
+ if (!TryIntFromString<10, TTvmId, TString>(idStr, id)) {
+ LogWarning(TStringBuilder() << "tvm_id in cache is not integer: " << idStr);
+ continue;
+ }
+ if (resp.GetValue("ticket", &val)) {
+ out.Tickets[id] = val.GetString();
+ } else if (resp.GetValue("error", &val)) {
+ out.Errors[id] = val.GetString();
+ } else {
+ out.Errors[id] = "tvm_id found, but response has no error or ticket, should never happend: " + idStr;
+ }
+ }
+ }
+ }
+ static const char DELIMETER = '\t';
+ TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) {
+ TStringStream s;
+ s << tvmResponse << DELIMETER << selfId;
+ return s.Str();
+ }
+ std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) {
+ TStringBuf tvmId = data.RNextTok(DELIMETER);
+ return {data, IntFromString<TTvmId, 10>(tvmId)};
+ }
+ TDstSetPtr TThreadedUpdater::GetDsts() const {
+ return Destinations_.Get();
+ }
+ void TThreadedUpdater::SetDsts(TDstSetPtr dsts) {
+ Destinations_.Set(std::move(dsts));
+ }
+ bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const {
+ return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod;
+ }
+ bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const {
+ return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod;
+ }
+ bool TThreadedUpdater::AreServicesTicketsOk() const {
+ if (!Settings_.IsServiceTicketFetchingRequired()) {
+ return true;
+ }
+ auto c = GetCachedServiceTickets();
+ return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == GetDsts()->size());
+ }
+ bool TThreadedUpdater::IsServiceContextOk() const {
+ if (!Settings_.CheckServiceTickets) {
+ return true;
+ }
+ return bool(GetCachedServiceContext());
+ }
+ bool TThreadedUpdater::IsUserContextOk() const {
+ if (!Settings_.CheckUserTicketsWithBbEnv) {
+ return true;
+ }
+ return bool(GetCachedUserContext());
+ }
+ void TThreadedUpdater::Worker() {
+ UpdateServiceTickets();
+ UpdatePublicKeys();
+ UpdateRoles();
+ }
+ TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) {
+ TServiceTickets::TMapAliasId res;
+ for (const auto& pair : settings.FetchServiceTicketsForDstsWithAliases) {
+ res.insert({pair.first, pair.second.Id});
+ }
+ return res;
+ }
+ TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) {
+ Y_ENSURE(available);
+ TDstSet set;
+ // available->TicketsById is not sorted
+ for (const auto& pair : available->TicketsById) {
+ set.insert(pair.first);
+ }
+ return FindMissingDsts(set, required);
+ }
+ TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) {
+ TClientSettings::TDstVector res;
+ std::set_difference(required.begin(), required.end(),
+ available.begin(), available.end(),
+ std::inserter(res, res.begin()));
+ return res;
+ }
+ TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) {
+ if (responses.empty()) {
+ return "[]";
+ }
+ size_t size = 0;
+ for (const TString& r : responses) {
+ size += r.size() + 1;
+ }
+ TString res;
+ res.reserve(size + 2);
+ res.push_back('[');
+ for (const TString& r : responses) {
+ res.append(r).push_back(',');
+ }
+ res.back() = ']';
+ return res;
+ }
+ TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) {
+ Y_ENSURE(json, "previous body required");
+ size_t size = 0;
+ for (const TString& r : responses) {
+ size += r.size() + 1;
+ }
+ TString res;
+ res.reserve(size + 2 + json.size());
+ res.push_back('[');
+ if (json.StartsWith('[')) {
+ Y_ENSURE(json.EndsWith(']'), "array is broken:" << json);
+ res.append(TStringBuf(json).Chop(1).Skip(1));
+ } else {
+ res.append(json);
+ }
+ res.push_back(',');
+ for (const TString& r : responses) {
+ res.append(r).push_back(',');
+ }
+ res.back() = ']';
+ return res;
+ }
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.h b/library/cpp/tvmauth/client/misc/api/threaded_updater.h
new file mode 100644
index 0000000000..cabfcf8fb5
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.h
@@ -0,0 +1,153 @@
+#pragma once
+#include "retry_settings.h"
+#include "roles_fetcher.h"
+#include "settings.h"
+#include <library/cpp/tvmauth/client/misc/async_updater.h>
+#include <library/cpp/tvmauth/client/misc/threaded_updater.h>
+#include <util/generic/set.h>
+#include <util/random/fast.h>
+#include <mutex>
+namespace NTvmAuth::NTvmApi {
+ using TDstSet = TSet<TClientSettings::TDst>;
+ using TDstSetPtr = std::shared_ptr<const TDstSet>;
+ class TThreadedUpdater: public TThreadedUpdaterBase {
+ public:
+ /*!
+ * Starts thread for updating of in-memory cache in background
+ * Reads cache from disk if specified
+ * @param settings
+ * @param logger is usefull for monitoring and debuging
+ */
+ static TAsyncUpdaterPtr Create(const TClientSettings& settings, TLoggerPtr logger);
+ ~TThreadedUpdater();
+ TClientStatus GetStatus() const override;
+ NRoles::TRolesPtr GetRoles() const override;
+ protected: // for tests
+ TClientStatus::ECode GetState() const;
+ TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger);
+ void Init();
+ void UpdateServiceTickets();
+ void UpdateAllServiceTickets();
+ TServiceTicketsPtr UpdateMissingServiceTickets(const TDstSet& required);
+ void UpdatePublicKeys();
+ void UpdateRoles();
+ TServiceTicketsPtr UpdateServiceTicketsCachePartly(TPairTicketsErrors&& tickets, size_t got);
+ void UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time);
+ void UpdatePublicKeysCache(const TString& publicKeys, TInstant time);
+ void ReadStateFromDisk();
+ struct TServiceTicketsFromDisk {
+ TPairTicketsErrors TicketsWithErrors;
+ TInstant BornDate;
+ TString FileBody;
+ };
+ std::pair<TServiceTicketsFromDisk, TServiceTicketsFromDisk> ReadServiceTicketsFromDisk() const;
+ std::pair<TString, TInstant> ReadPublicKeysFromDisk() const;
+ TString ReadRetrySettingsFromDisk() const;
+ struct THttpResult {
+ TPairTicketsErrors TicketsWithErrors;
+ TSmallVec<TString> Responses;
+ };
+ template <class Dsts>
+ THttpResult GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const;
+ TString GetPublicKeysFromHttp() const;
+ TServiceTickets::TMapIdStr GetRequestedTicketsFromStartUpCache(const TDstSet& dsts) const;
+ virtual NUtils::TFetchResult FetchServiceTicketsFromHttp(const TString& body) const;
+ virtual NUtils::TFetchResult FetchPublicKeysFromHttp() const;
+ bool UpdateRetrySettings(const TString& header) const;
+ template <typename Func>
+ NUtils::TFetchResult FetchWithRetries(Func func, EScope scope) const;
+ void RandomSleep() const;
+ static TString PrepareRequestForServiceTickets(TTvmId src,
+ const TServiceContext& ctx,
+ const TClientSettings::TDstVector& dsts,
+ const NUtils::TProcInfo& procInfo,
+ time_t now = time(nullptr));
+ template <class Dsts>
+ void ParseTicketsFromResponse(TStringBuf resp,
+ const Dsts& dsts,
+ TPairTicketsErrors& out) const;
+ void ParseTicketsFromDiskCache(TStringBuf cache,
+ TPairTicketsErrors& out) const;
+ static TString PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId);
+ static std::pair<TStringBuf, TTvmId> ParseTicketsFromDisk(TStringBuf data);
+ TDstSetPtr GetDsts() const;
+ void SetDsts(TDstSetPtr dsts);
+ TInstant GetStartUpCacheBornDate() const;
+ bool IsTimeToUpdateServiceTickets(TInstant lastUpdate) const;
+ bool IsTimeToUpdatePublicKeys(TInstant lastUpdate) const;
+ bool AreServicesTicketsOk() const;
+ bool IsServiceContextOk() const;
+ bool IsUserContextOk() const;
+ void Worker() override;
+ static TServiceTickets::TMapAliasId MakeAliasMap(const TClientSettings& settings);
+ static TClientSettings::TDstVector FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required);
+ static TClientSettings::TDstVector FindMissingDsts(const TDstSet& available, const TDstSet& required);
+ static TString CreateJsonArray(const TSmallVec<TString>& responses);
+ static TString AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses);
+ private:
+ TRetrySettings RetrySettings_;
+ protected:
+ mutable TExponentialBackoff ExpBackoff_;
+ std::unique_ptr<std::mutex> ServiceTicketBatchUpdateMutex_;
+ private:
+ const TClientSettings Settings_;
+ const NUtils::TProcInfo ProcInfo_;
+ const TString PublicKeysUrl_;
+ const TServiceTickets::TMapAliasId DstAliases_;
+ const TKeepAliveHttpClient::THeaders Headers_;
+ TMaybe<TServiceContext> SigningContext_;
+ NUtils::TProtectedValue<TDstSetPtr> Destinations_;
+ TString DiskCacheServiceTickets_;
+ TServiceTicketsFromDisk StartUpCache_;
+ bool NeedFetchMissingServiceTickets_ = true;
+ TString PublicKeysFilepath_;
+ TString ServiceTicketsFilepath_;
+ TString RetrySettingsFilepath_;
+ std::unique_ptr<TRolesFetcher> RolesFetcher_;
+ mutable TReallyFastRng32 Random_;
+ bool Inited_ = false;
+ };
diff --git a/library/cpp/tvmauth/client/misc/async_updater.cpp b/library/cpp/tvmauth/client/misc/async_updater.cpp
new file mode 100644
index 0000000000..9cb0332ed4
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/async_updater.cpp
@@ -0,0 +1,152 @@
+#include "async_updater.h"
+#include "utils.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <util/string/builder.h>
+#include <util/system/spin_wait.h>
+namespace NTvmAuth {
+ TAsyncUpdaterBase::TAsyncUpdaterBase() {
+ ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1);
+ ServiceTicketsDurations_.Expiring = TDuration::Hours(2);
+ ServiceTicketsDurations_.Invalid = TDuration::Hours(11);
+ PublicKeysDurations_.RefreshPeriod = TDuration::Days(1);
+ PublicKeysDurations_.Expiring = TDuration::Days(2);
+ PublicKeysDurations_.Invalid = TDuration::Days(6);
+ }
+ NRoles::TRolesPtr TAsyncUpdaterBase::GetRoles() const {
+ ythrow TIllegalUsage() << "not implemented";
+ }
+ TInstant TAsyncUpdaterBase::GetUpdateTimeOfPublicKeys() const {
+ return PublicKeysTime_.Get();
+ }
+ TInstant TAsyncUpdaterBase::GetUpdateTimeOfServiceTickets() const {
+ return ServiceTicketsTime_.Get();
+ }
+ TInstant TAsyncUpdaterBase::GetUpdateTimeOfRoles() const {
+ return RolesTime_.Get();
+ }
+ TInstant TAsyncUpdaterBase::GetInvalidationTimeOfPublicKeys() const {
+ TInstant ins = GetUpdateTimeOfPublicKeys();
+ return ins == TInstant() ? TInstant() : ins + PublicKeysDurations_.Invalid;
+ }
+ TInstant TAsyncUpdaterBase::GetInvalidationTimeOfServiceTickets() const {
+ TServiceTicketsPtr c = GetCachedServiceTickets();
+ return c ? c->InvalidationTime : TInstant();
+ }
+ bool TAsyncUpdaterBase::ArePublicKeysInvalid(TInstant now) const {
+ return IsInvalid(GetInvalidationTimeOfPublicKeys(), now);
+ }
+ bool TAsyncUpdaterBase::AreServiceTicketsInvalid(TInstant now) const {
+ TServiceTicketsPtr c = GetCachedServiceTickets();
+ // Empty set of tickets is allways valid.
+ return c && !c->TicketsById.empty() && IsInvalid(GetInvalidationTimeOfServiceTickets(), now);
+ }
+ bool TAsyncUpdaterBase::IsInvalid(TInstant invTime, TInstant now) {
+ return invTime -
+ TDuration::Minutes(1) // lag for closing from balancer
+ < now;
+ }
+ void TAsyncUpdaterBase::SetBbEnv(EBlackboxEnv original, TMaybe<EBlackboxEnv> overrided) {
+ if (overrided) {
+ Y_ENSURE_EX(NUtils::CheckBbEnvOverriding(original, *overrided),
+ TBrokenTvmClientSettings() << "Overriding of BlackboxEnv is illegal: "
+ << original << " -> " << *overrided);
+ }
+ Envs_.store({original, overrided}, std::memory_order_relaxed);
+ }
+ TServiceTicketsPtr TAsyncUpdaterBase::GetCachedServiceTickets() const {
+ return ServiceTickets_.Get();
+ }
+ TServiceContextPtr TAsyncUpdaterBase::GetCachedServiceContext() const {
+ return ServiceContext_.Get();
+ }
+ TUserContextPtr TAsyncUpdaterBase::GetCachedUserContext(TMaybe<EBlackboxEnv> overridenEnv) const {
+ TAllUserContextsPtr ctx = AllUserContexts_.Get();
+ if (!ctx) {
+ return nullptr;
+ }
+ const TEnvs envs = Envs_.load(std::memory_order_relaxed);
+ if (!envs.Original) {
+ return nullptr;
+ }
+ EBlackboxEnv env = *envs.Original;
+ if (overridenEnv) {
+ Y_ENSURE_EX(NUtils::CheckBbEnvOverriding(*envs.Original, *overridenEnv),
+ TBrokenTvmClientSettings() << "Overriding of BlackboxEnv is illegal: "
+ << *envs.Original << " -> " << *overridenEnv);
+ env = *overridenEnv;
+ } else if (envs.Overrided) {
+ env = *envs.Overrided;
+ }
+ return ctx->Get(env);
+ }
+ void TAsyncUpdaterBase::SetServiceTickets(TServiceTicketsPtr c) {
+ ServiceTickets_.Set(std::move(c));
+ }
+ void TAsyncUpdaterBase::SetServiceContext(TServiceContextPtr c) {
+ ServiceContext_.Set(std::move(c));
+ }
+ void TAsyncUpdaterBase::SetUserContext(TStringBuf publicKeys) {
+ AllUserContexts_.Set(MakeIntrusiveConst<TAllUserContexts>(publicKeys));
+ }
+ void TAsyncUpdaterBase::SetUpdateTimeOfPublicKeys(TInstant ins) {
+ PublicKeysTime_.Set(ins);
+ }
+ void TAsyncUpdaterBase::SetUpdateTimeOfServiceTickets(TInstant ins) {
+ ServiceTicketsTime_.Set(ins);
+ }
+ void TAsyncUpdaterBase::SetUpdateTimeOfRoles(TInstant ins) {
+ RolesTime_.Set(ins);
+ }
+ bool TAsyncUpdaterBase::IsServiceTicketMapOk(TServiceTicketsPtr c, size_t expectedTicketCount, bool strict) {
+ return c &&
+ (strict
+ ? c->TicketsById.size() == expectedTicketCount
+ : !c->TicketsById.empty());
+ }
+ TAllUserContexts::TAllUserContexts(TStringBuf publicKeys) {
+ auto add = [&, this](EBlackboxEnv env) {
+ Ctx_[(size_t)env] = MakeIntrusiveConst<TUserContext>(env, publicKeys);
+ };
+ add(EBlackboxEnv::Prod);
+ add(EBlackboxEnv::Test);
+ add(EBlackboxEnv::ProdYateam);
+ add(EBlackboxEnv::TestYateam);
+ add(EBlackboxEnv::Stress);
+ }
+ TUserContextPtr TAllUserContexts::Get(EBlackboxEnv env) const {
+ return Ctx_[(size_t)env];
+ }
diff --git a/library/cpp/tvmauth/client/misc/async_updater.h b/library/cpp/tvmauth/client/misc/async_updater.h
new file mode 100644
index 0000000000..68c0c11cf0
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/async_updater.h
@@ -0,0 +1,114 @@
+#pragma once
+#include "last_error.h"
+#include "service_tickets.h"
+#include "settings.h"
+#include "roles/roles.h"
+#include <library/cpp/tvmauth/client/client_status.h>
+#include <library/cpp/tvmauth/client/logger.h>
+#include <library/cpp/tvmauth/deprecated/service_context.h>
+#include <library/cpp/tvmauth/deprecated/user_context.h>
+#include <library/cpp/tvmauth/src/utils.h>
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <array>
+#include <atomic>
+namespace NTvmAuth {
+ class TAllUserContexts: public TAtomicRefCount<TAllUserContexts> {
+ public:
+ TAllUserContexts(TStringBuf publicKeys);
+ TUserContextPtr Get(EBlackboxEnv env) const;
+ private:
+ std::array<TUserContextPtr, 5> Ctx_;
+ };
+ using TAllUserContextsPtr = TIntrusiveConstPtr<TAllUserContexts>;
+ class TAsyncUpdaterBase: public TAtomicRefCount<TAsyncUpdaterBase>, protected TLastError, TNonCopyable {
+ public:
+ TAsyncUpdaterBase();
+ virtual ~TAsyncUpdaterBase() = default;
+ virtual TClientStatus GetStatus() const = 0;
+ virtual NRoles::TRolesPtr GetRoles() const;
+ TServiceTicketsPtr GetCachedServiceTickets() const;
+ TServiceContextPtr GetCachedServiceContext() const;
+ TUserContextPtr GetCachedUserContext(TMaybe<EBlackboxEnv> overridenEnv = {}) const;
+ TInstant GetUpdateTimeOfPublicKeys() const;
+ TInstant GetUpdateTimeOfServiceTickets() const;
+ TInstant GetUpdateTimeOfRoles() const;
+ TInstant GetInvalidationTimeOfPublicKeys() const;
+ TInstant GetInvalidationTimeOfServiceTickets() const;
+ bool ArePublicKeysInvalid(TInstant now) const;
+ bool AreServiceTicketsInvalid(TInstant now) const;
+ static bool IsInvalid(TInstant invTime, TInstant now);
+ protected:
+ void SetBbEnv(EBlackboxEnv original, TMaybe<EBlackboxEnv> overrided = {});
+ void SetServiceTickets(TServiceTicketsPtr c);
+ void SetServiceContext(TServiceContextPtr c);
+ void SetUserContext(TStringBuf publicKeys);
+ void SetUpdateTimeOfPublicKeys(TInstant ins);
+ void SetUpdateTimeOfServiceTickets(TInstant ins);
+ void SetUpdateTimeOfRoles(TInstant ins);
+ static bool IsServiceTicketMapOk(TServiceTicketsPtr c, size_t expectedTicketCount, bool strict);
+ protected:
+ struct TPairTicketsErrors {
+ TServiceTickets::TMapIdStr Tickets;
+ TServiceTickets::TMapIdStr Errors;
+ bool operator==(const TPairTicketsErrors& o) const {
+ return Tickets == o.Tickets && Errors == o.Errors;
+ }
+ };
+ struct TStateDurations {
+ TDuration RefreshPeriod;
+ TDuration Expiring;
+ TDuration Invalid;
+ };
+ TStateDurations ServiceTicketsDurations_;
+ TStateDurations PublicKeysDurations_;
+ protected:
+ virtual void StartTvmClientStopping() const {
+ }
+ virtual bool IsTvmClientStopped() const {
+ return true;
+ }
+ friend class NTvmAuth::NInternal::TClientCaningKnife;
+ private:
+ struct TEnvs {
+ TMaybe<EBlackboxEnv> Original;
+ TMaybe<EBlackboxEnv> Overrided;
+ };
+ static_assert(sizeof(TEnvs) <= 8, "Small struct is easy to store as atomic");
+ std::atomic<TEnvs> Envs_ = {{}};
+ NUtils::TProtectedValue<TServiceTicketsPtr> ServiceTickets_;
+ NUtils::TProtectedValue<TServiceContextPtr> ServiceContext_;
+ NUtils::TProtectedValue<TAllUserContextsPtr> AllUserContexts_;
+ NUtils::TProtectedValue<TInstant> PublicKeysTime_;
+ NUtils::TProtectedValue<TInstant> ServiceTicketsTime_;
+ NUtils::TProtectedValue<TInstant> RolesTime_;
+ };
+ using TAsyncUpdaterPtr = TIntrusiveConstPtr<TAsyncUpdaterBase>;
diff --git a/library/cpp/tvmauth/client/misc/checker.h b/library/cpp/tvmauth/client/misc/checker.h
new file mode 100644
index 0000000000..16f1a95200
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/checker.h
@@ -0,0 +1,38 @@
+#pragma once
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/checked_service_ticket.h>
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+#include <library/cpp/tvmauth/deprecated/service_context.h>
+#include <library/cpp/tvmauth/deprecated/user_context.h>
+namespace NTvmAuth {
+ class TServiceTicketChecker {
+ public:
+ /*!
+ * Checking must be enabled in TClientSettings
+ * Can throw exception if cache is out of date or wrong config
+ * @param ticket
+ */
+ static TCheckedServiceTicket Check(
+ TStringBuf ticket,
+ TServiceContextPtr c,
+ const TServiceContext::TCheckFlags& flags = {}) {
+ Y_ENSURE_EX(c, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableServiceTicketChecking()");
+ return c->Check(ticket, flags);
+ }
+ };
+ class TUserTicketChecker {
+ public:
+ /*!
+ * Blackbox enviroment must be cofingured in TClientSettings
+ * Can throw exception if cache is out of date or wrong config
+ */
+ static TCheckedUserTicket Check(TStringBuf ticket, TUserContextPtr c) {
+ Y_ENSURE_EX(c, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableUserTicketChecking()");
+ return c->Check(ticket);
+ }
+ };
diff --git a/library/cpp/tvmauth/client/misc/default_uid_checker.h b/library/cpp/tvmauth/client/misc/default_uid_checker.h
new file mode 100644
index 0000000000..b723d6e918
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/default_uid_checker.h
@@ -0,0 +1,31 @@
+#pragma once
+#include "roles/roles.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+#include <library/cpp/tvmauth/src/user_impl.h>
+#include <library/cpp/tvmauth/src/utils.h>
+namespace NTvmAuth {
+ class TDefaultUidChecker {
+ public:
+ /*!
+ * Checking must be enabled in TClientSettings
+ * Can throw exception if cache is out of date or wrong config
+ * @param ticket
+ */
+ static TCheckedUserTicket Check(TCheckedUserTicket ticket, NRoles::TRolesPtr r) {
+ Y_ENSURE_EX(r, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableRolesFetching()");
+ NRoles::TConsumerRolesPtr roles = r->GetRolesForUser(ticket);
+ if (roles) {
+ return ticket;
+ }
+ TUserTicketImplPtr impl = THolder(NInternal::TCanningKnife::GetU(ticket));
+ impl->SetStatus(ETicketStatus::NoRoles);
+ return TCheckedUserTicket(std::move(impl));
+ }
+ };
diff --git a/library/cpp/tvmauth/client/misc/disk_cache.cpp b/library/cpp/tvmauth/client/misc/disk_cache.cpp
new file mode 100644
index 0000000000..8f3ab7770f
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/disk_cache.cpp
@@ -0,0 +1,162 @@
+#include "disk_cache.h"
+#include <library/cpp/tvmauth/client/logger.h>
+#include <openssl/evp.h>
+#include <openssl/hmac.h>
+#include <util/stream/file.h>
+#include <util/stream/str.h>
+#include <util/system/fs.h>
+#include <util/system/sysstat.h>
+#include <exception>
+namespace NTvmAuth {
+ static const size_t HASH_SIZE = 32;
+ static const size_t TIMESTAMP_SIZE = sizeof(time_t);
+ TDiskReader::TDiskReader(const TString& filename, ILogger* logger)
+ : Filename_(filename)
+ , Logger_(logger)
+ {
+ }
+ bool TDiskReader::Read() {
+ TStringStream s;
+ try {
+ if (!NFs::Exists(Filename_)) {
+ if (Logger_) {
+ s << "File '" << Filename_ << "' does not exist";
+ Logger_->Debug(s.Str());
+ }
+ return false;
+ }
+ TFile file(Filename_, OpenExisting | RdOnly | Seq);
+ file.Flock(LOCK_SH | LOCK_NB);
+ TFileInput input(file);
+ return ParseData(input.ReadAll());
+ } catch (const std::exception& e) {
+ if (Logger_) {
+ s << "Failed to read '" << Filename_ << "': " << e.what();
+ Logger_->Error(s.Str());
+ }
+ }
+ return false;
+ }
+ bool TDiskReader::ParseData(TStringBuf buf) {
+ TStringStream s;
+ if (buf.size() <= HASH_SIZE + TIMESTAMP_SIZE) {
+ if (Logger_) {
+ s << "File '" << Filename_ << "' is too small";
+ Logger_->Warning(s.Str());
+ }
+ return false;
+ }
+ TStringBuf hash = buf.SubStr(0, HASH_SIZE);
+ if (hash != GetHash(buf.Skip(HASH_SIZE))) {
+ if (Logger_) {
+ s << "Content of '" << Filename_ << "' was incorrectly changed";
+ Logger_->Warning(s.Str());
+ }
+ return false;
+ }
+ Time_ = TInstant::Seconds(GetTimestamp(buf.substr(0, TIMESTAMP_SIZE)));
+ Data_ = buf.Skip(TIMESTAMP_SIZE);
+ if (Logger_) {
+ s << "File '" << Filename_ << "' was successfully read";
+ Logger_->Info(s.Str());
+ }
+ return true;
+ }
+ TString TDiskReader::GetHash(TStringBuf data) {
+ TString value(EVP_MAX_MD_SIZE, 0);
+ unsigned macLen = 0;
+ if (!::HMAC(EVP_sha256(),
+ "",
+ 0,
+ (unsigned char*)data.data(),
+ data.size(),
+ (unsigned char*)value.data(),
+ &macLen)) {
+ return {};
+ }
+ if (macLen != EVP_MAX_MD_SIZE) {
+ value.resize(macLen);
+ }
+ return value;
+ }
+ time_t TDiskReader::GetTimestamp(TStringBuf data) {
+ time_t time = 0;
+ for (int idx = TIMESTAMP_SIZE - 1; idx >= 0; --idx) {
+ time <<= 8;
+ time |= static_cast<unsigned char>(data.at(idx));
+ }
+ return time;
+ }
+ TDiskWriter::TDiskWriter(const TString& filename, ILogger* logger)
+ : Filename_(filename)
+ , Logger_(logger)
+ {
+ }
+ bool TDiskWriter::Write(TStringBuf data, TInstant now) {
+ TStringStream s;
+ try {
+ {
+ if (NFs::Exists(Filename_)) {
+ Chmod(Filename_.c_str(),
+ S_IRUSR | S_IWUSR); // 600
+ }
+ TFile file(Filename_, CreateAlways | WrOnly | Seq | AWUser | ARUser);
+ file.Flock(LOCK_EX | LOCK_NB);
+ TFileOutput output(file);
+ output << PrepareData(now, data);
+ }
+ if (Logger_) {
+ s << "File '" << Filename_ << "' was successfully written";
+ Logger_->Info(s.Str());
+ }
+ return true;
+ } catch (const std::exception& e) {
+ if (Logger_) {
+ s << "Failed to write '" << Filename_ << "': " << e.what();
+ Logger_->Error(s.Str());
+ }
+ }
+ return false;
+ }
+ TString TDiskWriter::PrepareData(TInstant time, TStringBuf data) {
+ TString toHash = WriteTimestamp(time.TimeT()) + data;
+ return TDiskReader::GetHash(toHash) + toHash;
+ }
+ TString TDiskWriter::WriteTimestamp(time_t time) {
+ TString res(TIMESTAMP_SIZE, 0);
+ for (size_t idx = 0; idx < TIMESTAMP_SIZE; ++idx) {
+ res[idx] = time & 0xFF;
+ time >>= 8;
+ }
+ return res;
+ }
diff --git a/library/cpp/tvmauth/client/misc/disk_cache.h b/library/cpp/tvmauth/client/misc/disk_cache.h
new file mode 100644
index 0000000000..9e77556f86
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/disk_cache.h
@@ -0,0 +1,50 @@
+#pragma once
+#include <util/datetime/base.h>
+#include <util/generic/string.h>
+namespace NTvmAuth {
+ class ILogger;
+ class TDiskReader {
+ public:
+ TDiskReader(const TString& filename, ILogger* logger = nullptr);
+ bool Read();
+ const TString& Data() const {
+ return Data_;
+ }
+ TInstant Time() const {
+ return Time_;
+ }
+ public: // for tests
+ bool ParseData(TStringBuf buf);
+ static TString GetHash(TStringBuf data);
+ static time_t GetTimestamp(TStringBuf data);
+ private:
+ TString Filename_;
+ ILogger* Logger_;
+ TInstant Time_;
+ TString Data_;
+ };
+ class TDiskWriter {
+ public:
+ TDiskWriter(const TString& filename, ILogger* logger = nullptr);
+ bool Write(TStringBuf data, TInstant now = TInstant::Now());
+ public: // for tests
+ static TString PrepareData(TInstant time, TStringBuf data);
+ static TString WriteTimestamp(time_t time);
+ private:
+ TString Filename_;
+ ILogger* Logger_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/exponential_backoff.h b/library/cpp/tvmauth/client/misc/exponential_backoff.h
new file mode 100644
index 0000000000..89a7a3c8ad
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/exponential_backoff.h
@@ -0,0 +1,94 @@
+#pragma once
+#include <util/datetime/base.h>
+#include <util/random/normal.h>
+#include <util/system/event.h>
+#include <atomic>
+namespace NTvmAuth {
+ // https://habr.com/ru/post/227225/
+ class TExponentialBackoff {
+ public:
+ struct TSettings {
+ TDuration Min;
+ TDuration Max;
+ double Factor = 1.001;
+ double Jitter = 0;
+ bool operator==(const TSettings& o) const {
+ return Min == o.Min &&
+ Max == o.Max &&
+ Factor == o.Factor &&
+ Jitter == o.Jitter;
+ }
+ };
+ TExponentialBackoff(const TSettings& settings, bool isEnabled = true)
+ : CurrentValue_(settings.Min)
+ , IsEnabled_(isEnabled)
+ {
+ UpdateSettings(settings);
+ }
+ void UpdateSettings(const TSettings& settings) {
+ Y_ENSURE(settings.Factor > 1, "factor=" << settings.Factor << ". Should be > 1");
+ Y_ENSURE(settings.Jitter >= 0 && settings.Jitter < 1, "jitter should be in range [0, 1)");
+ Min_ = settings.Min;
+ Max_ = settings.Max;
+ Factor_ = settings.Factor;
+ Jitter_ = settings.Jitter;
+ }
+ TDuration Increase() {
+ CurrentValue_ = std::min(CurrentValue_ * Factor_, Max_);
+ double rnd = StdNormalRandom<double>();
+ const bool isNegative = rnd < 0;
+ rnd = std::abs(rnd);
+ const TDuration diff = rnd * Jitter_ * CurrentValue_;
+ if (isNegative) {
+ CurrentValue_ -= diff;
+ } else {
+ CurrentValue_ += diff;
+ }
+ return CurrentValue_;
+ }
+ TDuration Decrease() {
+ CurrentValue_ = std::max(CurrentValue_ / Factor_, Min_);
+ return CurrentValue_;
+ }
+ void Sleep(TDuration add = TDuration()) {
+ if (IsEnabled_.load(std::memory_order_relaxed)) {
+ Ev_.WaitT(CurrentValue_ + add);
+ }
+ }
+ void Interrupt() {
+ Ev_.Signal();
+ }
+ TDuration GetCurrentValue() const {
+ return CurrentValue_;
+ }
+ void SetEnabled(bool val) {
+ IsEnabled_.store(val);
+ }
+ private:
+ TDuration Min_;
+ TDuration Max_;
+ double Factor_;
+ double Jitter_;
+ TDuration CurrentValue_;
+ std::atomic_bool IsEnabled_;
+ TAutoEvent Ev_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/fetch_result.h b/library/cpp/tvmauth/client/misc/fetch_result.h
new file mode 100644
index 0000000000..4b0774e92f
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/fetch_result.h
@@ -0,0 +1,13 @@
+#pragma once
+#include <library/cpp/http/simple/http_client.h>
+namespace NTvmAuth::NUtils {
+ struct TFetchResult {
+ TKeepAliveHttpClient::THttpCode Code;
+ THttpHeaders Headers;
+ TStringBuf Path;
+ TString Response;
+ TString RetrySettings;
+ };
diff --git a/library/cpp/tvmauth/client/misc/getter.h b/library/cpp/tvmauth/client/misc/getter.h
new file mode 100644
index 0000000000..6c7617b418
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/getter.h
@@ -0,0 +1,49 @@
+#pragma once
+#include "checker.h"
+#include "service_tickets.h"
+namespace NTvmAuth {
+ class TServiceTicketGetter {
+ public:
+ /*!
+ * Fetching must enabled in TClientSettings
+ * Can throw exception if cache is invalid or wrong config
+ * @param dst
+ */
+ static TString GetTicket(const TClientSettings::TAlias& dst, TServiceTicketsPtr c) {
+ Y_ENSURE_EX(c, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableServiceTicketsFetchOptions()");
+ return GetTicketImpl(dst, c->TicketsByAlias, c->ErrorsByAlias, c->UnfetchedAliases);
+ }
+ static TString GetTicket(const TTvmId dst, TServiceTicketsPtr c) {
+ Y_ENSURE_EX(c, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableServiceTicketsFetchOptions()");
+ return GetTicketImpl(dst, c->TicketsById, c->ErrorsById, c->UnfetchedIds);
+ }
+ private:
+ template <class Key, class Cont, class UnfetchedCont>
+ static TString GetTicketImpl(const Key& dst, const Cont& tickets, const Cont& errors, const UnfetchedCont& unfetched) {
+ auto it = tickets.find(dst);
+ if (it != tickets.end()) {
+ return it->second;
+ }
+ it = errors.find(dst);
+ if (it != errors.end()) {
+ ythrow TMissingServiceTicket()
+ << "Failed to get ticket for '" << dst << "': "
+ << it->second;
+ }
+ if (unfetched.contains(dst)) {
+ ythrow TMissingServiceTicket()
+ << "Failed to get ticket for '" << dst << "': this dst was not fetched yet.";
+ }
+ ythrow TBrokenTvmClientSettings()
+ << "Destination '" << dst << "' was not specified in settings. "
+ << "Check your settings (if you use Qloud/YP/tvmtool - check it's settings)";
+ }
+ };
diff --git a/library/cpp/tvmauth/client/misc/last_error.cpp b/library/cpp/tvmauth/client/misc/last_error.cpp
new file mode 100644
index 0000000000..a6279bb1ef
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/last_error.cpp
@@ -0,0 +1,115 @@
+#include "last_error.h"
+#include <util/string/builder.h>
+namespace NTvmAuth {
+ TLastError::TLastError()
+ : LastErrors_(MakeIntrusiveConst<TLastErrors>())
+ {
+ }
+ TString TLastError::GetLastError(bool isOk, EType* type) const {
+ if (isOk) {
+ return OK_;
+ }
+ const TLastErrorsPtr ptr = LastErrors_.Get();
+ for (const TLastErr& err : ptr->Errors) {
+ if (err && err->first == EType::NonRetriable) {
+ if (type) {
+ *type = EType::NonRetriable;
+ }
+ return err->second;
+ }
+ }
+ for (const TLastErr& err : ptr->Errors) {
+ if (err) {
+ if (type) {
+ *type = EType::Retriable;
+ }
+ return err->second;
+ }
+ }
+ if (type) {
+ *type = EType::NonRetriable;
+ }
+ return "Internal client error: failed to collect last useful error message, please report this message to tvm-dev@yandex-team.ru";
+ }
+ TString TLastError::ProcessHttpError(TLastError::EScope scope,
+ TStringBuf path,
+ int code,
+ const TString& msg) const {
+ TString err = TStringBuilder() << "Path:" << path << ".Code=" << code << ": " << msg;
+ ProcessError(code >= 400 && code < 500 ? EType::NonRetriable
+ : EType::Retriable,
+ scope,
+ err);
+ return err;
+ }
+ void TLastError::ProcessError(TLastError::EType type, TLastError::EScope scope, const TStringBuf msg) const {
+ Update(scope, [&](TLastErr& lastError) {
+ if (lastError && lastError->first == EType::NonRetriable && type == EType::Retriable) {
+ return false;
+ }
+ TString err = TStringBuilder() << scope << ": " << msg;
+ err.erase(std::remove(err.begin(), err.vend(), '\r'), err.vend());
+ std::replace(err.begin(), err.vend(), '\n', ' ');
+ lastError = {type, std::move(err)};
+ return true;
+ });
+ }
+ void TLastError::ClearError(TLastError::EScope scope) {
+ Update(scope, [&](TLastErr& lastError) {
+ if (!lastError) {
+ return false;
+ }
+ lastError.Clear();
+ return true;
+ });
+ }
+ void TLastError::ClearErrors() {
+ for (size_t idx = 0; idx < (size_t)EScope::COUNT; ++idx) {
+ ClearError((EScope)idx);
+ }
+ }
+ void TLastError::ThrowLastError() {
+ EType type;
+ TString err = GetLastError(false, &type);
+ switch (type) {
+ case EType::NonRetriable:
+ ythrow TNonRetriableException()
+ << "Failed to start TvmClient. Do not retry: "
+ << err;
+ case EType::Retriable:
+ ythrow TRetriableException()
+ << "Failed to start TvmClient. You can retry: "
+ << err;
+ }
+ }
+ template <typename Func>
+ void TLastError::Update(TLastError::EScope scope, Func func) const {
+ Y_VERIFY(scope != EScope::COUNT);
+ TLastErrors errs = *LastErrors_.Get();
+ TLastErr& lastError = errs.Errors[(size_t)scope];
+ if (func(lastError)) {
+ LastErrors_.Set(MakeIntrusiveConst<TLastErrors>(std::move(errs)));
+ }
+ }
diff --git a/library/cpp/tvmauth/client/misc/last_error.h b/library/cpp/tvmauth/client/misc/last_error.h
new file mode 100644
index 0000000000..b0ad33611f
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/last_error.h
@@ -0,0 +1,51 @@
+#pragma once
+#include "utils.h"
+#include <array>
+namespace NTvmAuth {
+ class TLastError {
+ public:
+ enum class EType {
+ NonRetriable,
+ Retriable,
+ };
+ enum class EScope {
+ ServiceTickets,
+ PublicKeys,
+ Roles,
+ TvmtoolConfig,
+ };
+ using TLastErr = TMaybe<std::pair<EType, TString>>;
+ struct TLastErrors: public TAtomicRefCount<TLastErrors> {
+ std::array<TLastErr, (int)EScope::COUNT> Errors;
+ };
+ using TLastErrorsPtr = TIntrusiveConstPtr<TLastErrors>;
+ public:
+ TLastError();
+ TString GetLastError(bool isOk, EType* type = nullptr) const;
+ TString ProcessHttpError(EScope scope, TStringBuf path, int code, const TString& msg) const;
+ void ProcessError(EType type, EScope scope, const TStringBuf msg) const;
+ void ClearError(EScope scope);
+ void ClearErrors();
+ void ThrowLastError();
+ private:
+ template <typename Func>
+ void Update(EScope scope, Func func) const;
+ private:
+ const TString OK_ = "OK";
+ mutable NUtils::TProtectedValue<TLastErrorsPtr> LastErrors_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/proc_info.cpp b/library/cpp/tvmauth/client/misc/proc_info.cpp
new file mode 100644
index 0000000000..e2e5ec15b9
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/proc_info.cpp
@@ -0,0 +1,53 @@
+#include "proc_info.h"
+#include <library/cpp/tvmauth/version.h>
+#include <library/cpp/string_utils/quote/quote.h>
+#include <util/stream/file.h>
+#include <util/string/cast.h>
+#include <util/system/getpid.h>
+namespace NTvmAuth::NUtils {
+ void TProcInfo::AddToRequest(IOutputStream& out) const {
+ out << "_pid=" << Pid;
+ if (ProcessName) {
+ out << "&_procces_name=" << *ProcessName;
+ }
+ out << "&lib_version=client_" << VersionPrefix << LibVersion();
+ }
+ TProcInfo TProcInfo::Create(const TString& versionPrefix) {
+ TProcInfo res;
+ res.Pid = IntToString<10>(GetPID());
+ res.ProcessName = GetProcessName();
+ res.VersionPrefix = versionPrefix;
+ return res;
+ }
+ std::optional<TString> TProcInfo::GetProcessName() {
+ try {
+ // works only for linux
+ TFileInput proc("/proc/self/status");
+ TString line;
+ while (proc.ReadLine(line)) {
+ TStringBuf buf(line);
+ if (!buf.SkipPrefix("Name:")) {
+ continue;
+ }
+ while (buf && isspace(buf.front())) {
+ buf.Skip(1);
+ }
+ TString res(buf);
+ CGIEscape(res);
+ return res;
+ }
+ } catch (...) {
+ }
+ return {};
+ }
diff --git a/library/cpp/tvmauth/client/misc/proc_info.h b/library/cpp/tvmauth/client/misc/proc_info.h
new file mode 100644
index 0000000000..b1526e5c47
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/proc_info.h
@@ -0,0 +1,18 @@
+#pragma once
+#include <util/generic/string.h>
+#include <optional>
+namespace NTvmAuth::NUtils {
+ struct TProcInfo {
+ TString Pid;
+ std::optional<TString> ProcessName;
+ TString VersionPrefix;
+ void AddToRequest(IOutputStream& out) const;
+ static TProcInfo Create(const TString& versionPrefix);
+ static std::optional<TString> GetProcessName();
+ };
diff --git a/library/cpp/tvmauth/client/misc/retry_settings/v1/settings.proto b/library/cpp/tvmauth/client/misc/retry_settings/v1/settings.proto
new file mode 100644
index 0000000000..72817847a6
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/retry_settings/v1/settings.proto
@@ -0,0 +1,21 @@
+syntax = "proto2";
+package retry_settings.v1;
+option cc_enable_arenas = true;
+option go_package = "a.yandex-team.ru/library/cpp/tvmauth/client/misc/retry_settings/v1";
+message Settings {
+ optional uint32 exponential_backoff_min_sec = 1;
+ optional uint32 exponential_backoff_max_sec = 2;
+ optional double exponential_backoff_factor = 3;
+ optional double exponential_backoff_jitter = 4;
+ optional uint32 max_random_sleep_default = 5;
+ optional uint32 max_random_sleep_when_ok = 12;
+ optional uint32 retries_on_start = 6;
+ optional uint32 worker_awaking_period_sec = 7;
+ optional uint32 dsts_limit = 8;
+ optional uint32 retries_in_background = 9;
+ optional uint32 roles_update_period_sec = 10;
+ optional uint32 roles_warn_period_sec = 11;
diff --git a/library/cpp/tvmauth/client/misc/roles/decoder.cpp b/library/cpp/tvmauth/client/misc/roles/decoder.cpp
new file mode 100644
index 0000000000..6337fb91c2
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/decoder.cpp
@@ -0,0 +1,93 @@
+#include "decoder.h"
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/openssl/crypto/sha.h>
+#include <library/cpp/streams/brotli/brotli.h>
+#include <library/cpp/streams/zstd/zstd.h>
+#include <util/generic/yexception.h>
+#include <util/stream/zlib.h>
+#include <util/string/ascii.h>
+namespace NTvmAuth::NRoles {
+ TString TDecoder::Decode(const TStringBuf codec, TString&& blob) {
+ if (codec.empty()) {
+ return std::move(blob);
+ }
+ const TCodecInfo info = ParseCodec(codec);
+ TString decoded = DecodeImpl(info.Type, blob);
+ VerifySize(decoded, info.Size);
+ VerifyChecksum(decoded, info.Sha256);
+ return decoded;
+ }
+ TDecoder::TCodecInfo TDecoder::ParseCodec(TStringBuf codec) {
+ const char delim = ':';
+ const TStringBuf version = codec.NextTok(delim);
+ Y_ENSURE(version == "1",
+ "unknown codec format version; known: 1; got: " << version);
+ TCodecInfo res;
+ res.Type = codec.NextTok(delim);
+ Y_ENSURE(res.Type, "codec type is empty");
+ const TStringBuf size = codec.NextTok(delim);
+ Y_ENSURE(TryIntFromString<10>(size, res.Size),
+ "decoded blob size is not number");
+ res.Sha256 = codec;
+ const size_t expectedSha256Size = 2 * NOpenSsl::NSha256::DIGEST_LENGTH;
+ Y_ENSURE(res.Sha256.size() == expectedSha256Size,
+ "sha256 of decoded blob has invalid length: expected "
+ << expectedSha256Size << ", got " << res.Sha256.size());
+ return res;
+ }
+ TString TDecoder::DecodeImpl(TStringBuf codec, const TString& blob) {
+ if (AsciiEqualsIgnoreCase(codec, "brotli")) {
+ return DecodeBrolti(blob);
+ } else if (AsciiEqualsIgnoreCase(codec, "gzip")) {
+ return DecodeGzip(blob);
+ } else if (AsciiEqualsIgnoreCase(codec, "zstd")) {
+ return DecodeZstd(blob);
+ }
+ ythrow yexception() << "unknown codec: '" << codec << "'";
+ }
+ TString TDecoder::DecodeBrolti(const TString& blob) {
+ TStringInput in(blob);
+ return TBrotliDecompress(&in).ReadAll();
+ }
+ TString TDecoder::DecodeGzip(const TString& blob) {
+ TStringInput in(blob);
+ return TZLibDecompress(&in).ReadAll();
+ }
+ TString TDecoder::DecodeZstd(const TString& blob) {
+ TStringInput in(blob);
+ return TZstdDecompress(&in).ReadAll();
+ }
+ void TDecoder::VerifySize(const TStringBuf decoded, size_t expected) {
+ Y_ENSURE(expected == decoded.size(),
+ "Decoded blob has bad size: expected " << expected << ", actual " << decoded.size());
+ }
+ void TDecoder::VerifyChecksum(const TStringBuf decoded, const TStringBuf expected) {
+ using namespace NOpenSsl::NSha256;
+ const TDigest dig = Calc(decoded);
+ const TString actual = NUtils::ToHex(TStringBuf((char*)dig.data(), dig.size()));
+ Y_ENSURE(AsciiEqualsIgnoreCase(actual, expected),
+ "Decoded blob has bad sha256: expected=" << expected << ", actual=" << actual);
+ }
diff --git a/library/cpp/tvmauth/client/misc/roles/decoder.h b/library/cpp/tvmauth/client/misc/roles/decoder.h
new file mode 100644
index 0000000000..de5cdb37e0
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/decoder.h
@@ -0,0 +1,32 @@
+#pragma once
+#include <util/generic/string.h>
+namespace NTvmAuth::NRoles {
+ class TDecoder {
+ public:
+ static TString Decode(const TStringBuf codec, TString&& blob);
+ public:
+ struct TCodecInfo {
+ TStringBuf Type;
+ size_t Size = 0;
+ TStringBuf Sha256;
+ bool operator==(const TCodecInfo& o) const {
+ return Type == o.Type &&
+ Size == o.Size &&
+ Sha256 == o.Sha256;
+ }
+ };
+ static TCodecInfo ParseCodec(TStringBuf codec);
+ static TString DecodeImpl(TStringBuf codec, const TString& blob);
+ static TString DecodeBrolti(const TString& blob);
+ static TString DecodeGzip(const TString& blob);
+ static TString DecodeZstd(const TString& blob);
+ static void VerifySize(const TStringBuf decoded, size_t expected);
+ static void VerifyChecksum(const TStringBuf decoded, const TStringBuf expected);
+ };
diff --git a/library/cpp/tvmauth/client/misc/roles/entities_index.cpp b/library/cpp/tvmauth/client/misc/roles/entities_index.cpp
new file mode 100644
index 0000000000..c9b72c3a17
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/entities_index.cpp
@@ -0,0 +1,114 @@
+#include "entities_index.h"
+#include <util/stream/str.h>
+#include <set>
+namespace NTvmAuth::NRoles {
+ TEntitiesIndex::TStage::TStage(const std::set<TString>& k)
+ : Keys_(k.begin(), k.end())
+ {
+ }
+ // TODO TStringBuf
+ bool TEntitiesIndex::TStage::GetNextKeySet(std::vector<TString>& out) {
+ out.clear();
+ out.reserve(Keys_.size());
+ ++Id_;
+ for (size_t idx = 0; idx < Keys_.size(); ++idx) {
+ bool need = (Id_ >> idx) & 0x01;
+ if (need) {
+ out.push_back(Keys_[idx]);
+ }
+ }
+ return !out.empty();
+ }
+ TEntitiesIndex::TEntitiesIndex(const std::vector<TEntityPtr>& entities) {
+ const std::set<TString> uniqueKeys = GetUniqueSortedKeys(entities);
+ Idx_.Entities = entities;
+ Idx_.SubTree.reserve(uniqueKeys.size() * entities.size());
+ TStage stage(uniqueKeys);
+ std::vector<TString> keyset;
+ while (stage.GetNextKeySet(keyset)) {
+ for (const TEntityPtr& e : entities) {
+ TSubTree* currentBranch = &Idx_;
+ for (const TString& key : keyset) {
+ auto it = e->find(key);
+ if (it == e->end()) {
+ continue;
+ }
+ auto [i, ok] = currentBranch->SubTree.emplace(
+ TKeyValue{it->first, it->second},
+ TSubTree());
+ currentBranch = &i->second;
+ currentBranch->Entities.push_back(e);
+ }
+ }
+ }
+ MakeUnique(Idx_);
+ }
+ std::set<TString> TEntitiesIndex::GetUniqueSortedKeys(const std::vector<TEntityPtr>& entities) {
+ std::set<TString> res;
+ for (const TEntityPtr& e : entities) {
+ for (const auto& [key, value] : *e) {
+ res.insert(key);
+ }
+ }
+ return res;
+ }
+ void TEntitiesIndex::MakeUnique(TSubTree& branch) {
+ auto& vec = branch.Entities;
+ std::sort(vec.begin(), vec.end());
+ vec.erase(std::unique(vec.begin(), vec.end()), vec.end());
+ for (auto& [_, restPart] : branch.SubTree) {
+ MakeUnique(restPart);
+ }
+ }
+ static void Print(const TEntitiesIndex::TSubTree& part, IOutputStream& out, size_t offset = 0) {
+ std::vector<std::pair<TKeyValue, const TEntitiesIndex::TSubTree*>> vec;
+ vec.reserve(part.SubTree.size());
+ for (const auto& [key, value] : part.SubTree) {
+ vec.push_back({key, &value});
+ }
+ std::sort(vec.begin(), vec.end(), [](const auto& l, const auto& r) {
+ if (l.first.Key < r.first.Key) {
+ return true;
+ }
+ if (l.first.Value < r.first.Value) {
+ return true;
+ }
+ return false;
+ });
+ for (const auto& [key, value] : vec) {
+ out << TString(offset, ' ') << "\"" << key.Key << "/" << key.Value << "\"" << Endl;
+ Print(*value, out, offset + 4);
+ }
+ }
+ TString TEntitiesIndex::PrintDebugString() const {
+ TStringStream res;
+ res << Endl;
+ Print(Idx_, res);
+ return res.Str();
+ }
diff --git a/library/cpp/tvmauth/client/misc/roles/entities_index.h b/library/cpp/tvmauth/client/misc/roles/entities_index.h
new file mode 100644
index 0000000000..bf42750d52
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/entities_index.h
@@ -0,0 +1,107 @@
+#pragma once
+#include "types.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <set>
+#include <vector>
+namespace NTvmAuth::NRoles {
+ class TEntitiesIndex: TMoveOnly {
+ public:
+ struct TSubTree;
+ using TIdxByAttrs = THashMap<TKeyValue, TSubTree>;
+ struct TSubTree {
+ std::vector<TEntityPtr> Entities;
+ TIdxByAttrs SubTree;
+ };
+ class TStage {
+ public:
+ TStage(const std::set<TString>& k);
+ bool GetNextKeySet(std::vector<TString>& out);
+ private:
+ std::vector<TString> Keys_;
+ size_t Id_ = 0;
+ };
+ public:
+ TEntitiesIndex(const std::vector<TEntityPtr>& entities);
+ /**
+ * Iterators must be to sorted unique key/value
+ */
+ template <typename Iterator>
+ bool ContainsExactEntity(Iterator begin, Iterator end) const;
+ /**
+ * Iterators must be to sorted unique key/value
+ */
+ template <typename Iterator>
+ const std::vector<TEntityPtr>& GetEntitiesWithAttrs(Iterator begin, Iterator end) const;
+ public: // for tests
+ static std::set<TString> GetUniqueSortedKeys(const std::vector<TEntityPtr>& entities);
+ static void MakeUnique(TEntitiesIndex::TSubTree& branch);
+ TString PrintDebugString() const;
+ private:
+ template <typename Iterator>
+ const TSubTree* FindSubtree(Iterator begin, Iterator end, size_t& size) const;
+ private:
+ TSubTree Idx_;
+ std::vector<TEntityPtr> EmptyResult_;
+ };
+ template <typename Iterator>
+ bool TEntitiesIndex::ContainsExactEntity(Iterator begin, Iterator end) const {
+ size_t size = 0;
+ const TSubTree* subtree = FindSubtree(begin, end, size);
+ if (!subtree) {
+ return false;
+ }
+ auto res = std::find_if(
+ subtree->Entities.begin(),
+ subtree->Entities.end(),
+ [size](const auto& e) { return size == e->size(); });
+ return res != subtree->Entities.end();
+ }
+ template <typename Iterator>
+ const std::vector<TEntityPtr>& TEntitiesIndex::GetEntitiesWithAttrs(Iterator begin, Iterator end) const {
+ size_t size = 0;
+ const TSubTree* subtree = FindSubtree(begin, end, size);
+ if (!subtree) {
+ return EmptyResult_;
+ }
+ return subtree->Entities;
+ }
+ template <typename Iterator>
+ const TEntitiesIndex::TSubTree* TEntitiesIndex::FindSubtree(Iterator begin,
+ Iterator end,
+ size_t& size) const {
+ const TSubTree* subtree = &Idx_;
+ size = 0;
+ for (auto attr = begin; attr != end; ++attr) {
+ auto it = subtree->SubTree.find(TKeyValueView{attr->first, attr->second});
+ if (it == subtree->SubTree.end()) {
+ return nullptr;
+ }
+ ++size;
+ subtree = &it->second;
+ }
+ return subtree;
+ }
diff --git a/library/cpp/tvmauth/client/misc/roles/parser.cpp b/library/cpp/tvmauth/client/misc/roles/parser.cpp
new file mode 100644
index 0000000000..28faf4c057
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/parser.cpp
@@ -0,0 +1,149 @@
+#include "parser.h"
+#include <library/cpp/json/json_reader.h>
+#include <util/string/cast.h>
+namespace NTvmAuth::NRoles {
+ static void GetRequiredValue(const NJson::TJsonValue& doc,
+ TStringBuf key,
+ NJson::TJsonValue& obj) {
+ Y_ENSURE(doc.GetValue(key, &obj), "Missing '" << key << "'");
+ }
+ static ui64 GetRequiredUInt(const NJson::TJsonValue& doc,
+ TStringBuf key) {
+ NJson::TJsonValue obj;
+ GetRequiredValue(doc, key, obj);
+ Y_ENSURE(obj.IsUInteger(), "key '" << key << "' must be uint");
+ return obj.GetUInteger();
+ }
+ static bool GetOptionalMap(const NJson::TJsonValue& doc,
+ TStringBuf key,
+ NJson::TJsonValue& obj) {
+ if (!doc.GetValue(key, &obj)) {
+ return false;
+ }
+ Y_ENSURE(obj.IsMap(), "'" << key << "' must be object");
+ return true;
+ }
+ TRolesPtr TParser::Parse(TRawPtr decodedBlob) {
+ try {
+ return ParseImpl(decodedBlob);
+ } catch (const std::exception& e) {
+ throw yexception() << "Failed to parse roles from tirole: " << e.what()
+ << ". '" << *decodedBlob << "'";
+ }
+ }
+ TRolesPtr TParser::ParseImpl(TRawPtr decodedBlob) {
+ NJson::TJsonValue doc;
+ Y_ENSURE(NJson::ReadJsonTree(*decodedBlob, &doc), "Invalid json");
+ Y_ENSURE(doc.IsMap(), "Json must be object");
+ TRoles::TTvmConsumers tvm = GetConsumers<TTvmId>(doc, "tvm");
+ TRoles::TUserConsumers user = GetConsumers<TUid>(doc, "user");
+ // fetch it last to provide more correct apply instant
+ TRoles::TMeta meta = GetMeta(doc);
+ return std::make_shared<TRoles>(
+ std::move(meta),
+ std::move(tvm),
+ std::move(user),
+ std::move(decodedBlob));
+ }
+ TRoles::TMeta TParser::GetMeta(const NJson::TJsonValue& doc) {
+ TRoles::TMeta res;
+ NJson::TJsonValue obj;
+ GetRequiredValue(doc, "revision", obj);
+ if (obj.IsString()) {
+ res.Revision = obj.GetString();
+ } else if (obj.IsUInteger()) {
+ res.Revision = ToString(obj.GetUInteger());
+ } else {
+ ythrow yexception() << "'revision' has unexpected type: " << obj.GetType();
+ }
+ res.BornTime = TInstant::Seconds(GetRequiredUInt(doc, "born_date"));
+ return res;
+ }
+ template <typename Id>
+ THashMap<Id, TConsumerRolesPtr> TParser::GetConsumers(const NJson::TJsonValue& doc,
+ TStringBuf type) {
+ THashMap<Id, TConsumerRolesPtr> res;
+ NJson::TJsonValue obj;
+ if (!GetOptionalMap(doc, type, obj)) {
+ return res;
+ }
+ for (const auto& [key, value] : obj.GetMap()) {
+ Y_ENSURE(value.IsMap(),
+ "roles for consumer must be map: '" << key << "' is " << value.GetType());
+ Id id = 0;
+ Y_ENSURE(TryIntFromString<10>(key, id),
+ "id must be valid positive number of proper size for "
+ << type << ". got '"
+ << key << "'");
+ Y_ENSURE(res.emplace(id, GetConsumer(value, key)).second,
+ "consumer duplicate detected: '" << key << "' for " << type);
+ }
+ return res;
+ }
+ TConsumerRolesPtr TParser::GetConsumer(const NJson::TJsonValue& obj, TStringBuf consumer) {
+ TEntitiesByRoles entities;
+ for (const auto& [key, value] : obj.GetMap()) {
+ Y_ENSURE(value.IsArray(),
+ "entities for roles must be array: '" << key << "' is " << value.GetType());
+ entities.emplace(key, GetEntities(value, consumer, key));
+ }
+ return std::make_shared<TConsumerRoles>(std::move(entities));
+ }
+ TEntitiesPtr TParser::GetEntities(const NJson::TJsonValue& obj,
+ TStringBuf consumer,
+ TStringBuf role) {
+ std::vector<TEntityPtr> entities;
+ entities.reserve(obj.GetArray().size());
+ for (const NJson::TJsonValue& e : obj.GetArray()) {
+ Y_ENSURE(e.IsMap(),
+ "role entity for role must be map: consumer '"
+ << consumer << "' with role '" << role << "' has " << e.GetType());
+ entities.push_back(GetEntity(e, consumer, role));
+ }
+ return std::make_shared<TEntities>(TEntities(entities));
+ }
+ TEntityPtr TParser::GetEntity(const NJson::TJsonValue& obj, TStringBuf consumer, TStringBuf role) {
+ TEntityPtr res = std::make_shared<TEntity>();
+ for (const auto& [key, value] : obj.GetMap()) {
+ Y_ENSURE(value.IsString(),
+ "entity is map (str->str), got value "
+ << value.GetType() << ". consumer '"
+ << consumer << "' with role '" << role << "'");
+ res->emplace(key, value.GetString());
+ }
+ return res;
+ }
diff --git a/library/cpp/tvmauth/client/misc/roles/parser.h b/library/cpp/tvmauth/client/misc/roles/parser.h
new file mode 100644
index 0000000000..0982ba78c6
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/parser.h
@@ -0,0 +1,36 @@
+#pragma once
+#include "roles.h"
+#include "types.h"
+namespace NJson {
+ class TJsonValue;
+namespace NTvmAuth::NRoles {
+ class TParser {
+ public:
+ static TRolesPtr Parse(TRawPtr decodedBlob);
+ public:
+ static TRolesPtr ParseImpl(TRawPtr decodedBlob);
+ static TRoles::TMeta GetMeta(const NJson::TJsonValue& doc);
+ template <typename Id>
+ static THashMap<Id, TConsumerRolesPtr> GetConsumers(
+ const NJson::TJsonValue& doc,
+ TStringBuf key);
+ static TConsumerRolesPtr GetConsumer(
+ const NJson::TJsonValue& obj,
+ TStringBuf consumer);
+ static TEntitiesPtr GetEntities(
+ const NJson::TJsonValue& obj,
+ TStringBuf consumer,
+ TStringBuf role);
+ static TEntityPtr GetEntity(
+ const NJson::TJsonValue& obj,
+ TStringBuf consumer,
+ TStringBuf role);
+ };
diff --git a/library/cpp/tvmauth/client/misc/roles/roles.cpp b/library/cpp/tvmauth/client/misc/roles/roles.cpp
new file mode 100644
index 0000000000..0761033104
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/roles.cpp
@@ -0,0 +1,101 @@
+#include "roles.h"
+#include <library/cpp/tvmauth/checked_service_ticket.h>
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+namespace NTvmAuth::NRoles {
+ TRoles::TRoles(TMeta&& meta,
+ TTvmConsumers tvm,
+ TUserConsumers user,
+ TRawPtr raw)
+ : Meta_(std::move(meta))
+ , TvmIds_(std::move(tvm))
+ , Users_(std::move(user))
+ , Raw_(std::move(raw))
+ {
+ Y_ENSURE(Raw_);
+ }
+ TConsumerRolesPtr TRoles::GetRolesForService(const TCheckedServiceTicket& t) const {
+ TIllegalUsage() << "Service ticket must be valid, got: " << t.GetStatus());
+ auto it = TvmIds_.find(t.GetSrc());
+ return it == TvmIds_.end() ? TConsumerRolesPtr() : it->second;
+ }
+ TConsumerRolesPtr TRoles::GetRolesForUser(const TCheckedUserTicket& t,
+ std::optional<TUid> selectedUid) const {
+ TIllegalUsage() << "User ticket must be valid, got: " << t.GetStatus());
+ Y_ENSURE_EX(t.GetEnv() == EBlackboxEnv::ProdYateam,
+ TIllegalUsage() << "User ticket must be from ProdYateam, got from " << t.GetEnv());
+ TUid uid = t.GetDefaultUid();
+ if (selectedUid) {
+ auto it = std::find(t.GetUids().begin(), t.GetUids().end(), *selectedUid);
+ Y_ENSURE_EX(it != t.GetUids().end(),
+ TIllegalUsage() << "selectedUid must be in user ticket but it's not: "
+ << *selectedUid);
+ uid = *selectedUid;
+ }
+ auto it = Users_.find(uid);
+ return it == Users_.end() ? TConsumerRolesPtr() : it->second;
+ }
+ const TRoles::TMeta& TRoles::GetMeta() const {
+ return Meta_;
+ }
+ const TString& TRoles::GetRaw() const {
+ return *Raw_;
+ }
+ bool TRoles::CheckServiceRole(const TCheckedServiceTicket& t,
+ const TStringBuf roleName) const {
+ TConsumerRolesPtr c = GetRolesForService(t);
+ return c ? c->HasRole(roleName) : false;
+ }
+ bool TRoles::CheckUserRole(const TCheckedUserTicket& t,
+ const TStringBuf roleName,
+ std::optional<TUid> selectedUid) const {
+ TConsumerRolesPtr c = GetRolesForUser(t, selectedUid);
+ return c ? c->HasRole(roleName) : false;
+ }
+ bool TRoles::CheckServiceRoleForExactEntity(const TCheckedServiceTicket& t,
+ const TStringBuf roleName,
+ const TEntity& exactEntity) const {
+ TConsumerRolesPtr c = GetRolesForService(t);
+ return c ? c->CheckRoleForExactEntity(roleName, exactEntity) : false;
+ }
+ bool TRoles::CheckUserRoleForExactEntity(const TCheckedUserTicket& t,
+ const TStringBuf roleName,
+ const TEntity& exactEntity,
+ std::optional<TUid> selectedUid) const {
+ TConsumerRolesPtr c = GetRolesForUser(t, selectedUid);
+ return c ? c->CheckRoleForExactEntity(roleName, exactEntity) : false;
+ }
+ TConsumerRoles::TConsumerRoles(TEntitiesByRoles roles)
+ : Roles_(std::move(roles))
+ {
+ }
+ bool TConsumerRoles::CheckRoleForExactEntity(const TStringBuf roleName,
+ const TEntity& exactEntity) const {
+ auto it = Roles_.find(roleName);
+ if (it == Roles_.end()) {
+ return false;
+ }
+ return it->second->Contains(exactEntity);
+ }
+ TEntities::TEntities(TEntitiesIndex idx)
+ : Idx_(std::move(idx))
+ {
+ }
diff --git a/library/cpp/tvmauth/client/misc/roles/roles.h b/library/cpp/tvmauth/client/misc/roles/roles.h
new file mode 100644
index 0000000000..6d510ee8a1
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/roles.h
@@ -0,0 +1,186 @@
+#pragma once
+#include "entities_index.h"
+#include "types.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/type.h>
+#include <util/datetime/base.h>
+#include <util/generic/array_ref.h>
+#include <util/generic/hash.h>
+#include <vector>
+namespace NTvmAuth {
+ class TCheckedServiceTicket;
+ class TCheckedUserTicket;
+namespace NTvmAuth::NRoles {
+ class TRoles {
+ public:
+ struct TMeta {
+ TString Revision;
+ TInstant BornTime;
+ TInstant Applied = TInstant::Now();
+ };
+ using TTvmConsumers = THashMap<TTvmId, TConsumerRolesPtr>;
+ using TUserConsumers = THashMap<TUid, TConsumerRolesPtr>;
+ TRoles(TMeta&& meta,
+ TTvmConsumers tvm,
+ TUserConsumers user,
+ TRawPtr raw);
+ /**
+ * @return ptr to roles. It will be nullptr if there are no roles
+ */
+ TConsumerRolesPtr GetRolesForService(const TCheckedServiceTicket& t) const;
+ /**
+ * @return ptr to roles. It will be nullptr if there are no roles
+ */
+ TConsumerRolesPtr GetRolesForUser(const TCheckedUserTicket& t,
+ std::optional<TUid> selectedUid = {}) const;
+ const TMeta& GetMeta() const;
+ const TString& GetRaw() const;
+ public: // shortcuts
+ /**
+ * @brief CheckServiceRole() is shortcut for simple role checking - for any possible entity
+ */
+ bool CheckServiceRole(
+ const TCheckedServiceTicket& t,
+ const TStringBuf roleName) const;
+ /**
+ * @brief CheckUserRole() is shortcut for simple role checking - for any possible entity
+ */
+ bool CheckUserRole(
+ const TCheckedUserTicket& t,
+ const TStringBuf roleName,
+ std::optional<TUid> selectedUid = {}) const;
+ /**
+ * @brief CheckServiceRoleForExactEntity() is shortcut for simple role checking for exact entity
+ */
+ bool CheckServiceRoleForExactEntity(
+ const TCheckedServiceTicket& t,
+ const TStringBuf roleName,
+ const TEntity& exactEntity) const;
+ /**
+ * @brief CheckUserRoleForExactEntity() is shortcut for simple role checking for exact entity
+ */
+ bool CheckUserRoleForExactEntity(
+ const TCheckedUserTicket& t,
+ const TStringBuf roleName,
+ const TEntity& exactEntity,
+ std::optional<TUid> selectedUid = {}) const;
+ private:
+ TMeta Meta_;
+ TTvmConsumers TvmIds_;
+ TUserConsumers Users_;
+ TRawPtr Raw_;
+ };
+ class TConsumerRoles {
+ public:
+ TConsumerRoles(TEntitiesByRoles roles);
+ bool HasRole(const TStringBuf roleName) const {
+ return Roles_.contains(roleName);
+ }
+ const TEntitiesByRoles& GetRoles() const {
+ return Roles_;
+ }
+ /**
+ * @return ptr to entries. It will be nullptr if there is no role
+ */
+ TEntitiesPtr GetEntitiesForRole(const TStringBuf roleName) const {
+ auto it = Roles_.find(roleName);
+ return it == Roles_.end() ? TEntitiesPtr() : it->second;
+ }
+ /**
+ * @brief CheckRoleForExactEntity() is shortcut for simple role checking for exact entity
+ */
+ bool CheckRoleForExactEntity(const TStringBuf roleName,
+ const TEntity& exactEntity) const;
+ private:
+ TEntitiesByRoles Roles_;
+ };
+ class TEntities {
+ public:
+ TEntities(TEntitiesIndex idx);
+ /**
+ * @brief Contains() provides info about entity presence
+ */
+ bool Contains(const TEntity& exactEntity) const {
+ return Idx_.ContainsExactEntity(exactEntity.begin(), exactEntity.end());
+ }
+ /**
+ * @brief The same as Contains()
+ * It checks span for sorted and unique properties.
+ */
+ template <class StrKey = TString, class StrValue = TString>
+ bool ContainsSortedUnique(
+ const TArrayRef<const std::pair<StrKey, StrValue>>& exactEntity) const {
+ CheckSpan(exactEntity);
+ return Idx_.ContainsExactEntity(exactEntity.begin(), exactEntity.end());
+ }
+ /**
+ * @brief GetEntitiesWithAttrs() collects entities with ALL attributes from `attrs`
+ */
+ template <class StrKey = TString, class StrValue = TString>
+ const std::vector<TEntityPtr>& GetEntitiesWithAttrs(
+ const std::map<StrKey, StrValue>& attrs) const {
+ return Idx_.GetEntitiesWithAttrs(attrs.begin(), attrs.end());
+ }
+ /**
+ * @brief The same as GetEntitiesWithAttrs()
+ * It checks span for sorted and unique properties.
+ */
+ template <class StrKey = TString, class StrValue = TString>
+ const std::vector<TEntityPtr>& GetEntitiesWithSortedUniqueAttrs(
+ const TArrayRef<const std::pair<StrKey, StrValue>>& attrs) const {
+ CheckSpan(attrs);
+ return Idx_.GetEntitiesWithAttrs(attrs.begin(), attrs.end());
+ }
+ private:
+ template <class StrKey, class StrValue>
+ static void CheckSpan(const TArrayRef<const std::pair<StrKey, StrValue>>& attrs) {
+ if (attrs.empty()) {
+ return;
+ }
+ auto prev = attrs.begin();
+ for (auto it = prev + 1; it != attrs.end(); ++it) {
+ Y_ENSURE_EX(prev->first != it->first,
+ TIllegalUsage() << "attrs are not unique: '" << it->first << "'");
+ Y_ENSURE_EX(prev->first < it->first,
+ TIllegalUsage() << "attrs are not sorted: '" << prev->first
+ << "' before '" << it->first << "'");
+ prev = it;
+ }
+ }
+ private:
+ TEntitiesIndex Idx_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/roles/types.h b/library/cpp/tvmauth/client/misc/roles/types.h
new file mode 100644
index 0000000000..de0745e72e
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/roles/types.h
@@ -0,0 +1,70 @@
+#pragma once
+#include <util/generic/hash_set.h>
+#include <map>
+#include <memory>
+namespace NTvmAuth::NRoles {
+ using TEntity = std::map<TString, TString>;
+ using TEntityPtr = std::shared_ptr<TEntity>;
+ class TEntities;
+ using TEntitiesPtr = std::shared_ptr<TEntities>;
+ using TEntitiesByRoles = THashMap<TString, TEntitiesPtr>;
+ class TConsumerRoles;
+ using TConsumerRolesPtr = std::shared_ptr<TConsumerRoles>;
+ class TRoles;
+ using TRolesPtr = std::shared_ptr<TRoles>;
+ using TRawPtr = std::shared_ptr<TString>;
+ template <class T>
+ struct TKeyValueBase {
+ T Key;
+ T Value;
+ template <typename U>
+ bool operator==(const TKeyValueBase<U>& o) const {
+ return Key == o.Key && Value == o.Value;
+ }
+ };
+ using TKeyValue = TKeyValueBase<TString>;
+ using TKeyValueView = TKeyValueBase<TStringBuf>;
+// Traits
+template <>
+struct THash<NTvmAuth::NRoles::TKeyValue> {
+ std::size_t operator()(const NTvmAuth::NRoles::TKeyValue& e) const {
+ return std::hash<std::string_view>()(e.Key) + std::hash<std::string_view>()(e.Value);
+ }
+ std::size_t operator()(const NTvmAuth::NRoles::TKeyValueView& e) const {
+ return std::hash<std::string_view>()(e.Key) + std::hash<std::string_view>()(e.Value);
+ }
+template <>
+struct TEqualTo<NTvmAuth::NRoles::TKeyValue> {
+ using is_transparent = std::true_type;
+ template <typename T, typename U>
+ bool operator()(const NTvmAuth::NRoles::TKeyValueBase<T>& l,
+ const NTvmAuth::NRoles::TKeyValueBase<U>& r) {
+ return l == r;
+ }
+inline bool operator<(const NTvmAuth::NRoles::TEntityPtr& l, const NTvmAuth::NRoles::TEntityPtr& r) {
+ return *l < *r;
+inline bool operator==(const NTvmAuth::NRoles::TEntityPtr& l, const NTvmAuth::NRoles::TEntityPtr& r) {
+ return *l == *r;
diff --git a/library/cpp/tvmauth/client/misc/service_tickets.h b/library/cpp/tvmauth/client/misc/service_tickets.h
new file mode 100644
index 0000000000..6a24bd5689
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/service_tickets.h
@@ -0,0 +1,86 @@
+#pragma once
+#include "settings.h"
+#include "roles/roles.h"
+#include <library/cpp/tvmauth/src/utils.h>
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+namespace NTvmAuth::NInternal {
+ class TClientCaningKnife;
+namespace NTvmAuth {
+ class TServiceTickets: public TAtomicRefCount<TServiceTickets> {
+ public:
+ using TMapAliasStr = THashMap<TClientSettings::TAlias, TString>;
+ using TMapIdStr = THashMap<TTvmId, TString>;
+ using TIdSet = THashSet<TTvmId>;
+ using TAliasSet = THashSet<TClientSettings::TAlias>;
+ using TMapAliasId = THashMap<TClientSettings::TAlias, TTvmId>;
+ TServiceTickets(TMapIdStr&& tickets, TMapIdStr&& errors, const TMapAliasId& dstMap)
+ : TicketsById(std::move(tickets))
+ , ErrorsById(std::move(errors))
+ {
+ InitAliasesAndUnfetchedIds(dstMap);
+ InitInvalidationTime();
+ }
+ static TInstant GetInvalidationTime(const TMapIdStr& ticketsById) {
+ TInstant res;
+ for (const auto& pair : ticketsById) {
+ TMaybe<TInstant> t = NTvmAuth::NInternal::TCanningKnife::GetExpirationTime(pair.second);
+ if (!t) {
+ continue;
+ }
+ res = res == TInstant() ? *t : std::min(res, *t);
+ }
+ return res;
+ }
+ public:
+ TMapIdStr TicketsById;
+ TMapIdStr ErrorsById;
+ TMapAliasStr TicketsByAlias;
+ TMapAliasStr ErrorsByAlias;
+ TInstant InvalidationTime;
+ TIdSet UnfetchedIds;
+ TAliasSet UnfetchedAliases;
+ private:
+ void InitAliasesAndUnfetchedIds(const TMapAliasId& dstMap) {
+ for (const auto& pair : dstMap) {
+ auto it = TicketsById.find(pair.second);
+ auto errIt = ErrorsById.find(pair.second);
+ if (it == TicketsById.end()) {
+ if (errIt != ErrorsById.end()) {
+ Y_ENSURE(ErrorsByAlias.insert({pair.first, errIt->second}).second,
+ "failed to add: " << pair.first);
+ } else {
+ UnfetchedAliases.insert(pair.first);
+ UnfetchedIds.insert(pair.second);
+ }
+ } else {
+ Y_ENSURE(TicketsByAlias.insert({pair.first, it->second}).second,
+ "failed to add: " << pair.first);
+ }
+ }
+ }
+ void InitInvalidationTime() {
+ InvalidationTime = GetInvalidationTime(TicketsById);
+ }
+ };
+ using TServiceTicketsPtr = TIntrusiveConstPtr<TServiceTickets>;
diff --git a/library/cpp/tvmauth/client/misc/settings.h b/library/cpp/tvmauth/client/misc/settings.h
new file mode 100644
index 0000000000..8fae6c34d3
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/settings.h
@@ -0,0 +1,13 @@
+#pragma once
+#include <util/generic/fwd.h>
+namespace NTvmAuth {
+ class TClientSettings {
+ public:
+ /*!
+ * Look at description in relevant settings: NTvmApi::TClientSettings or NTvmTool::TClientSettings
+ */
+ using TAlias = TString;
+ };
diff --git a/library/cpp/tvmauth/client/misc/src_checker.h b/library/cpp/tvmauth/client/misc/src_checker.h
new file mode 100644
index 0000000000..bb99fe8884
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/src_checker.h
@@ -0,0 +1,31 @@
+#pragma once
+#include "roles/roles.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/checked_service_ticket.h>
+#include <library/cpp/tvmauth/src/service_impl.h>
+#include <library/cpp/tvmauth/src/utils.h>
+namespace NTvmAuth {
+ class TSrcChecker {
+ public:
+ /*!
+ * Checking must be enabled in TClientSettings
+ * Can throw exception if cache is out of date or wrong config
+ * @param ticket
+ */
+ static TCheckedServiceTicket Check(TCheckedServiceTicket ticket, NRoles::TRolesPtr r) {
+ Y_ENSURE_EX(r, TBrokenTvmClientSettings() << "Need to use TClientSettings::EnableRolesFetching()");
+ NRoles::TConsumerRolesPtr roles = r->GetRolesForService(ticket);
+ if (roles) {
+ return ticket;
+ }
+ TServiceTicketImplPtr impl = THolder(NInternal::TCanningKnife::GetS(ticket));
+ impl->SetStatus(ETicketStatus::NoRoles);
+ return TCheckedServiceTicket(std::move(impl));
+ }
+ };
diff --git a/library/cpp/tvmauth/client/misc/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/threaded_updater.cpp
new file mode 100644
index 0000000000..5d21ce67a7
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/threaded_updater.cpp
@@ -0,0 +1,111 @@
+#include "threaded_updater.h"
+#include <library/cpp/tvmauth/client/exception.h>
+#include <util/string/builder.h>
+#include <util/system/spin_wait.h>
+#include <util/system/thread.h>
+namespace NTvmAuth {
+ TThreadedUpdaterBase::TThreadedUpdaterBase(TDuration workerAwakingPeriod,
+ TLoggerPtr logger,
+ const TString& url,
+ ui16 port,
+ TDuration socketTimeout,
+ TDuration connectTimeout)
+ : WorkerAwakingPeriod_(workerAwakingPeriod)
+ , Logger_(std::move(logger))
+ , TvmUrl_(url)
+ , TvmPort_(port)
+ , TvmSocketTimeout_(socketTimeout)
+ , TvmConnectTimeout_(connectTimeout)
+ , IsStopped_(true)
+ {
+ Y_ENSURE_EX(Logger_, TNonRetriableException() << "Logger is required");
+ ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1);
+ ServiceTicketsDurations_.Expiring = TDuration::Hours(2);
+ ServiceTicketsDurations_.Invalid = TDuration::Hours(11);
+ PublicKeysDurations_.RefreshPeriod = TDuration::Days(1);
+ PublicKeysDurations_.Expiring = TDuration::Days(2);
+ PublicKeysDurations_.Invalid = TDuration::Days(6);
+ }
+ TThreadedUpdaterBase::~TThreadedUpdaterBase() {
+ StopWorker();
+ }
+ void TThreadedUpdaterBase::StartWorker() {
+ if (HttpClient_) {
+ HttpClient_->ResetConnection();
+ }
+ Thread_ = MakeHolder<TThread>(WorkerWrap, this);
+ Thread_->Start();
+ Started_.Wait();
+ IsStopped_ = false;
+ }
+ void TThreadedUpdaterBase::StopWorker() {
+ Event_.Signal();
+ if (Thread_) {
+ Thread_.Reset();
+ }
+ }
+ TKeepAliveHttpClient& TThreadedUpdaterBase::GetClient() const {
+ if (!HttpClient_) {
+ HttpClient_ = MakeHolder<TKeepAliveHttpClient>(TvmUrl_, TvmPort_, TvmSocketTimeout_, TvmConnectTimeout_);
+ }
+ return *HttpClient_;
+ }
+ void TThreadedUpdaterBase::LogDebug(const TString& msg) const {
+ if (Logger_) {
+ Logger_->Debug(msg);
+ }
+ }
+ void TThreadedUpdaterBase::LogInfo(const TString& msg) const {
+ if (Logger_) {
+ Logger_->Info(msg);
+ }
+ }
+ void TThreadedUpdaterBase::LogWarning(const TString& msg) const {
+ if (Logger_) {
+ Logger_->Warning(msg);
+ }
+ }
+ void TThreadedUpdaterBase::LogError(const TString& msg) const {
+ if (Logger_) {
+ Logger_->Error(msg);
+ }
+ }
+ void* TThreadedUpdaterBase::WorkerWrap(void* arg) {
+ TThread::SetCurrentThreadName("TicketParserUpd");
+ TThreadedUpdaterBase& this_ = *reinterpret_cast<TThreadedUpdaterBase*>(arg);
+ this_.Started_.Signal();
+ this_.LogDebug("Thread-worker started");
+ while (true) {
+ if (this_.Event_.WaitT(this_.WorkerAwakingPeriod_)) {
+ break;
+ }
+ try {
+ this_.Worker();
+ this_.GetClient().ResetConnection();
+ } catch (const std::exception& e) { // impossible now
+ this_.LogError(TStringBuilder() << "Failed to generate new cache: " << e.what());
+ }
+ }
+ this_.LogDebug("Thread-worker stopped");
+ this_.IsStopped_ = true;
+ return nullptr;
+ }
diff --git a/library/cpp/tvmauth/client/misc/threaded_updater.h b/library/cpp/tvmauth/client/misc/threaded_updater.h
new file mode 100644
index 0000000000..783684ba3b
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/threaded_updater.h
@@ -0,0 +1,76 @@
+#pragma once
+#include "async_updater.h"
+#include "settings.h"
+#include <library/cpp/tvmauth/client/logger.h>
+#include <library/cpp/http/simple/http_client.h>
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+#include <util/system/event.h>
+#include <util/system/thread.h>
+class TKeepAliveHttpClient;
+namespace NTvmAuth::NInternal {
+ class TClientCaningKnife;
+namespace NTvmAuth {
+ class TThreadedUpdaterBase: public TAsyncUpdaterBase {
+ public:
+ TThreadedUpdaterBase(TDuration workerAwakingPeriod,
+ TLoggerPtr logger,
+ const TString& url,
+ ui16 port,
+ TDuration socketTimeout,
+ TDuration connectTimeout);
+ virtual ~TThreadedUpdaterBase();
+ protected:
+ void StartWorker();
+ void StopWorker();
+ virtual void Worker() {
+ }
+ TKeepAliveHttpClient& GetClient() const;
+ void LogDebug(const TString& msg) const;
+ void LogInfo(const TString& msg) const;
+ void LogWarning(const TString& msg) const;
+ void LogError(const TString& msg) const;
+ protected:
+ TDuration WorkerAwakingPeriod_;
+ const TLoggerPtr Logger_;
+ protected:
+ const TString TvmUrl_;
+ private:
+ static void* WorkerWrap(void* arg);
+ void StartTvmClientStopping() const override {
+ Event_.Signal();
+ }
+ bool IsTvmClientStopped() const override {
+ return IsStopped_;
+ }
+ private:
+ mutable THolder<TKeepAliveHttpClient> HttpClient_;
+ const ui32 TvmPort_;
+ const TDuration TvmSocketTimeout_;
+ const TDuration TvmConnectTimeout_;
+ mutable TAutoEvent Event_;
+ mutable TAutoEvent Started_;
+ std::atomic_bool IsStopped_;
+ THolder<TThread> Thread_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/tool/meta_info.cpp b/library/cpp/tvmauth/client/misc/tool/meta_info.cpp
new file mode 100644
index 0000000000..9a0ae228fe
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/meta_info.cpp
@@ -0,0 +1,208 @@
+#include "meta_info.h"
+#include <library/cpp/json/json_reader.h>
+#include <util/string/builder.h>
+namespace NTvmAuth::NTvmTool {
+ TString TMetaInfo::TConfig::ToString() const {
+ TStringStream s;
+ s << "self_tvm_id=" << SelfTvmId << ", "
+ << "bb_env=" << BbEnv << ", "
+ << "idm_slug=" << (IdmSlug ? IdmSlug : "<NULL>") << ", "
+ << "dsts=[";
+ for (const auto& pair : DstAliases) {
+ s << "(" << pair.first << ":" << pair.second << ")";
+ }
+ s << "]";
+ return std::move(s.Str());
+ }
+ TMetaInfo::TMetaInfo(TLoggerPtr logger)
+ : Logger_(std::move(logger))
+ {
+ }
+ TMetaInfo::TConfigPtr TMetaInfo::Init(TKeepAliveHttpClient& client,
+ const TClientSettings& settings) {
+ ApplySettings(settings);
+ TryPing(client);
+ const TString metaString = Fetch(client);
+ if (Logger_) {
+ TStringStream s;
+ s << "Meta info fetched from " << settings.GetHostname() << ":" << settings.GetPort();
+ Logger_->Debug(s.Str());
+ }
+ try {
+ Config_.Set(ParseMetaString(metaString, SelfAlias_));
+ } catch (const yexception& e) {
+ ythrow TNonRetriableException() << "Malformed json from tvmtool: " << e.what();
+ }
+ TConfigPtr cfg = Config_.Get();
+ Y_ENSURE_EX(cfg, TNonRetriableException() << "Alias '" << SelfAlias_ << "' not found in meta info");
+ if (Logger_) {
+ Logger_->Info("Meta: " + cfg->ToString());
+ }
+ return cfg;
+ }
+ TString TMetaInfo::GetRequestForTickets(const TConfig& config) {
+ Y_ENSURE(!config.DstAliases.empty());
+ TStringStream s;
+ s << "/tvm/tickets"
+ << "?src=" << config.SelfTvmId
+ << "&dsts=";
+ for (const auto& pair : config.DstAliases) {
+ s << pair.second << ","; // avoid aliases - url-encoding required
+ }
+ s.Str().pop_back();
+ return s.Str();
+ }
+ bool TMetaInfo::TryUpdateConfig(TKeepAliveHttpClient& client) {
+ const TString metaString = Fetch(client);
+ TConfigPtr config;
+ try {
+ config = ParseMetaString(metaString, SelfAlias_);
+ } catch (const yexception& e) {
+ ythrow TNonRetriableException() << "Malformed json from tvmtool: " << e.what();
+ }
+ Y_ENSURE_EX(config, TNonRetriableException() << "Alias '" << SelfAlias_ << "' not found in meta info");
+ TConfigPtr oldConfig = Config_.Get();
+ if (*config == *oldConfig) {
+ return false;
+ }
+ if (Logger_) {
+ Logger_->Info(TStringBuilder()
+ << "Meta was updated. Old: (" << oldConfig->ToString()
+ << "). New: (" << config->ToString() << ")");
+ }
+ Config_ = config;
+ return true;
+ }
+ void TMetaInfo::TryPing(TKeepAliveHttpClient& client) {
+ try {
+ TStringStream s;
+ TKeepAliveHttpClient::THttpCode code = client.DoGet("/tvm/ping", &s);
+ if (code < 200 || 300 <= code) {
+ throw yexception() << "(" << code << ") " << s.Str();
+ }
+ } catch (const std::exception& e) {
+ ythrow TNonRetriableException() << "Failed to connect to tvmtool: " << e.what();
+ }
+ }
+ TString TMetaInfo::Fetch(TKeepAliveHttpClient& client) const {
+ TStringStream res;
+ TKeepAliveHttpClient::THttpCode code;
+ try {
+ code = client.DoGet("/tvm/private_api/__meta__", &res, AuthHeader_);
+ } catch (const std::exception& e) {
+ ythrow TRetriableException() << "Failed to fetch meta data from tvmtool: " << e.what();
+ }
+ if (code != 200) {
+ Y_ENSURE_EX(code != 404,
+ TNonRetriableException() << "Library does not support so old tvmtool. You need tvmtool>=1.1.0");
+ TStringStream err;
+ err << "Failed to fetch meta from tvmtool: " << client.GetHost() << ":" << client.GetPort()
+ << " (" << code << "): " << res.Str();
+ Y_ENSURE_EX(!(500 <= code && code < 600), TRetriableException() << err.Str());
+ ythrow TNonRetriableException() << err.Str();
+ }
+ return res.Str();
+ }
+ static TMetaInfo::TDstAliases::value_type ParsePair(const NJson::TJsonValue& val, const TString& meta) {
+ NJson::TJsonValue jAlias;
+ Y_ENSURE(val.GetValue("alias", &jAlias), meta);
+ Y_ENSURE(jAlias.IsString(), meta);
+ NJson::TJsonValue jClientId;
+ Y_ENSURE(val.GetValue("client_id", &jClientId), meta);
+ Y_ENSURE(jClientId.IsInteger(), meta);
+ return {jAlias.GetString(), jClientId.GetInteger()};
+ }
+ TMetaInfo::TConfigPtr TMetaInfo::ParseMetaString(const TString& meta, const TString& self) {
+ NJson::TJsonValue jDoc;
+ Y_ENSURE(NJson::ReadJsonTree(meta, &jDoc), meta);
+ NJson::TJsonValue jEnv;
+ Y_ENSURE(jDoc.GetValue("bb_env", &jEnv), meta);
+ NJson::TJsonValue jTenants;
+ Y_ENSURE(jDoc.GetValue("tenants", &jTenants), meta);
+ Y_ENSURE(jTenants.IsArray(), meta);
+ for (const NJson::TJsonValue& jTen : jTenants.GetArray()) {
+ NJson::TJsonValue jSelf;
+ Y_ENSURE(jTen.GetValue("self", &jSelf), meta);
+ auto selfPair = ParsePair(jSelf, meta);
+ if (selfPair.first != self) {
+ continue;
+ }
+ TConfigPtr config = std::make_shared<TConfig>();
+ config->SelfTvmId = selfPair.second;
+ config->BbEnv = BbEnvFromString(jEnv.GetString(), meta);
+ {
+ NJson::TJsonValue jSlug;
+ if (jTen.GetValue("idm_slug", &jSlug)) {
+ config->IdmSlug = jSlug.GetString();
+ }
+ }
+ NJson::TJsonValue jDsts;
+ Y_ENSURE(jTen.GetValue("dsts", &jDsts), meta);
+ Y_ENSURE(jDsts.IsArray(), meta);
+ for (const NJson::TJsonValue& jDst : jDsts.GetArray()) {
+ config->DstAliases.insert(ParsePair(jDst, meta));
+ }
+ return config;
+ }
+ return {};
+ }
+ void TMetaInfo::ApplySettings(const TClientSettings& settings) {
+ AuthHeader_ = {{"Authorization", settings.GetAuthToken()}};
+ SelfAlias_ = settings.GetSelfAlias();
+ }
+ EBlackboxEnv TMetaInfo::BbEnvFromString(const TString& env, const TString& meta) {
+ if (env == "Prod") {
+ return EBlackboxEnv::Prod;
+ } else if (env == "Test") {
+ return EBlackboxEnv::Test;
+ } else if (env == "ProdYaTeam") {
+ return EBlackboxEnv::ProdYateam;
+ } else if (env == "TestYaTeam") {
+ return EBlackboxEnv::TestYateam;
+ } else if (env == "Stress") {
+ return EBlackboxEnv::Stress;
+ }
+ ythrow yexception() << "'bb_env'=='" << env << "'. " << meta;
+ }
diff --git a/library/cpp/tvmauth/client/misc/tool/meta_info.h b/library/cpp/tvmauth/client/misc/tool/meta_info.h
new file mode 100644
index 0000000000..9dd4f0dbf8
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/meta_info.h
@@ -0,0 +1,69 @@
+#pragma once
+#include "settings.h"
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/logger.h>
+#include <library/cpp/http/simple/http_client.h>
+namespace NTvmAuth::NTvmTool {
+ class TMetaInfo {
+ public:
+ using TDstAliases = THashMap<TClientSettings::TAlias, TTvmId>;
+ struct TConfig {
+ TTvmId SelfTvmId = 0;
+ EBlackboxEnv BbEnv = EBlackboxEnv::Prod;
+ TString IdmSlug;
+ TDstAliases DstAliases;
+ bool AreTicketsRequired() const {
+ return !DstAliases.empty();
+ }
+ TString ToString() const;
+ bool operator==(const TConfig& c) const {
+ return SelfTvmId == c.SelfTvmId &&
+ BbEnv == c.BbEnv &&
+ IdmSlug == c.IdmSlug &&
+ DstAliases == c.DstAliases;
+ }
+ };
+ using TConfigPtr = std::shared_ptr<TConfig>;
+ public:
+ TMetaInfo(TLoggerPtr logger);
+ TConfigPtr Init(TKeepAliveHttpClient& client,
+ const TClientSettings& settings);
+ static TString GetRequestForTickets(const TMetaInfo::TConfig& config);
+ const TKeepAliveHttpClient::THeaders& GetAuthHeader() const {
+ return AuthHeader_;
+ }
+ TConfigPtr GetConfig() const {
+ return Config_.Get();
+ }
+ bool TryUpdateConfig(TKeepAliveHttpClient& client);
+ protected:
+ void TryPing(TKeepAliveHttpClient& client);
+ TString Fetch(TKeepAliveHttpClient& client) const;
+ static TConfigPtr ParseMetaString(const TString& meta, const TString& self);
+ void ApplySettings(const TClientSettings& settings);
+ static EBlackboxEnv BbEnvFromString(const TString& env, const TString& meta);
+ protected:
+ NUtils::TProtectedValue<TConfigPtr> Config_;
+ TKeepAliveHttpClient::THeaders AuthHeader_;
+ TLoggerPtr Logger_;
+ TString SelfAlias_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/tool/roles_fetcher.cpp b/library/cpp/tvmauth/client/misc/tool/roles_fetcher.cpp
new file mode 100644
index 0000000000..05b0856edc
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/roles_fetcher.cpp
@@ -0,0 +1,81 @@
+#include "roles_fetcher.h"
+#include <library/cpp/tvmauth/client/misc/roles/parser.h>
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/string_utils/quote/quote.h>
+#include <util/string/builder.h>
+#include <util/string/join.h>
+namespace NTvmAuth::NTvmTool {
+ TRolesFetcher::TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger)
+ : Settings_(settings)
+ , Logger_(std::move(logger))
+ {
+ }
+ bool TRolesFetcher::IsTimeToUpdate(TDuration sinceUpdate) const {
+ return Settings_.UpdatePeriod < sinceUpdate;
+ }
+ bool TRolesFetcher::ShouldWarn(TDuration sinceUpdate) const {
+ return Settings_.WarnPeriod < sinceUpdate;
+ }
+ bool TRolesFetcher::AreRolesOk() const {
+ return bool(GetCurrentRoles());
+ }
+ NUtils::TFetchResult TRolesFetcher::FetchActualRoles(const TKeepAliveHttpClient::THeaders& authHeader,
+ TKeepAliveHttpClient& client) const {
+ const TRequest req = CreateRequest(authHeader);
+ TStringStream out;
+ THttpHeaders outHeaders;
+ TKeepAliveHttpClient::THttpCode code = client.DoGet(
+ req.Url,
+ &out,
+ req.Headers,
+ &outHeaders);
+ return {code, std::move(outHeaders), "/v2/roles", out.Str(), {}};
+ }
+ void TRolesFetcher::Update(NUtils::TFetchResult&& fetchResult) {
+ if (fetchResult.Code == HTTP_NOT_MODIFIED) {
+ Y_ENSURE(CurrentRoles_.Get(),
+ "tvmtool did not return any roles because current roles are actual,"
+ " but there are no roles in memory - this should never happen");
+ return;
+ }
+ Y_ENSURE(fetchResult.Code == HTTP_OK,
+ "Unexpected code from tvmtool: " << fetchResult.Code << ". " << fetchResult.Response);
+ CurrentRoles_.Set(NRoles::TParser::Parse(std::make_shared<TString>(std::move(fetchResult.Response))));
+ Logger_->Debug(
+ TStringBuilder() << "Succeed to update roles with revision "
+ << CurrentRoles_.Get()->GetMeta().Revision);
+ }
+ NTvmAuth::NRoles::TRolesPtr TRolesFetcher::GetCurrentRoles() const {
+ return CurrentRoles_.Get();
+ }
+ TRolesFetcher::TRequest TRolesFetcher::CreateRequest(const TKeepAliveHttpClient::THeaders& authHeader) const {
+ TRequest request{
+ .Url = "/v2/roles?self=" + CGIEscapeRet(Settings_.SelfAlias),
+ .Headers = authHeader,
+ };
+ NRoles::TRolesPtr roles = CurrentRoles_.Get();
+ if (roles) {
+ request.Headers.emplace(IfNoneMatch_, Join("", "\"", roles->GetMeta().Revision, "\""));
+ }
+ return request;
+ }
diff --git a/library/cpp/tvmauth/client/misc/tool/roles_fetcher.h b/library/cpp/tvmauth/client/misc/tool/roles_fetcher.h
new file mode 100644
index 0000000000..8c60b59610
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/roles_fetcher.h
@@ -0,0 +1,49 @@
+#pragma once
+#include <library/cpp/tvmauth/client/misc/fetch_result.h>
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/tvmauth/client/misc/roles/roles.h>
+#include <library/cpp/tvmauth/client/logger.h>
+#include <util/datetime/base.h>
+#include <util/generic/string.h>
+namespace NTvmAuth::NTvmTool {
+ struct TRolesFetcherSettings {
+ TString SelfAlias;
+ TDuration UpdatePeriod = TDuration::Minutes(1);
+ TDuration WarnPeriod = TDuration::Minutes(20);
+ };
+ class TRolesFetcher {
+ public:
+ TRolesFetcher(const TRolesFetcherSettings& settings, TLoggerPtr logger);
+ bool IsTimeToUpdate(TDuration sinceUpdate) const;
+ bool ShouldWarn(TDuration sinceUpdate) const;
+ bool AreRolesOk() const;
+ NUtils::TFetchResult FetchActualRoles(const TKeepAliveHttpClient::THeaders& authHeader,
+ TKeepAliveHttpClient& client) const;
+ void Update(NUtils::TFetchResult&& fetchResult);
+ NTvmAuth::NRoles::TRolesPtr GetCurrentRoles() const;
+ protected:
+ struct TRequest {
+ TString Url;
+ TKeepAliveHttpClient::THeaders Headers;
+ };
+ protected:
+ TRequest CreateRequest(const TKeepAliveHttpClient::THeaders& authHeader) const;
+ private:
+ const TRolesFetcherSettings Settings_;
+ const TLoggerPtr Logger_;
+ const TString IfNoneMatch_ = "If-None-Match";
+ NUtils::TProtectedValue<NTvmAuth::NRoles::TRolesPtr> CurrentRoles_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/tool/settings.cpp b/library/cpp/tvmauth/client/misc/tool/settings.cpp
new file mode 100644
index 0000000000..894501f19d
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/settings.cpp
@@ -0,0 +1,37 @@
+#include "settings.h"
+#include <library/cpp/string_utils/url/url.h>
+#include <util/system/env.h>
+namespace NTvmAuth::NTvmTool {
+ TClientSettings::TClientSettings(const TAlias& selfAias)
+ : SelfAias_(selfAias)
+ , Hostname_("localhost")
+ , Port_(1)
+ , SocketTimeout_(TDuration::Seconds(5))
+ , ConnectTimeout_(TDuration::Seconds(30))
+ {
+ if (!AuthToken_) {
+ AuthToken_ = GetEnv("QLOUD_TVM_TOKEN");
+ }
+ TStringBuf auth(AuthToken_);
+ FixSpaces(auth);
+ AuthToken_ = auth;
+ const TString url = GetEnv("DEPLOY_TVM_TOOL_URL");
+ if (url) {
+ TStringBuf scheme, host;
+ TryGetSchemeHostAndPort(url, scheme, host, Port_);
+ }
+ Y_ENSURE_EX(SelfAias_, TBrokenTvmClientSettings() << "Alias for your TVM client cannot be empty");
+ }
+ void TClientSettings::FixSpaces(TStringBuf& str) {
+ while (str && isspace(str.back())) {
+ str.Chop(1);
+ }
+ }
diff --git a/library/cpp/tvmauth/client/misc/tool/settings.h b/library/cpp/tvmauth/client/misc/tool/settings.h
new file mode 100644
index 0000000000..67c3959263
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/settings.h
@@ -0,0 +1,169 @@
+#pragma once
+#include <library/cpp/tvmauth/client/misc/settings.h>
+#include <library/cpp/tvmauth/client/exception.h>
+#include <library/cpp/tvmauth/checked_user_ticket.h>
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+namespace NTvmAuth::NTvmTool {
+ /**
+ * Uses local http-interface to get state: http://localhost/tvm/.
+ * This interface can be provided with tvmtool (local daemon) or Qloud/YP (local http api in container).
+ * See more: https://wiki.yandex-team.ru/passport/tvm2/qloud/.
+ *
+ * Most part of settings will be fetched from tvmtool on start of client.
+ * You need to use aliases for TVM-clients (src and dst) which you specified in tvmtool or Qloud/YP interface
+ */
+ class TClientSettings: public NTvmAuth::TClientSettings {
+ public:
+ /*!
+ * Sets default values:
+ * - hostname == "localhost"
+ * - port detected with env["DEPLOY_TVM_TOOL_URL"] (provided with Yandex.Deploy),
+ * otherwise port == 1 (it is ok for Qloud)
+ * - authToken: env["TVMTOOL_LOCAL_AUTHTOKEN"] (provided with Yandex.Deploy),
+ * otherwise env["QLOUD_TVM_TOKEN"] (provided with Qloud)
+ *
+ * AuthToken is protection from SSRF.
+ *
+ * @param selfAias - alias for your TVM client, which you specified in tvmtool or YD interface
+ */
+ TClientSettings(const TAlias& selfAias);
+ /*!
+ * Look at comment for ctor
+ * @param port
+ */
+ TClientSettings& SetPort(ui16 port) {
+ Port_ = port;
+ return *this;
+ }
+ /*!
+ * Default value: hostname == "localhost"
+ * @param hostname
+ */
+ TClientSettings& SetHostname(const TString& hostname) {
+ Y_ENSURE_EX(hostname, TBrokenTvmClientSettings() << "Hostname cannot be empty");
+ Hostname_ = hostname;
+ return *this;
+ }
+ TClientSettings& SetSocketTimeout(TDuration socketTimeout) {
+ SocketTimeout_ = socketTimeout;
+ return *this;
+ }
+ TClientSettings& SetConnectTimeout(TDuration connectTimeout) {
+ ConnectTimeout_ = connectTimeout;
+ return *this;
+ }
+ /*!
+ * Look at comment for ctor
+ * @param token
+ */
+ TClientSettings& SetAuthToken(TStringBuf token) {
+ FixSpaces(token);
+ Y_ENSURE_EX(token, TBrokenTvmClientSettings() << "Auth token cannot be empty");
+ AuthToken_ = token;
+ return *this;
+ }
+ /*!
+ * Blackbox environmet is provided by tvmtool for client.
+ * You can override it for your purpose with limitations:
+ * (env from tvmtool) -> (override)
+ * - Prod/ProdYateam -> Prod/ProdYateam
+ * - Test/TestYateam -> Test/TestYateam
+ * - Stress -> Stress
+ *
+ * You can contact tvm-dev@yandex-team.ru if limitations are too strict
+ * @param env
+ */
+ TClientSettings& OverrideBlackboxEnv(EBlackboxEnv env) {
+ BbEnv_ = env;
+ return *this;
+ }
+ /*!
+ * By default client checks src from ServiceTicket or default uid from UserTicket -
+ * to prevent you from forgetting to check it yourself.
+ * It does binary checks only:
+ * ticket gets status NoRoles, if there is no role for src or default uid.
+ * You need to check roles on your own if you have a non-binary role system or
+ * you have disabled ShouldCheckSrc/ShouldCheckDefaultUid
+ *
+ * You may need to disable this check in the following cases:
+ * - You use GetRoles() to provide verbose message (with revision).
+ * Double check may be inconsistent:
+ * binary check inside client uses revision of roles X - i.e. src 100500 has no role,
+ * exact check in your code uses revision of roles Y - i.e. src 100500 has some roles.
+ */
+ bool ShouldCheckSrc = true;
+ bool ShouldCheckDefaultUid = true;
+ // TODO: get rid of it: PASSP-35377
+ public:
+ // Deprecated: set attributes directly
+ TClientSettings& SetShouldCheckSrc(bool val = true) {
+ ShouldCheckSrc = val;
+ return *this;
+ }
+ // Deprecated: set attributes directly
+ TClientSettings& SetSShouldCheckDefaultUid(bool val = true) {
+ ShouldCheckDefaultUid = val;
+ return *this;
+ }
+ public: // for TAsyncUpdaterBase
+ const TAlias& GetSelfAlias() const {
+ return SelfAias_;
+ }
+ const TString& GetHostname() const {
+ return Hostname_;
+ }
+ ui16 GetPort() const {
+ return Port_;
+ }
+ TDuration GetSocketTimeout() const {
+ return SocketTimeout_;
+ }
+ TDuration GetConnectTimeout() const {
+ return ConnectTimeout_;
+ }
+ const TString& GetAuthToken() const {
+ Y_ENSURE_EX(AuthToken_, TBrokenTvmClientSettings()
+ << "Auth token cannot be empty. "
+ << "Env 'TVMTOOL_LOCAL_AUTHTOKEN' and 'QLOUD_TVM_TOKEN' are empty.");
+ return AuthToken_;
+ }
+ TMaybe<EBlackboxEnv> GetOverridedBlackboxEnv() const {
+ return BbEnv_;
+ }
+ private:
+ void FixSpaces(TStringBuf& str);
+ private:
+ TAlias SelfAias_;
+ TString Hostname_;
+ ui16 Port_;
+ TDuration SocketTimeout_;
+ TDuration ConnectTimeout_;
+ TString AuthToken_;
+ TMaybe<EBlackboxEnv> BbEnv_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/tool/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/tool/threaded_updater.cpp
new file mode 100644
index 0000000000..35bbe4f617
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/threaded_updater.cpp
@@ -0,0 +1,370 @@
+#include "threaded_updater.h"
+#include <library/cpp/tvmauth/client/misc/utils.h>
+#include <library/cpp/json/json_reader.h>
+#include <util/generic/hash_set.h>
+#include <util/stream/str.h>
+#include <util/string/ascii.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+namespace NTvmAuth::NTvmTool {
+ TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) {
+ Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");
+ THolder<TThreadedUpdater> p(new TThreadedUpdater(
+ settings.GetHostname(),
+ settings.GetPort(),
+ settings.GetSocketTimeout(),
+ settings.GetConnectTimeout(),
+ std::move(logger)));
+ p->Init(settings);
+ p->StartWorker();
+ return p.Release();
+ }
+ TThreadedUpdater::~TThreadedUpdater() {
+ StopWorker(); // Required here to avoid using of deleted members
+ }
+ TClientStatus TThreadedUpdater::GetStatus() const {
+ const TClientStatus::ECode state = GetState();
+ return TClientStatus(state, GetLastError(state == TClientStatus::Ok));
+ }
+ NRoles::TRolesPtr TThreadedUpdater::GetRoles() const {
+ Y_ENSURE_EX(RolesFetcher_,
+ TBrokenTvmClientSettings() << "Roles were not configured in settings");
+ return RolesFetcher_->GetCurrentRoles();
+ }
+ TClientStatus::ECode TThreadedUpdater::GetState() const {
+ const TInstant now = TInstant::Now();
+ const TMetaInfo::TConfigPtr config = MetaInfo_.GetConfig();
+ if ((config->AreTicketsRequired() && AreServiceTicketsInvalid(now)) || ArePublicKeysInvalid(now)) {
+ return TClientStatus::Error;
+ }
+ if (config->AreTicketsRequired()) {
+ if (!GetCachedServiceTickets() || config->DstAliases.size() > GetCachedServiceTickets()->TicketsByAlias.size()) {
+ return TClientStatus::Error;
+ }
+ }
+ const TDuration st = now - GetUpdateTimeOfServiceTickets();
+ const TDuration pk = now - GetUpdateTimeOfPublicKeys();
+ if ((config->AreTicketsRequired() && st > ServiceTicketsDurations_.Expiring) || pk > PublicKeysDurations_.Expiring) {
+ return TClientStatus::Warning;
+ }
+ if (RolesFetcher_ && RolesFetcher_->ShouldWarn(now - GetUpdateTimeOfRoles())) {
+ return TClientStatus::Warning;
+ }
+ if (IsConfigWarnTime()) {
+ return TClientStatus::Warning;
+ }
+ return TClientStatus::Ok;
+ }
+ TThreadedUpdater::TThreadedUpdater(const TString& host, ui16 port, TDuration socketTimeout, TDuration connectTimeout, TLoggerPtr logger)
+ : TThreadedUpdaterBase(TDuration::Seconds(5), logger, host, port, socketTimeout, connectTimeout)
+ , MetaInfo_(logger)
+ , ConfigWarnDelay_(TDuration::Seconds(30))
+ {
+ ServiceTicketsDurations_.RefreshPeriod = TDuration::Minutes(10);
+ PublicKeysDurations_.RefreshPeriod = TDuration::Minutes(10);
+ }
+ void TThreadedUpdater::Init(const TClientSettings& settings) {
+ const TMetaInfo::TConfigPtr config = MetaInfo_.Init(GetClient(), settings);
+ LastVisitForConfig_ = TInstant::Now();
+ SetBbEnv(config->BbEnv, settings.GetOverridedBlackboxEnv());
+ if (settings.GetOverridedBlackboxEnv()) {
+ LogInfo(TStringBuilder()
+ << "Meta: override blackbox env: " << config->BbEnv
+ << "->" << *settings.GetOverridedBlackboxEnv());
+ }
+ if (config->IdmSlug) {
+ RolesFetcher_ = std::make_unique<TRolesFetcher>(
+ TRolesFetcherSettings{
+ .SelfAlias = settings.GetSelfAlias(),
+ },
+ Logger_);
+ }
+ ui8 tries = 3;
+ do {
+ UpdateState();
+ } while (!IsEverythingOk(*config) && --tries > 0);
+ if (!IsEverythingOk(*config)) {
+ ThrowLastError();
+ }
+ }
+ void TThreadedUpdater::UpdateState() {
+ bool wasUpdated = false;
+ try {
+ wasUpdated = MetaInfo_.TryUpdateConfig(GetClient());
+ LastVisitForConfig_ = TInstant::Now();
+ ClearError(EScope::TvmtoolConfig);
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::TvmtoolConfig, e.what());
+ LogWarning(TStringBuilder() << "Error while fetching of tvmtool config: " << e.what());
+ }
+ if (IsConfigWarnTime()) {
+ LogError(TStringBuilder() << "Tvmtool config have not been refreshed for too long period");
+ }
+ TMetaInfo::TConfigPtr config = MetaInfo_.GetConfig();
+ if (wasUpdated || IsTimeToUpdateServiceTickets(*config, LastVisitForServiceTickets_)) {
+ try {
+ const TInstant updateTime = UpdateServiceTickets(*config);
+ SetUpdateTimeOfServiceTickets(updateTime);
+ LastVisitForServiceTickets_ = TInstant::Now();
+ if (AreServiceTicketsOk(*config)) {
+ ClearError(EScope::ServiceTickets);
+ }
+ LogDebug(TStringBuilder() << "Tickets fetched from tvmtool: " << updateTime);
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::ServiceTickets, e.what());
+ LogWarning(TStringBuilder() << "Error while fetching of tickets: " << e.what());
+ }
+ if (TInstant::Now() - GetUpdateTimeOfServiceTickets() > ServiceTicketsDurations_.Expiring) {
+ LogError("Service tickets have not been refreshed for too long period");
+ }
+ }
+ if (wasUpdated || IsTimeToUpdatePublicKeys(LastVisitForPublicKeys_)) {
+ try {
+ const TInstant updateTime = UpdateKeys(*config);
+ SetUpdateTimeOfPublicKeys(updateTime);
+ LastVisitForPublicKeys_ = TInstant::Now();
+ if (ArePublicKeysOk()) {
+ ClearError(EScope::PublicKeys);
+ }
+ LogDebug(TStringBuilder() << "Public keys fetched from tvmtool: " << updateTime);
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::PublicKeys, e.what());
+ LogWarning(TStringBuilder() << "Error while fetching of public keys: " << e.what());
+ }
+ if (TInstant::Now() - GetUpdateTimeOfPublicKeys() > PublicKeysDurations_.Expiring) {
+ LogError("Public keys have not been refreshed for too long period");
+ }
+ }
+ if (RolesFetcher_ && (wasUpdated || RolesFetcher_->IsTimeToUpdate(TInstant::Now() - GetUpdateTimeOfRoles()))) {
+ try {
+ RolesFetcher_->Update(RolesFetcher_->FetchActualRoles(MetaInfo_.GetAuthHeader(), GetClient()));
+ SetUpdateTimeOfRoles(TInstant::Now());
+ if (RolesFetcher_->AreRolesOk()) {
+ ClearError(EScope::Roles);
+ }
+ } catch (const std::exception& e) {
+ ProcessError(EType::Retriable, EScope::Roles, e.what());
+ LogWarning(TStringBuilder() << "Failed to update roles: " << e.what());
+ }
+ if (RolesFetcher_->ShouldWarn(TInstant::Now() - GetUpdateTimeOfRoles())) {
+ LogError("Roles have not been refreshed for too long period");
+ }
+ }
+ }
+ TInstant TThreadedUpdater::UpdateServiceTickets(const TMetaInfo::TConfig& config) {
+ const std::pair<TString, TInstant> tickets = FetchServiceTickets(config);
+ if (TInstant::Now() - tickets.second >= ServiceTicketsDurations_.Invalid) {
+ throw yexception() << "Service tickets are too old: " << tickets.second;
+ }
+ TPairTicketsErrors p = ParseFetchTicketsResponse(tickets.first, config.DstAliases);
+ SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(p.Tickets),
+ std::move(p.Errors),
+ config.DstAliases));
+ return tickets.second;
+ }
+ std::pair<TString, TInstant> TThreadedUpdater::FetchServiceTickets(const TMetaInfo::TConfig& config) const {
+ TStringStream s;
+ THttpHeaders headers;
+ const TString request = TMetaInfo::GetRequestForTickets(config);
+ auto code = GetClient().DoGet(request, &s, MetaInfo_.GetAuthHeader(), &headers);
+ Y_ENSURE(code == 200, ProcessHttpError(EScope::ServiceTickets, request, code, s.Str()));
+ return {s.Str(), GetBirthTimeFromResponse(headers, "tickets")};
+ }
+ static THashSet<TTvmId> GetAllTvmIds(const TMetaInfo::TDstAliases& dsts) {
+ THashSet<TTvmId> res;
+ res.reserve(dsts.size());
+ for (const auto& pair : dsts) {
+ res.insert(pair.second);
+ }
+ return res;
+ }
+ TAsyncUpdaterBase::TPairTicketsErrors TThreadedUpdater::ParseFetchTicketsResponse(const TString& resp,
+ const TMetaInfo::TDstAliases& dsts) const {
+ const THashSet<TTvmId> allTvmIds = GetAllTvmIds(dsts);
+ TServiceTickets::TMapIdStr tickets;
+ TServiceTickets::TMapIdStr errors;
+ auto procErr = [this](const TString& msg) {
+ ProcessError(EType::NonRetriable, EScope::ServiceTickets, msg);
+ LogError(msg);
+ };
+ NJson::TJsonValue doc;
+ Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvmtool: " << resp);
+ for (const auto& pair : doc.GetMap()) {
+ NJson::TJsonValue tvmId;
+ unsigned long long tvmIdNum = 0;
+ if (!pair.second.GetValue("tvm_id", &tvmId) ||
+ !tvmId.GetUInteger(&tvmIdNum)) {
+ procErr(TStringBuilder()
+ << "Failed to get 'tvm_id' from key, should never happend '"
+ << pair.first << "': " << resp);
+ continue;
+ }
+ if (!allTvmIds.contains(tvmIdNum)) {
+ continue;
+ }
+ NJson::TJsonValue val;
+ if (!pair.second.GetValue("ticket", &val)) {
+ TString err;
+ if (pair.second.GetValue("error", &val)) {
+ err = val.GetString();
+ } else {
+ err = "Failed to get 'ticket' and 'error', should never happend: " + pair.first;
+ }
+ procErr(TStringBuilder()
+ << "Failed to get ServiceTicket for " << pair.first
+ << " (" << tvmIdNum << "): " << err);
+ errors.insert({tvmIdNum, std::move(err)});
+ continue;
+ }
+ tickets.insert({tvmIdNum, val.GetString()});
+ }
+ // This work-around is required because of bug in old verions of tvmtool: PASSP-24829
+ for (const auto& pair : dsts) {
+ if (!tickets.contains(pair.second) && !errors.contains(pair.second)) {
+ TString err = "Missing tvm_id in response, should never happend: " + pair.first;
+ procErr(TStringBuilder()
+ << "Failed to get ServiceTicket for " << pair.first
+ << " (" << pair.second << "): " << err);
+ errors.emplace(pair.second, std::move(err));
+ }
+ }
+ return {std::move(tickets), std::move(errors)};
+ }
+ TInstant TThreadedUpdater::UpdateKeys(const TMetaInfo::TConfig& config) {
+ const std::pair<TString, TInstant> keys = FetchPublicKeys();
+ if (TInstant::Now() - keys.second >= PublicKeysDurations_.Invalid) {
+ throw yexception() << "Public keys are too old: " << keys.second;
+ }
+ SetServiceContext(MakeIntrusiveConst<TServiceContext>(
+ TServiceContext::CheckingFactory(config.SelfTvmId, keys.first)));
+ SetUserContext(keys.first);
+ return keys.second;
+ }
+ std::pair<TString, TInstant> TThreadedUpdater::FetchPublicKeys() const {
+ TStringStream s;
+ THttpHeaders headers;
+ auto code = GetClient().DoGet("/tvm/keys", &s, MetaInfo_.GetAuthHeader(), &headers);
+ Y_ENSURE(code == 200, ProcessHttpError(EScope::PublicKeys, "/tvm/keys", code, s.Str()));
+ return {s.Str(), GetBirthTimeFromResponse(headers, "public keys")};
+ }
+ TInstant TThreadedUpdater::GetBirthTimeFromResponse(const THttpHeaders& headers, TStringBuf errMsg) {
+ auto it = std::find_if(headers.begin(),
+ headers.end(),
+ [](const THttpInputHeader& h) {
+ return AsciiEqualsIgnoreCase(h.Name(), "X-Ya-Tvmtool-Data-Birthtime");
+ });
+ Y_ENSURE(it != headers.end(), "Failed to fetch bithtime of " << errMsg << " from tvmtool");
+ ui64 time = 0;
+ Y_ENSURE(TryIntFromString<10>(it->Value(), time),
+ "Bithtime of " << errMsg << " from tvmtool must be unixtime. Got: " << it->Value());
+ return TInstant::Seconds(time);
+ }
+ bool TThreadedUpdater::IsTimeToUpdateServiceTickets(const TMetaInfo::TConfig& config,
+ TInstant lastUpdate) const {
+ return config.AreTicketsRequired() &&
+ TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod;
+ }
+ bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const {
+ return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod;
+ }
+ bool TThreadedUpdater::IsEverythingOk(const TMetaInfo::TConfig& config) const {
+ if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) {
+ return false;
+ }
+ return AreServiceTicketsOk(config) && ArePublicKeysOk();
+ }
+ bool TThreadedUpdater::AreServiceTicketsOk(const TMetaInfo::TConfig& config) const {
+ return AreServiceTicketsOk(config.DstAliases.size());
+ }
+ bool TThreadedUpdater::AreServiceTicketsOk(size_t requiredCount) const {
+ if (requiredCount == 0) {
+ return true;
+ }
+ auto c = GetCachedServiceTickets();
+ return c && c->TicketsByAlias.size() == requiredCount;
+ }
+ bool TThreadedUpdater::ArePublicKeysOk() const {
+ return GetCachedServiceContext() && GetCachedUserContext();
+ }
+ bool TThreadedUpdater::IsConfigWarnTime() const {
+ return LastVisitForConfig_ + ConfigWarnDelay_ < TInstant::Now();
+ }
+ void TThreadedUpdater::Worker() {
+ UpdateState();
+ }
diff --git a/library/cpp/tvmauth/client/misc/tool/threaded_updater.h b/library/cpp/tvmauth/client/misc/tool/threaded_updater.h
new file mode 100644
index 0000000000..57f97f5442
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/tool/threaded_updater.h
@@ -0,0 +1,58 @@
+#pragma once
+#include "meta_info.h"
+#include "roles_fetcher.h"
+#include <library/cpp/tvmauth/client/misc/async_updater.h>
+#include <library/cpp/tvmauth/client/misc/threaded_updater.h>
+#include <atomic>
+namespace NTvmAuth::NTvmTool {
+ class TThreadedUpdater: public TThreadedUpdaterBase {
+ public:
+ static TAsyncUpdaterPtr Create(const TClientSettings& settings, TLoggerPtr logger);
+ ~TThreadedUpdater();
+ TClientStatus GetStatus() const override;
+ NRoles::TRolesPtr GetRoles() const override;
+ protected: // for tests
+ TClientStatus::ECode GetState() const;
+ TThreadedUpdater(const TString& host, ui16 port, TDuration socketTimeout, TDuration connectTimeout, TLoggerPtr logger);
+ void Init(const TClientSettings& settings);
+ void UpdateState();
+ TInstant UpdateServiceTickets(const TMetaInfo::TConfig& config);
+ std::pair<TString, TInstant> FetchServiceTickets(const TMetaInfo::TConfig& config) const;
+ TPairTicketsErrors ParseFetchTicketsResponse(const TString& resp,
+ const TMetaInfo::TDstAliases& dsts) const;
+ TInstant UpdateKeys(const TMetaInfo::TConfig& config);
+ std::pair<TString, TInstant> FetchPublicKeys() const;
+ static TInstant GetBirthTimeFromResponse(const THttpHeaders& headers, TStringBuf errMsg);
+ bool IsTimeToUpdateServiceTickets(const TMetaInfo::TConfig& config, TInstant lastUpdate) const;
+ bool IsTimeToUpdatePublicKeys(TInstant lastUpdate) const;
+ bool IsEverythingOk(const TMetaInfo::TConfig& config) const;
+ bool AreServiceTicketsOk(const TMetaInfo::TConfig& config) const;
+ bool AreServiceTicketsOk(size_t requiredCount) const;
+ bool ArePublicKeysOk() const;
+ bool IsConfigWarnTime() const;
+ private:
+ void Worker() override;
+ protected:
+ TMetaInfo MetaInfo_;
+ TInstant LastVisitForServiceTickets_;
+ TInstant LastVisitForPublicKeys_;
+ TInstant LastVisitForConfig_;
+ TDuration ConfigWarnDelay_;
+ std::unique_ptr<TRolesFetcher> RolesFetcher_;
+ };
diff --git a/library/cpp/tvmauth/client/misc/utils.cpp b/library/cpp/tvmauth/client/misc/utils.cpp
new file mode 100644
index 0000000000..a124c7b11c
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/utils.cpp
@@ -0,0 +1,46 @@
+#include "utils.h"
+#include <library/cpp/tvmauth/client/facade.h>
+#include <util/stream/format.h>
+namespace NTvmAuth::NInternal {
+ void TClientCaningKnife::StartTvmClientStopping(TTvmClient* c) {
+ if (c && c->Updater_) {
+ c->Updater_->StartTvmClientStopping();
+ }
+ }
+ bool TClientCaningKnife::IsTvmClientStopped(TTvmClient* c) {
+ return c && c->Updater_ ? c->Updater_->IsTvmClientStopped() : true;
+ }
+namespace NTvmAuth::NUtils {
+ TString ToHex(const TStringBuf s) {
+ TStringStream res;
+ res.Reserve(2 * s.size());
+ for (char c : s) {
+ res << Hex(c, HF_FULL);
+ }
+ return std::move(res.Str());
+ }
+ bool CheckBbEnvOverriding(EBlackboxEnv original, EBlackboxEnv override) noexcept {
+ switch (original) {
+ case EBlackboxEnv::Prod:
+ case EBlackboxEnv::ProdYateam:
+ return override == EBlackboxEnv::Prod || override == EBlackboxEnv::ProdYateam;
+ case EBlackboxEnv::Test:
+ return true;
+ case EBlackboxEnv::TestYateam:
+ return override == EBlackboxEnv::Test || override == EBlackboxEnv::TestYateam;
+ case EBlackboxEnv::Stress:
+ return override == EBlackboxEnv::Stress;
+ }
+ return false;
+ }
diff --git a/library/cpp/tvmauth/client/misc/utils.h b/library/cpp/tvmauth/client/misc/utils.h
new file mode 100644
index 0000000000..1aa5e61bf1
--- /dev/null
+++ b/library/cpp/tvmauth/client/misc/utils.h
@@ -0,0 +1,95 @@
+#pragma once
+#include "api/settings.h"
+#include "tool/settings.h"
+#include <util/string/cast.h>
+#include <util/system/spinlock.h>
+#include <optional>
+namespace NTvmAuth {
+ class TTvmClient;
+namespace NTvmAuth::NInternal {
+ class TClientCaningKnife {
+ public:
+ static void StartTvmClientStopping(TTvmClient* c);
+ static bool IsTvmClientStopped(TTvmClient* c);
+ };
+namespace NTvmAuth::NUtils {
+ TString ToHex(const TStringBuf s);
+ inline NTvmAuth::NTvmApi::TClientSettings::TDstMap ParseDstMap(TStringBuf dsts) {
+ NTvmAuth::NTvmApi::TClientSettings::TDstMap res;
+ while (dsts) {
+ TStringBuf pair = dsts.NextTok(';');
+ TStringBuf alias = pair.NextTok(':');
+ res.insert(decltype(res)::value_type(
+ alias,
+ IntFromString<TTvmId, 10>(pair)));
+ }
+ return res;
+ }
+ inline NTvmAuth::NTvmApi::TClientSettings::TDstVector ParseDstVector(TStringBuf dsts) {
+ NTvmAuth::NTvmApi::TClientSettings::TDstVector res;
+ while (dsts) {
+ res.push_back(IntFromString<TTvmId, 10>(dsts.NextTok(';')));
+ }
+ return res;
+ }
+ bool CheckBbEnvOverriding(EBlackboxEnv original, EBlackboxEnv override) noexcept;
+ template <class T>
+ class TProtectedValue {
+ class TAssignOp {
+ public:
+ static void Assign(T& l, const T& r) {
+ l = r;
+ }
+ template <typename U>
+ static void Assign(std::shared_ptr<U>& l, std::shared_ptr<U>& r) {
+ l.swap(r);
+ }
+ template <typename U>
+ static void Assign(TIntrusiveConstPtr<U>& l, TIntrusiveConstPtr<U>& r) {
+ l.Swap(r);
+ }
+ };
+ public:
+ TProtectedValue() = default;
+ TProtectedValue(T value)
+ : Value_(value)
+ {
+ }
+ T Get() const {
+ with_lock (Lock_) {
+ return Value_;
+ }
+ }
+ void Set(T o) {
+ with_lock (Lock_) {
+ TAssignOp::Assign(Value_, o);
+ }
+ }
+ private:
+ T Value_;
+ mutable TAdaptiveLock Lock_;
+ };