diff options
author | cerevra <cerevra@yandex-team.ru> | 2022-02-10 16:45:58 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:58 +0300 |
commit | bf41dd01f6c920583e9faae7cd55ed25e547e052 (patch) | |
tree | ec7c8c285ffa648a5c5efeff453787a15ab811ac /library/cpp/tvmauth/client/misc/api/threaded_updater.cpp | |
parent | e2c3e3004f7cd68441cefcfa4aaccd3d8051c846 (diff) | |
download | ydb-bf41dd01f6c920583e9faae7cd55ed25e547e052.tar.gz |
Restoring authorship annotation for <cerevra@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/tvmauth/client/misc/api/threaded_updater.cpp')
-rw-r--r-- | library/cpp/tvmauth/client/misc/api/threaded_updater.cpp | 1886 |
1 files changed, 943 insertions, 943 deletions
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp index a7df49c05d..c437e892dc 100644 --- a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp +++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp @@ -1,954 +1,954 @@ -#include "threaded_updater.h" - -#include <library/cpp/tvmauth/client/misc/disk_cache.h> -#include <library/cpp/tvmauth/client/misc/utils.h> -#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h> - -#include <library/cpp/tvmauth/client/logger.h> - -#include <library/cpp/json/json_reader.h> - -#include <util/stream/str.h> -#include <util/string/builder.h> -#include <util/string/cast.h> -#include <util/system/thread.h> - -namespace NTvmAuth::NTvmApi { - static TString CreatePublicKeysUrl(const TClientSettings& settings, - const NUtils::TProcInfo& procInfo) { - TStringStream s; - s << "/2/keys"; - s << "?"; - procInfo.AddToRequest(s); - - s << "&get_retry_settings=yes"; - - if (settings.GetSelfTvmId() != 0) { - s << "&src=" << settings.GetSelfTvmId(); - } - - if (settings.IsUserTicketCheckingRequired()) { - s << "&env=" << static_cast<int>(settings.GetEnvForUserTickets()); - } - - return s.Str(); - } - - TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) { - Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required"); - THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger))); - p->Init(); - p->StartWorker(); - return p.Release(); - } - - TThreadedUpdater::~TThreadedUpdater() { - ExpBackoff_.SetEnabled(false); - ExpBackoff_.Interrupt(); - StopWorker(); // Required here to avoid using of deleted members - } - - TClientStatus TThreadedUpdater::GetStatus() const { - const TClientStatus::ECode state = GetState(); +#include "threaded_updater.h" + +#include <library/cpp/tvmauth/client/misc/disk_cache.h> +#include <library/cpp/tvmauth/client/misc/utils.h> +#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h> + +#include <library/cpp/tvmauth/client/logger.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/stream/str.h> +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/system/thread.h> + +namespace NTvmAuth::NTvmApi { + static TString CreatePublicKeysUrl(const TClientSettings& settings, + const NUtils::TProcInfo& procInfo) { + TStringStream s; + s << "/2/keys"; + s << "?"; + procInfo.AddToRequest(s); + + s << "&get_retry_settings=yes"; + + if (settings.GetSelfTvmId() != 0) { + s << "&src=" << settings.GetSelfTvmId(); + } + + if (settings.IsUserTicketCheckingRequired()) { + s << "&env=" << static_cast<int>(settings.GetEnvForUserTickets()); + } + + return s.Str(); + } + + TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) { + Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required"); + THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger))); + p->Init(); + p->StartWorker(); + return p.Release(); + } + + TThreadedUpdater::~TThreadedUpdater() { + ExpBackoff_.SetEnabled(false); + ExpBackoff_.Interrupt(); + StopWorker(); // Required here to avoid using of deleted members + } + + TClientStatus TThreadedUpdater::GetStatus() const { + const TClientStatus::ECode state = GetState(); return TClientStatus(state, GetLastError(state == TClientStatus::Ok || state == TClientStatus::IncompleteTicketsSet)); - } - - NRoles::TRolesPtr TThreadedUpdater::GetRoles() const { - Y_ENSURE_EX(RolesFetcher_, - TBrokenTvmClientSettings() << "Roles were not configured in settings"); - return RolesFetcher_->GetCurrentRoles(); - } - - TClientStatus::ECode TThreadedUpdater::GetState() const { - const TInstant now = TInstant::Now(); - - if (Settings_.IsServiceTicketFetchingRequired()) { - if (AreServiceTicketsInvalid(now)) { - return TClientStatus::Error; - } - auto tickets = GetCachedServiceTickets(); - if (!tickets) { - return TClientStatus::Error; - } - if (tickets->TicketsById.size() < Destinations_.size()) { - if (Settings_.IsIncompleteTicketsSetAnError) { + } + + NRoles::TRolesPtr TThreadedUpdater::GetRoles() const { + Y_ENSURE_EX(RolesFetcher_, + TBrokenTvmClientSettings() << "Roles were not configured in settings"); + return RolesFetcher_->GetCurrentRoles(); + } + + TClientStatus::ECode TThreadedUpdater::GetState() const { + const TInstant now = TInstant::Now(); + + if (Settings_.IsServiceTicketFetchingRequired()) { + if (AreServiceTicketsInvalid(now)) { + return TClientStatus::Error; + } + auto tickets = GetCachedServiceTickets(); + if (!tickets) { + return TClientStatus::Error; + } + if (tickets->TicketsById.size() < Destinations_.size()) { + if (Settings_.IsIncompleteTicketsSetAnError) { return TClientStatus::Error; } else { return TClientStatus::IncompleteTicketsSet; } - } - } - if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && ArePublicKeysInvalid(now)) { - return TClientStatus::Error; - } - - const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys(); - const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets(); - const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles(); - - if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) { - return TClientStatus::Warning; - } - if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && - sincePublicKeysUpdate > PublicKeysDurations_.Expiring) - { - return TClientStatus::Warning; - } - if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) { - return TClientStatus::Warning; - } - - return TClientStatus::Ok; - } - - TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger) + } + } + if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && ArePublicKeysInvalid(now)) { + return TClientStatus::Error; + } + + const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys(); + const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets(); + const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles(); + + if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) { + return TClientStatus::Warning; + } + if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && + sincePublicKeysUpdate > PublicKeysDurations_.Expiring) + { + return TClientStatus::Warning; + } + if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) { + return TClientStatus::Warning; + } + + return TClientStatus::Ok; + } + + TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger) : TThreadedUpdaterBase( - TRetrySettings{}.WorkerAwakingPeriod, - std::move(logger), - settings.GetTvmHost(), - settings.GetTvmPort(), - settings.TvmSocketTimeout, - settings.TvmConnectTimeout) - , ExpBackoff_(RetrySettings_.BackoffSettings) - , Settings_(settings.CloneNormalized()) - , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix())) - , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_)) - , DstAliases_(MakeAliasMap(Settings_)) - , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}}) - , Random_(TInstant::Now().MicroSeconds()) - { - if (Settings_.IsServiceTicketFetchingRequired()) { - SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret()); - } - - if (Settings_.IsServiceTicketFetchingRequired()) { - Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()}; - } - - PublicKeysDurations_.RefreshPeriod = TDuration::Days(1); - ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1); - - if (Settings_.IsUserTicketCheckingRequired()) { - SetBbEnv(Settings_.GetEnvForUserTickets()); - } - - if (Settings_.IsRolesFetchingEnabled()) { - RolesFetcher_ = std::make_unique<TRolesFetcher>( - TRolesFetcherSettings{ - Settings_.GetTiroleHost(), - Settings_.GetTirolePort(), - Settings_.GetDiskCacheDir(), - ProcInfo_, - Settings_.GetSelfTvmId(), - Settings_.GetIdmSystemSlug(), - }, - Logger_); - } - - if (Settings_.IsDiskCacheUsed()) { - TString path = Settings_.GetDiskCacheDir(); - if (path.back() != '/') { - path.push_back('/'); - } - - if (Settings_.IsServiceTicketFetchingRequired()) { - ServiceTicketsFilepath_ = path; - ServiceTicketsFilepath_.append("service_tickets"); - } - - if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) { - PublicKeysFilepath_ = path; - PublicKeysFilepath_.append("public_keys"); - } - - RetrySettingsFilepath_ = path + "retry_settings"; + TRetrySettings{}.WorkerAwakingPeriod, + std::move(logger), + settings.GetTvmHost(), + settings.GetTvmPort(), + settings.TvmSocketTimeout, + settings.TvmConnectTimeout) + , ExpBackoff_(RetrySettings_.BackoffSettings) + , Settings_(settings.CloneNormalized()) + , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix())) + , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_)) + , DstAliases_(MakeAliasMap(Settings_)) + , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}}) + , Random_(TInstant::Now().MicroSeconds()) + { + if (Settings_.IsServiceTicketFetchingRequired()) { + SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret()); + } + + if (Settings_.IsServiceTicketFetchingRequired()) { + Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()}; + } + + PublicKeysDurations_.RefreshPeriod = TDuration::Days(1); + ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1); + + if (Settings_.IsUserTicketCheckingRequired()) { + SetBbEnv(Settings_.GetEnvForUserTickets()); + } + + if (Settings_.IsRolesFetchingEnabled()) { + RolesFetcher_ = std::make_unique<TRolesFetcher>( + TRolesFetcherSettings{ + Settings_.GetTiroleHost(), + Settings_.GetTirolePort(), + Settings_.GetDiskCacheDir(), + ProcInfo_, + Settings_.GetSelfTvmId(), + Settings_.GetIdmSystemSlug(), + }, + Logger_); + } + + if (Settings_.IsDiskCacheUsed()) { + TString path = Settings_.GetDiskCacheDir(); + if (path.back() != '/') { + path.push_back('/'); + } + + if (Settings_.IsServiceTicketFetchingRequired()) { + ServiceTicketsFilepath_ = path; + ServiceTicketsFilepath_.append("service_tickets"); + } + + if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) { + PublicKeysFilepath_ = path; + PublicKeysFilepath_.append("public_keys"); + } + + RetrySettingsFilepath_ = path + "retry_settings"; } else { LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability"); - } - } - - void TThreadedUpdater::Init() { - ReadStateFromDisk(); - ClearErrors(); - ExpBackoff_.SetEnabled(false); - - // First of all try to get tickets: there are a lot of reasons to fail this request. - // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call. - UpdateServiceTickets(); - if (!AreServicesTicketsOk()) { - ThrowLastError(); - } - - UpdatePublicKeys(); - if (!IsServiceContextOk() || !IsUserContextOk()) { - ThrowLastError(); - } - - UpdateRoles(); - if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) { - ThrowLastError(); - } - - Inited_ = true; - ExpBackoff_.SetEnabled(true); - } - - void TThreadedUpdater::UpdateServiceTickets() { - if (!Settings_.IsServiceTicketFetchingRequired()) { - return; - } - - TInstant stut = GetUpdateTimeOfServiceTickets(); - try { - if (IsTimeToUpdateServiceTickets(stut)) { - UpdateAllServiceTickets(); - NeedFetchMissingServiceTickets_ = false; - } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) { - UpdateMissingServiceTickets(Destinations_); - NeedFetchMissingServiceTickets_ = false; - } - if (AreServicesTicketsOk()) { - ClearError(EScope::ServiceTickets); - } - } catch (const std::exception& e) { - ProcessError(EType::Retriable, EScope::ServiceTickets, e.what()); - LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what()); - if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) { - LogError("Service tickets have not been refreshed for too long period"); - } - } - } - - void TThreadedUpdater::UpdateAllServiceTickets() { - THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit); - - auto oldCache = GetCachedServiceTickets(); - if (oldCache) { - for (const auto& pair : oldCache->ErrorsById) { - st.TicketsWithErrors.Errors.insert(pair); - } - } - - UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now()); - if (ServiceTicketsFilepath_) { - DiskCacheServiceTickets_ = CreateJsonArray(st.Responses); - TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); - w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); - } - } - - TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) { - TServiceTicketsPtr cache = GetCachedServiceTickets(); - TClientSettings::TDstVector dsts = FindMissingDsts(cache, required); - - if (dsts.empty()) { - return cache; - } - - THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit); - - size_t gotTickets = st.TicketsWithErrors.Tickets.size(); - - for (const auto& pair : cache->TicketsById) { - st.TicketsWithErrors.Tickets.insert(pair); - } - for (const auto& pair : cache->ErrorsById) { - st.TicketsWithErrors.Errors.insert(pair); - } - for (const auto& pair : st.TicketsWithErrors.Tickets) { - st.TicketsWithErrors.Errors.erase(pair.first); - } - - TServiceTicketsPtr c = UpdateServiceTicketsCachePartly( - std::move(st.TicketsWithErrors), - gotTickets); - if (!c) { - LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?"); - c = cache; - } - - if (!ServiceTicketsFilepath_) { - return c; - } - - DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses); - - TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); - w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); - - return c; - } - - void TThreadedUpdater::UpdatePublicKeys() { - if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) { - return; - } - - TInstant pkut = GetUpdateTimeOfPublicKeys(); - if (!IsTimeToUpdatePublicKeys(pkut)) { - return; - } - - try { - TString publicKeys = GetPublicKeysFromHttp(); - - UpdatePublicKeysCache(publicKeys, TInstant::Now()); - if (PublicKeysFilepath_) { - TDiskWriter w(PublicKeysFilepath_, Logger_.Get()); - w.Write(publicKeys); - } - if (IsServiceContextOk() && IsUserContextOk()) { - ClearError(EScope::PublicKeys); - } - } catch (const std::exception& e) { - ProcessError(EType::Retriable, EScope::PublicKeys, e.what()); - LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what()); - if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) { - LogError("Public keys have not been refreshed for too long period"); - } - } - } - - void TThreadedUpdater::UpdateRoles() { - if (!RolesFetcher_) { - return; - } - - TInstant rut = GetUpdateTimeOfRoles(); - if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) { - return; - } - - struct TCloser { - TRolesFetcher* Fetcher; - ~TCloser() { - Fetcher->ResetConnection(); - } - } closer{RolesFetcher_.get()}; - - try { - TServiceTicketsPtr st = GetCachedServiceTickets(); - Y_ENSURE(st, "No one service ticket in memory: how it possible?"); - auto it = st->TicketsById.find(Settings_.GetTiroleTvmId()); - Y_ENSURE(it != st->TicketsById.end(), - "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId()); - - RolesFetcher_->Update( - FetchWithRetries( - [&]() { return RolesFetcher_->FetchActualRoles(it->second); }, - EScope::Roles)); - SetUpdateTimeOfRoles(TInstant::Now()); - - if (RolesFetcher_->AreRolesOk()) { - ClearError(EScope::Roles); - } - } catch (const std::exception& e) { - ProcessError(EType::Retriable, EScope::Roles, e.what()); - LogWarning(TStringBuilder() << "Failed to update roles: " << e.what()); - if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) { - LogError("Roles have not been refreshed for too long period"); - } - } - } - - TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly( - TAsyncUpdaterBase::TPairTicketsErrors&& tickets, - size_t got) { - size_t count = tickets.Tickets.size(); - TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), - std::move(tickets.Errors), - DstAliases_); - SetServiceTickets(c); - - LogInfo(TStringBuilder() - << "Cache was partly updated with " << got - << " service ticket(s). total: " << count); - - return c; - } - - void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) { - size_t count = tickets.Tickets.size(); - SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), - std::move(tickets.Errors), - DstAliases_)); - - SetUpdateTimeOfServiceTickets(time); - + } + } + + void TThreadedUpdater::Init() { + ReadStateFromDisk(); + ClearErrors(); + ExpBackoff_.SetEnabled(false); + + // First of all try to get tickets: there are a lot of reasons to fail this request. + // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call. + UpdateServiceTickets(); + if (!AreServicesTicketsOk()) { + ThrowLastError(); + } + + UpdatePublicKeys(); + if (!IsServiceContextOk() || !IsUserContextOk()) { + ThrowLastError(); + } + + UpdateRoles(); + if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) { + ThrowLastError(); + } + + Inited_ = true; + ExpBackoff_.SetEnabled(true); + } + + void TThreadedUpdater::UpdateServiceTickets() { + if (!Settings_.IsServiceTicketFetchingRequired()) { + return; + } + + TInstant stut = GetUpdateTimeOfServiceTickets(); + try { + if (IsTimeToUpdateServiceTickets(stut)) { + UpdateAllServiceTickets(); + NeedFetchMissingServiceTickets_ = false; + } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) { + UpdateMissingServiceTickets(Destinations_); + NeedFetchMissingServiceTickets_ = false; + } + if (AreServicesTicketsOk()) { + ClearError(EScope::ServiceTickets); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::ServiceTickets, e.what()); + LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what()); + if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) { + LogError("Service tickets have not been refreshed for too long period"); + } + } + } + + void TThreadedUpdater::UpdateAllServiceTickets() { + THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit); + + auto oldCache = GetCachedServiceTickets(); + if (oldCache) { + for (const auto& pair : oldCache->ErrorsById) { + st.TicketsWithErrors.Errors.insert(pair); + } + } + + UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now()); + if (ServiceTicketsFilepath_) { + DiskCacheServiceTickets_ = CreateJsonArray(st.Responses); + TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); + w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); + } + } + + TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) { + TServiceTicketsPtr cache = GetCachedServiceTickets(); + TClientSettings::TDstVector dsts = FindMissingDsts(cache, required); + + if (dsts.empty()) { + return cache; + } + + THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit); + + size_t gotTickets = st.TicketsWithErrors.Tickets.size(); + + for (const auto& pair : cache->TicketsById) { + st.TicketsWithErrors.Tickets.insert(pair); + } + for (const auto& pair : cache->ErrorsById) { + st.TicketsWithErrors.Errors.insert(pair); + } + for (const auto& pair : st.TicketsWithErrors.Tickets) { + st.TicketsWithErrors.Errors.erase(pair.first); + } + + TServiceTicketsPtr c = UpdateServiceTicketsCachePartly( + std::move(st.TicketsWithErrors), + gotTickets); + if (!c) { + LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?"); + c = cache; + } + + if (!ServiceTicketsFilepath_) { + return c; + } + + DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses); + + TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); + w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); + + return c; + } + + void TThreadedUpdater::UpdatePublicKeys() { + if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) { + return; + } + + TInstant pkut = GetUpdateTimeOfPublicKeys(); + if (!IsTimeToUpdatePublicKeys(pkut)) { + return; + } + + try { + TString publicKeys = GetPublicKeysFromHttp(); + + UpdatePublicKeysCache(publicKeys, TInstant::Now()); + if (PublicKeysFilepath_) { + TDiskWriter w(PublicKeysFilepath_, Logger_.Get()); + w.Write(publicKeys); + } + if (IsServiceContextOk() && IsUserContextOk()) { + ClearError(EScope::PublicKeys); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::PublicKeys, e.what()); + LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what()); + if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) { + LogError("Public keys have not been refreshed for too long period"); + } + } + } + + void TThreadedUpdater::UpdateRoles() { + if (!RolesFetcher_) { + return; + } + + TInstant rut = GetUpdateTimeOfRoles(); + if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) { + return; + } + + struct TCloser { + TRolesFetcher* Fetcher; + ~TCloser() { + Fetcher->ResetConnection(); + } + } closer{RolesFetcher_.get()}; + + try { + TServiceTicketsPtr st = GetCachedServiceTickets(); + Y_ENSURE(st, "No one service ticket in memory: how it possible?"); + auto it = st->TicketsById.find(Settings_.GetTiroleTvmId()); + Y_ENSURE(it != st->TicketsById.end(), + "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId()); + + RolesFetcher_->Update( + FetchWithRetries( + [&]() { return RolesFetcher_->FetchActualRoles(it->second); }, + EScope::Roles)); + SetUpdateTimeOfRoles(TInstant::Now()); + + if (RolesFetcher_->AreRolesOk()) { + ClearError(EScope::Roles); + } + } catch (const std::exception& e) { + ProcessError(EType::Retriable, EScope::Roles, e.what()); + LogWarning(TStringBuilder() << "Failed to update roles: " << e.what()); + if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) { + LogError("Roles have not been refreshed for too long period"); + } + } + } + + TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly( + TAsyncUpdaterBase::TPairTicketsErrors&& tickets, + size_t got) { + size_t count = tickets.Tickets.size(); + TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), + std::move(tickets.Errors), + DstAliases_); + SetServiceTickets(c); + + LogInfo(TStringBuilder() + << "Cache was partly updated with " << got + << " service ticket(s). total: " << count); + + return c; + } + + void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) { + size_t count = tickets.Tickets.size(); + SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), + std::move(tickets.Errors), + DstAliases_)); + + SetUpdateTimeOfServiceTickets(time); + if (count > 0) { LogInfo(TStringBuilder() << "Cache was updated with " << count << " service ticket(s): " << time); } - } - - void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) { - if (publicKeys.empty()) { - return; - } - - if (Settings_.IsServiceTicketCheckingRequired()) { - SetServiceContext(MakeIntrusiveConst<TServiceContext>( - TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(), - publicKeys))); - } - - if (Settings_.IsUserTicketCheckingRequired()) { - SetUserContext(publicKeys); - } - - SetUpdateTimeOfPublicKeys(time); - - LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time); - } - - void TThreadedUpdater::ReadStateFromDisk() { - try { - TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk(); - UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate); - DiskCacheServiceTickets_ = st.FileBody; - } catch (const std::exception& e) { - LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what()); - } - - try { - std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk(); - UpdatePublicKeysCache(pk.first, pk.second); - } catch (const std::exception& e) { - LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what()); - } - - try { - TString rs = ReadRetrySettingsFromDisk(); - UpdateRetrySettings(rs); - } catch (const std::exception& e) { - LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what()); - } - - try { - if (RolesFetcher_) { - SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk()); - } - } catch (const std::exception& e) { - LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what()); - } - } - - TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const { - if (!ServiceTicketsFilepath_) { - return {}; - } - - TDiskReader r(ServiceTicketsFilepath_, Logger_.Get()); - if (!r.Read()) { - return {}; - } - - std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data()); - if (data.second != Settings_.GetSelfTvmId()) { - TStringStream s; - s << "Disk cache is for another tvmId (" << data.second << "). "; - s << "Self=" << Settings_.GetSelfTvmId(); - LogWarning(s.Str()); - return {}; - } - - TPairTicketsErrors res; - ParseTicketsFromResponse(data.first, Destinations_, res); - if (IsInvalid(TServiceTickets::GetInvalidationTime(res.Tickets), TInstant::Now())) { - LogWarning("Disk cache (service tickets) is too old"); - return {}; - } - - LogInfo(TStringBuilder() << "Got " << res.Tickets.size() << " service ticket(s) from disk"); - return {std::move(res), r.Time(), TString(data.first)}; - } - - std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const { - if (!PublicKeysFilepath_) { - return {}; - } - - TDiskReader r(PublicKeysFilepath_, Logger_.Get()); - if (!r.Read()) { - return {}; - } - - if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) { - LogWarning("Disk cache (public keys) is too old"); - return {}; - } - - return {r.Data(), r.Time()}; - } - - TString TThreadedUpdater::ReadRetrySettingsFromDisk() const { - if (!RetrySettingsFilepath_) { - return {}; - } - - TDiskReader r(RetrySettingsFilepath_, Logger_.Get()); - if (!r.Read()) { - return {}; - } - - return r.Data(); - } - - template <class Dsts> - TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const { - Y_ENSURE(SigningContext_, "Internal error"); - - TClientSettings::TDstVector part; - part.reserve(dstLimit); - THttpResult res; - res.TicketsWithErrors.Tickets.reserve(dsts.size()); - res.Responses.reserve(dsts.size() / dstLimit + 1); - - for (auto it = dsts.begin(); it != dsts.end();) { - part.clear(); - for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) { - part.push_back(*it); - } - - TString response = - FetchWithRetries( - [this, &part]() { - // create request here to keep 'ts' actual - return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets( - Settings_.GetSelfTvmId(), - *SigningContext_, - part, - ProcInfo_)); - }, - EScope::ServiceTickets) - .Response; - ParseTicketsFromResponse(response, part, res.TicketsWithErrors); - LogDebug(TStringBuilder() - << "Response with service tickets for " << part.size() - << " destination(s) was successfully fetched from " << TvmUrl_); - - res.Responses.push_back(response); - } - - LogDebug(TStringBuilder() - << "Got responses with service tickets with " << res.Responses.size() << " pages for " - << dsts.size() << " destination(s)"); - for (const auto& p : res.TicketsWithErrors.Errors) { - LogError(TStringBuilder() - << "Failed to get service ticket for dst=" << p.first << ": " << p.second); - } - - return res; - } - - TString TThreadedUpdater::GetPublicKeysFromHttp() const { - TString publicKeys = - FetchWithRetries( - [this]() { return FetchPublicKeysFromHttp(); }, - EScope::PublicKeys) - .Response; - - LogDebug("Public keys were successfully fetched from " + TvmUrl_); - - return publicKeys; - } - - NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const { - TStringStream s; - - THttpHeaders outHeaders; - TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders); - - const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); - - return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""}; - } - - NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const { - TStringStream s; - - THttpHeaders outHeaders; - TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders); - - const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); - - return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""}; - } - - bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const { - if (header.empty()) { - // Probably it is some kind of test? - return false; - } - - try { - TString raw = NUtils::Base64url2bin(header); - Y_ENSURE(raw, "Invalid base64url in settings"); - - retry_settings::v1::Settings proto; - Y_ENSURE(proto.ParseFromString(raw), "Invalid proto"); - - // This ugly hack helps to process these settings in any case - TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this); - TRetrySettings& res = this_.RetrySettings_; - - TStringStream diff; - auto update = [&diff](auto& l, const auto& r, TStringBuf desc) { - if (l != r) { - diff << desc << ":" << l << "->" << r << ";"; - l = r; - } - }; - - if (proto.has_exponential_backoff_min_sec()) { - update(res.BackoffSettings.Min, - TDuration::Seconds(proto.exponential_backoff_min_sec()), - "exponential_backoff_min"); - } - if (proto.has_exponential_backoff_max_sec()) { - update(res.BackoffSettings.Max, - TDuration::Seconds(proto.exponential_backoff_max_sec()), - "exponential_backoff_max"); - } - if (proto.has_exponential_backoff_factor()) { - update(res.BackoffSettings.Factor, - proto.exponential_backoff_factor(), - "exponential_backoff_factor"); - } - if (proto.has_exponential_backoff_jitter()) { - update(res.BackoffSettings.Jitter, - proto.exponential_backoff_jitter(), - "exponential_backoff_jitter"); - } - this_.ExpBackoff_.UpdateSettings(res.BackoffSettings); - - if (proto.has_max_random_sleep_default()) { - update(res.MaxRandomSleepDefault, - TDuration::MilliSeconds(proto.max_random_sleep_default()), - "max_random_sleep_default"); - } - if (proto.has_max_random_sleep_when_ok()) { - update(res.MaxRandomSleepWhenOk, - TDuration::MilliSeconds(proto.max_random_sleep_when_ok()), - "max_random_sleep_when_ok"); - } - if (proto.has_retries_on_start()) { - Y_ENSURE(proto.retries_on_start(), "retries_on_start==0"); - update(res.RetriesOnStart, - proto.retries_on_start(), - "retries_on_start"); - } - if (proto.has_retries_in_background()) { - Y_ENSURE(proto.retries_in_background(), "retries_in_background==0"); - update(res.RetriesInBackground, - proto.retries_in_background(), - "retries_in_background"); - } - if (proto.has_worker_awaking_period_sec()) { - update(res.WorkerAwakingPeriod, - TDuration::Seconds(proto.worker_awaking_period_sec()), - "worker_awaking_period"); - this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod; - } - if (proto.has_dsts_limit()) { - Y_ENSURE(proto.dsts_limit(), "dsts_limit==0"); - update(res.DstsLimit, - proto.dsts_limit(), - "dsts_limit"); - } - - if (proto.has_roles_update_period_sec()) { - Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0"); - update(res.RolesUpdatePeriod, - TDuration::Seconds(proto.roles_update_period_sec()), - "roles_update_period_sec"); - } - if (proto.has_roles_warn_period_sec()) { - Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0"); - update(res.RolesWarnPeriod, - TDuration::Seconds(proto.roles_warn_period_sec()), - "roles_warn_period_sec"); - } - - if (diff.empty()) { - return false; - } - - LogDebug("Retry settings were updated: " + diff.Str()); - return true; - } catch (const std::exception& e) { - LogWarning(TStringBuilder() - << "Failed to update retry settings from server, header '" - << header << "': " - << e.what()); - } - - return false; - } - - template <typename Func> - NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const { - const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground - : RetrySettings_.RetriesOnStart; - - for (size_t idx = 1;; ++idx) { - RandomSleep(); - - try { - NUtils::TFetchResult result = func(); - - if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) { - TDiskWriter w(RetrySettingsFilepath_, Logger_.Get()); - w.Write(result.RetrySettings); - } - - if (400 <= result.Code && result.Code <= 499) { - throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response); - } - if (result.Code < 200 || result.Code >= 399) { - throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response); - } - - ExpBackoff_.Decrease(); - return result; - } catch (const TNonRetriableException& e) { - LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); - ExpBackoff_.Increase(); - throw; - } catch (const std::exception& e) { - LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); - ExpBackoff_.Increase(); - if (idx >= tries) { - throw; - } - } - } - - throw yexception() << "unreachable"; - } - - void TThreadedUpdater::RandomSleep() const { - const TDuration maxSleep = TClientStatus::ECode::Ok == GetState() - ? RetrySettings_.MaxRandomSleepWhenOk - : RetrySettings_.MaxRandomSleepDefault; - - if (maxSleep) { - ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds(); - ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep)); - } - } - - TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src, - const TServiceContext& ctx, - const TClientSettings::TDstVector& dsts, - const NUtils::TProcInfo& procInfo, - time_t now) { - TStringStream s; - - const TString ts = IntToString<10>(now); - TStringStream dst; - dst.Reserve(10 * dsts.size()); - for (const TClientSettings::TDst& d : dsts) { - if (dst.Str()) { - dst << ','; - } - dst << d.Id; - } - - s << "grant_type=client_credentials"; - s << "&src=" << src; - s << "&dst=" << dst.Str(); - s << "&ts=" << ts; - s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str()); - s << "&get_retry_settings=yes"; - - s << "&"; - procInfo.AddToRequest(s); - - return s.Str(); - } - - template <class Dsts> - void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp, - const Dsts& dsts, - TPairTicketsErrors& out) const { - NJson::TJsonValue doc; - Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp); - - const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr; - auto find = [¤tResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool { - const TString idStr = IntToString<10>(id); - if (currentResp && currentResp->GetValue(idStr, &obj)) { - return true; - } - - for (const NJson::TJsonValue& val : doc.GetArray()) { - currentResp = &val; - if (currentResp->GetValue(idStr, &obj)) { - return true; - } - } - - return false; - }; - - for (const TClientSettings::TDst& d : dsts) { - NJson::TJsonValue obj; - NJson::TJsonValue val; - - if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) { - TString err; - if (obj.GetValue("error", &val)) { - err = val.GetString(); - } else { - err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id); - } - - TStringStream s; - s << "Failed to get ServiceTicket for " << d.Id << ": " << err; - ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str()); - - out.Errors.insert({d.Id, std::move(err)}); - continue; - } - - out.Tickets.insert({d.Id, val.GetString()}); - } - } - - static const char DELIMETER = '\t'; - TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) { - TStringStream s; - s << tvmResponse << DELIMETER << selfId; - return s.Str(); - } - - std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) { - TStringBuf tvmId = data.RNextTok(DELIMETER); - return {data, IntFromString<TTvmId, 10>(tvmId)}; - } - - const TDstSet& TThreadedUpdater::GetDsts() const { - return Destinations_; - } - - void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) { - Destinations_.insert(dst); - } - - bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const { - return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod; - } - - bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const { - return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod; - } - - bool TThreadedUpdater::AreServicesTicketsOk() const { - if (!Settings_.IsServiceTicketFetchingRequired()) { - return true; - } - auto c = GetCachedServiceTickets(); - return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == Destinations_.size()); - } - - bool TThreadedUpdater::IsServiceContextOk() const { - if (!Settings_.IsServiceTicketCheckingRequired()) { - return true; - } - - return bool(GetCachedServiceContext()); - } - - bool TThreadedUpdater::IsUserContextOk() const { - if (!Settings_.IsUserTicketCheckingRequired()) { - return true; - } - return bool(GetCachedUserContext()); - } - - void TThreadedUpdater::Worker() { - UpdateServiceTickets(); - UpdatePublicKeys(); - UpdateRoles(); - } - - TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) { - TServiceTickets::TMapAliasId res; - - if (settings.HasDstAliases()) { - for (const auto& pair : settings.GetDstAliases()) { - res.insert({pair.first, pair.second.Id}); - } - } - - return res; - } - - TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) { - Y_ENSURE(available); - TDstSet set; - // available->TicketsById is not sorted - for (const auto& pair : available->TicketsById) { - set.insert(pair.first); - } - return FindMissingDsts(set, required); - } - - TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) { - TClientSettings::TDstVector res; - std::set_difference(required.begin(), required.end(), - available.begin(), available.end(), - std::inserter(res, res.begin())); - return res; - } - - TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) { - if (responses.empty()) { - return "[]"; - } - - size_t size = 0; - for (const TString& r : responses) { - size += r.size() + 1; - } - - TString res; - res.reserve(size + 2); - - res.push_back('['); - for (const TString& r : responses) { - res.append(r).push_back(','); - } - res.back() = ']'; - - return res; - } - - TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) { - Y_ENSURE(json, "previous body required"); - - size_t size = 0; - for (const TString& r : responses) { - size += r.size() + 1; - } - - TString res; - res.reserve(size + 2 + json.size()); - - res.push_back('['); - if (json.StartsWith('[')) { - Y_ENSURE(json.EndsWith(']'), "array is broken:" << json); - res.append(TStringBuf(json).Chop(1).Skip(1)); - } else { - res.append(json); - } - - res.push_back(','); - for (const TString& r : responses) { - res.append(r).push_back(','); - } - res.back() = ']'; - - return res; - } -} + } + + void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) { + if (publicKeys.empty()) { + return; + } + + if (Settings_.IsServiceTicketCheckingRequired()) { + SetServiceContext(MakeIntrusiveConst<TServiceContext>( + TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(), + publicKeys))); + } + + if (Settings_.IsUserTicketCheckingRequired()) { + SetUserContext(publicKeys); + } + + SetUpdateTimeOfPublicKeys(time); + + LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time); + } + + void TThreadedUpdater::ReadStateFromDisk() { + try { + TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk(); + UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate); + DiskCacheServiceTickets_ = st.FileBody; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what()); + } + + try { + std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk(); + UpdatePublicKeysCache(pk.first, pk.second); + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what()); + } + + try { + TString rs = ReadRetrySettingsFromDisk(); + UpdateRetrySettings(rs); + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what()); + } + + try { + if (RolesFetcher_) { + SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk()); + } + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what()); + } + } + + TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const { + if (!ServiceTicketsFilepath_) { + return {}; + } + + TDiskReader r(ServiceTicketsFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data()); + if (data.second != Settings_.GetSelfTvmId()) { + TStringStream s; + s << "Disk cache is for another tvmId (" << data.second << "). "; + s << "Self=" << Settings_.GetSelfTvmId(); + LogWarning(s.Str()); + return {}; + } + + TPairTicketsErrors res; + ParseTicketsFromResponse(data.first, Destinations_, res); + if (IsInvalid(TServiceTickets::GetInvalidationTime(res.Tickets), TInstant::Now())) { + LogWarning("Disk cache (service tickets) is too old"); + return {}; + } + + LogInfo(TStringBuilder() << "Got " << res.Tickets.size() << " service ticket(s) from disk"); + return {std::move(res), r.Time(), TString(data.first)}; + } + + std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const { + if (!PublicKeysFilepath_) { + return {}; + } + + TDiskReader r(PublicKeysFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) { + LogWarning("Disk cache (public keys) is too old"); + return {}; + } + + return {r.Data(), r.Time()}; + } + + TString TThreadedUpdater::ReadRetrySettingsFromDisk() const { + if (!RetrySettingsFilepath_) { + return {}; + } + + TDiskReader r(RetrySettingsFilepath_, Logger_.Get()); + if (!r.Read()) { + return {}; + } + + return r.Data(); + } + + template <class Dsts> + TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const { + Y_ENSURE(SigningContext_, "Internal error"); + + TClientSettings::TDstVector part; + part.reserve(dstLimit); + THttpResult res; + res.TicketsWithErrors.Tickets.reserve(dsts.size()); + res.Responses.reserve(dsts.size() / dstLimit + 1); + + for (auto it = dsts.begin(); it != dsts.end();) { + part.clear(); + for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) { + part.push_back(*it); + } + + TString response = + FetchWithRetries( + [this, &part]() { + // create request here to keep 'ts' actual + return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets( + Settings_.GetSelfTvmId(), + *SigningContext_, + part, + ProcInfo_)); + }, + EScope::ServiceTickets) + .Response; + ParseTicketsFromResponse(response, part, res.TicketsWithErrors); + LogDebug(TStringBuilder() + << "Response with service tickets for " << part.size() + << " destination(s) was successfully fetched from " << TvmUrl_); + + res.Responses.push_back(response); + } + + LogDebug(TStringBuilder() + << "Got responses with service tickets with " << res.Responses.size() << " pages for " + << dsts.size() << " destination(s)"); + for (const auto& p : res.TicketsWithErrors.Errors) { + LogError(TStringBuilder() + << "Failed to get service ticket for dst=" << p.first << ": " << p.second); + } + + return res; + } + + TString TThreadedUpdater::GetPublicKeysFromHttp() const { + TString publicKeys = + FetchWithRetries( + [this]() { return FetchPublicKeysFromHttp(); }, + EScope::PublicKeys) + .Response; + + LogDebug("Public keys were successfully fetched from " + TvmUrl_); + + return publicKeys; + } + + NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const { + TStringStream s; + + THttpHeaders outHeaders; + TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders); + + const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + + return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""}; + } + + NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const { + TStringStream s; + + THttpHeaders outHeaders; + TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders); + + const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + + return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""}; + } + + bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const { + if (header.empty()) { + // Probably it is some kind of test? + return false; + } + + try { + TString raw = NUtils::Base64url2bin(header); + Y_ENSURE(raw, "Invalid base64url in settings"); + + retry_settings::v1::Settings proto; + Y_ENSURE(proto.ParseFromString(raw), "Invalid proto"); + + // This ugly hack helps to process these settings in any case + TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this); + TRetrySettings& res = this_.RetrySettings_; + + TStringStream diff; + auto update = [&diff](auto& l, const auto& r, TStringBuf desc) { + if (l != r) { + diff << desc << ":" << l << "->" << r << ";"; + l = r; + } + }; + + if (proto.has_exponential_backoff_min_sec()) { + update(res.BackoffSettings.Min, + TDuration::Seconds(proto.exponential_backoff_min_sec()), + "exponential_backoff_min"); + } + if (proto.has_exponential_backoff_max_sec()) { + update(res.BackoffSettings.Max, + TDuration::Seconds(proto.exponential_backoff_max_sec()), + "exponential_backoff_max"); + } + if (proto.has_exponential_backoff_factor()) { + update(res.BackoffSettings.Factor, + proto.exponential_backoff_factor(), + "exponential_backoff_factor"); + } + if (proto.has_exponential_backoff_jitter()) { + update(res.BackoffSettings.Jitter, + proto.exponential_backoff_jitter(), + "exponential_backoff_jitter"); + } + this_.ExpBackoff_.UpdateSettings(res.BackoffSettings); + + if (proto.has_max_random_sleep_default()) { + update(res.MaxRandomSleepDefault, + TDuration::MilliSeconds(proto.max_random_sleep_default()), + "max_random_sleep_default"); + } + if (proto.has_max_random_sleep_when_ok()) { + update(res.MaxRandomSleepWhenOk, + TDuration::MilliSeconds(proto.max_random_sleep_when_ok()), + "max_random_sleep_when_ok"); + } + if (proto.has_retries_on_start()) { + Y_ENSURE(proto.retries_on_start(), "retries_on_start==0"); + update(res.RetriesOnStart, + proto.retries_on_start(), + "retries_on_start"); + } + if (proto.has_retries_in_background()) { + Y_ENSURE(proto.retries_in_background(), "retries_in_background==0"); + update(res.RetriesInBackground, + proto.retries_in_background(), + "retries_in_background"); + } + if (proto.has_worker_awaking_period_sec()) { + update(res.WorkerAwakingPeriod, + TDuration::Seconds(proto.worker_awaking_period_sec()), + "worker_awaking_period"); + this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod; + } + if (proto.has_dsts_limit()) { + Y_ENSURE(proto.dsts_limit(), "dsts_limit==0"); + update(res.DstsLimit, + proto.dsts_limit(), + "dsts_limit"); + } + + if (proto.has_roles_update_period_sec()) { + Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0"); + update(res.RolesUpdatePeriod, + TDuration::Seconds(proto.roles_update_period_sec()), + "roles_update_period_sec"); + } + if (proto.has_roles_warn_period_sec()) { + Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0"); + update(res.RolesWarnPeriod, + TDuration::Seconds(proto.roles_warn_period_sec()), + "roles_warn_period_sec"); + } + + if (diff.empty()) { + return false; + } + + LogDebug("Retry settings were updated: " + diff.Str()); + return true; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() + << "Failed to update retry settings from server, header '" + << header << "': " + << e.what()); + } + + return false; + } + + template <typename Func> + NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const { + const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground + : RetrySettings_.RetriesOnStart; + + for (size_t idx = 1;; ++idx) { + RandomSleep(); + + try { + NUtils::TFetchResult result = func(); + + if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) { + TDiskWriter w(RetrySettingsFilepath_, Logger_.Get()); + w.Write(result.RetrySettings); + } + + if (400 <= result.Code && result.Code <= 499) { + throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response); + } + if (result.Code < 200 || result.Code >= 399) { + throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response); + } + + ExpBackoff_.Decrease(); + return result; + } catch (const TNonRetriableException& e) { + LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); + ExpBackoff_.Increase(); + throw; + } catch (const std::exception& e) { + LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); + ExpBackoff_.Increase(); + if (idx >= tries) { + throw; + } + } + } + + throw yexception() << "unreachable"; + } + + void TThreadedUpdater::RandomSleep() const { + const TDuration maxSleep = TClientStatus::ECode::Ok == GetState() + ? RetrySettings_.MaxRandomSleepWhenOk + : RetrySettings_.MaxRandomSleepDefault; + + if (maxSleep) { + ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds(); + ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep)); + } + } + + TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src, + const TServiceContext& ctx, + const TClientSettings::TDstVector& dsts, + const NUtils::TProcInfo& procInfo, + time_t now) { + TStringStream s; + + const TString ts = IntToString<10>(now); + TStringStream dst; + dst.Reserve(10 * dsts.size()); + for (const TClientSettings::TDst& d : dsts) { + if (dst.Str()) { + dst << ','; + } + dst << d.Id; + } + + s << "grant_type=client_credentials"; + s << "&src=" << src; + s << "&dst=" << dst.Str(); + s << "&ts=" << ts; + s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str()); + s << "&get_retry_settings=yes"; + + s << "&"; + procInfo.AddToRequest(s); + + return s.Str(); + } + + template <class Dsts> + void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp, + const Dsts& dsts, + TPairTicketsErrors& out) const { + NJson::TJsonValue doc; + Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp); + + const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr; + auto find = [¤tResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool { + const TString idStr = IntToString<10>(id); + if (currentResp && currentResp->GetValue(idStr, &obj)) { + return true; + } + + for (const NJson::TJsonValue& val : doc.GetArray()) { + currentResp = &val; + if (currentResp->GetValue(idStr, &obj)) { + return true; + } + } + + return false; + }; + + for (const TClientSettings::TDst& d : dsts) { + NJson::TJsonValue obj; + NJson::TJsonValue val; + + if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) { + TString err; + if (obj.GetValue("error", &val)) { + err = val.GetString(); + } else { + err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id); + } + + TStringStream s; + s << "Failed to get ServiceTicket for " << d.Id << ": " << err; + ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str()); + + out.Errors.insert({d.Id, std::move(err)}); + continue; + } + + out.Tickets.insert({d.Id, val.GetString()}); + } + } + + static const char DELIMETER = '\t'; + TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) { + TStringStream s; + s << tvmResponse << DELIMETER << selfId; + return s.Str(); + } + + std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) { + TStringBuf tvmId = data.RNextTok(DELIMETER); + return {data, IntFromString<TTvmId, 10>(tvmId)}; + } + + const TDstSet& TThreadedUpdater::GetDsts() const { + return Destinations_; + } + + void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) { + Destinations_.insert(dst); + } + + bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const { + return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod; + } + + bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const { + return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod; + } + + bool TThreadedUpdater::AreServicesTicketsOk() const { + if (!Settings_.IsServiceTicketFetchingRequired()) { + return true; + } + auto c = GetCachedServiceTickets(); + return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == Destinations_.size()); + } + + bool TThreadedUpdater::IsServiceContextOk() const { + if (!Settings_.IsServiceTicketCheckingRequired()) { + return true; + } + + return bool(GetCachedServiceContext()); + } + + bool TThreadedUpdater::IsUserContextOk() const { + if (!Settings_.IsUserTicketCheckingRequired()) { + return true; + } + return bool(GetCachedUserContext()); + } + + void TThreadedUpdater::Worker() { + UpdateServiceTickets(); + UpdatePublicKeys(); + UpdateRoles(); + } + + TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) { + TServiceTickets::TMapAliasId res; + + if (settings.HasDstAliases()) { + for (const auto& pair : settings.GetDstAliases()) { + res.insert({pair.first, pair.second.Id}); + } + } + + return res; + } + + TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) { + Y_ENSURE(available); + TDstSet set; + // available->TicketsById is not sorted + for (const auto& pair : available->TicketsById) { + set.insert(pair.first); + } + return FindMissingDsts(set, required); + } + + TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) { + TClientSettings::TDstVector res; + std::set_difference(required.begin(), required.end(), + available.begin(), available.end(), + std::inserter(res, res.begin())); + return res; + } + + TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) { + if (responses.empty()) { + return "[]"; + } + + size_t size = 0; + for (const TString& r : responses) { + size += r.size() + 1; + } + + TString res; + res.reserve(size + 2); + + res.push_back('['); + for (const TString& r : responses) { + res.append(r).push_back(','); + } + res.back() = ']'; + + return res; + } + + TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) { + Y_ENSURE(json, "previous body required"); + + size_t size = 0; + for (const TString& r : responses) { + size += r.size() + 1; + } + + TString res; + res.reserve(size + 2 + json.size()); + + res.push_back('['); + if (json.StartsWith('[')) { + Y_ENSURE(json.EndsWith(']'), "array is broken:" << json); + res.append(TStringBuf(json).Chop(1).Skip(1)); + } else { + res.append(json); + } + + res.push_back(','); + for (const TString& r : responses) { + res.append(r).push_back(','); + } + res.back() = ']'; + + return res; + } +} |