diff options
| author | komels <[email protected]> | 2022-04-14 13:10:53 +0300 | 
|---|---|---|
| committer | komels <[email protected]> | 2022-04-14 13:10:53 +0300 | 
| commit | 21c9b0e6b039e9765eb414c406c2b86e8cea6850 (patch) | |
| tree | f40ebc18ff8958dfbd189954ad024043ca983ea5 /library/cpp/tvmauth/client/misc/api/threaded_updater.cpp | |
| parent | 9a4effa852abe489707139c2b260dccc6f4f9aa9 (diff) | |
Final part on compatibility layer: LOGBROKER-7215
ref:777c67aadbf705d19034a09a792b2df61ba53697
Diffstat (limited to 'library/cpp/tvmauth/client/misc/api/threaded_updater.cpp')
| -rw-r--r-- | library/cpp/tvmauth/client/misc/api/threaded_updater.cpp | 954 | 
1 files changed, 954 insertions, 0 deletions
diff --git a/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp new file mode 100644 index 00000000000..a7df49c05dd --- /dev/null +++ b/library/cpp/tvmauth/client/misc/api/threaded_updater.cpp @@ -0,0 +1,954 @@ +#include "threaded_updater.h" + +#include <library/cpp/tvmauth/client/misc/disk_cache.h> +#include <library/cpp/tvmauth/client/misc/utils.h> +#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h> + +#include <library/cpp/tvmauth/client/logger.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/stream/str.h> +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/system/thread.h> + +namespace NTvmAuth::NTvmApi { +    static TString CreatePublicKeysUrl(const TClientSettings& settings, +                                       const NUtils::TProcInfo& procInfo) { +        TStringStream s; +        s << "/2/keys"; +        s << "?"; +        procInfo.AddToRequest(s); + +        s << "&get_retry_settings=yes"; + +        if (settings.GetSelfTvmId() != 0) { +            s << "&src=" << settings.GetSelfTvmId(); +        } + +        if (settings.IsUserTicketCheckingRequired()) { +            s << "&env=" << static_cast<int>(settings.GetEnvForUserTickets()); +        } + +        return s.Str(); +    } + +    TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) { +        Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required"); +        THolder<TThreadedUpdater> p(new TThreadedUpdater(settings, std::move(logger))); +        p->Init(); +        p->StartWorker(); +        return p.Release(); +    } + +    TThreadedUpdater::~TThreadedUpdater() { +        ExpBackoff_.SetEnabled(false); +        ExpBackoff_.Interrupt(); +        StopWorker(); // Required here to avoid using of deleted members +    } + +    TClientStatus TThreadedUpdater::GetStatus() const { +        const TClientStatus::ECode state = GetState(); +        return TClientStatus(state, GetLastError(state == TClientStatus::Ok || state == TClientStatus::IncompleteTicketsSet)); +    } + +    NRoles::TRolesPtr TThreadedUpdater::GetRoles() const { +        Y_ENSURE_EX(RolesFetcher_, +                    TBrokenTvmClientSettings() << "Roles were not configured in settings"); +        return RolesFetcher_->GetCurrentRoles(); +    } + +    TClientStatus::ECode TThreadedUpdater::GetState() const { +        const TInstant now = TInstant::Now(); + +        if (Settings_.IsServiceTicketFetchingRequired()) { +            if (AreServiceTicketsInvalid(now)) { +                return TClientStatus::Error; +            } +            auto tickets = GetCachedServiceTickets(); +            if (!tickets) { +                return TClientStatus::Error; +            } +            if (tickets->TicketsById.size() < Destinations_.size()) { +                if (Settings_.IsIncompleteTicketsSetAnError) { +                    return TClientStatus::Error; +                } else { +                    return TClientStatus::IncompleteTicketsSet; +                } +            } +        } +        if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && ArePublicKeysInvalid(now)) { +            return TClientStatus::Error; +        } + +        const TDuration sincePublicKeysUpdate = now - GetUpdateTimeOfPublicKeys(); +        const TDuration sinceServiceTicketsUpdate = now - GetUpdateTimeOfServiceTickets(); +        const TDuration sinceRolesUpdate = now - GetUpdateTimeOfRoles(); + +        if (Settings_.IsServiceTicketFetchingRequired() && sinceServiceTicketsUpdate > ServiceTicketsDurations_.Expiring) { +            return TClientStatus::Warning; +        } +        if ((Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) && +            sincePublicKeysUpdate > PublicKeysDurations_.Expiring) +        { +            return TClientStatus::Warning; +        } +        if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, sinceRolesUpdate)) { +            return TClientStatus::Warning; +        } + +        return TClientStatus::Ok; +    } + +    TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger) +        : TThreadedUpdaterBase( +              TRetrySettings{}.WorkerAwakingPeriod, +              std::move(logger), +              settings.GetTvmHost(), +              settings.GetTvmPort(), +              settings.TvmSocketTimeout, +              settings.TvmConnectTimeout) +        , ExpBackoff_(RetrySettings_.BackoffSettings) +        , Settings_(settings.CloneNormalized()) +        , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix())) +        , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_)) +        , DstAliases_(MakeAliasMap(Settings_)) +        , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}}) +        , Random_(TInstant::Now().MicroSeconds()) +    { +        if (Settings_.IsServiceTicketFetchingRequired()) { +            SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret()); +        } + +        if (Settings_.IsServiceTicketFetchingRequired()) { +            Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()}; +        } + +        PublicKeysDurations_.RefreshPeriod = TDuration::Days(1); +        ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1); + +        if (Settings_.IsUserTicketCheckingRequired()) { +            SetBbEnv(Settings_.GetEnvForUserTickets()); +        } + +        if (Settings_.IsRolesFetchingEnabled()) { +            RolesFetcher_ = std::make_unique<TRolesFetcher>( +                TRolesFetcherSettings{ +                    Settings_.GetTiroleHost(), +                    Settings_.GetTirolePort(), +                    Settings_.GetDiskCacheDir(), +                    ProcInfo_, +                    Settings_.GetSelfTvmId(), +                    Settings_.GetIdmSystemSlug(), +                }, +                Logger_); +        } + +        if (Settings_.IsDiskCacheUsed()) { +            TString path = Settings_.GetDiskCacheDir(); +            if (path.back() != '/') { +                path.push_back('/'); +            } + +            if (Settings_.IsServiceTicketFetchingRequired()) { +                ServiceTicketsFilepath_ = path; +                ServiceTicketsFilepath_.append("service_tickets"); +            } + +            if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) { +                PublicKeysFilepath_ = path; +                PublicKeysFilepath_.append("public_keys"); +            } + +            RetrySettingsFilepath_ = path + "retry_settings"; +        } else { +            LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability"); +        } +    } + +    void TThreadedUpdater::Init() { +        ReadStateFromDisk(); +        ClearErrors(); +        ExpBackoff_.SetEnabled(false); + +        // First of all try to get tickets: there are a lot of reasons to fail this request. +        // As far as disk cache usually disabled, client will not fetch keys before fail on every ctor call. +        UpdateServiceTickets(); +        if (!AreServicesTicketsOk()) { +            ThrowLastError(); +        } + +        UpdatePublicKeys(); +        if (!IsServiceContextOk() || !IsUserContextOk()) { +            ThrowLastError(); +        } + +        UpdateRoles(); +        if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) { +            ThrowLastError(); +        } + +        Inited_ = true; +        ExpBackoff_.SetEnabled(true); +    } + +    void TThreadedUpdater::UpdateServiceTickets() { +        if (!Settings_.IsServiceTicketFetchingRequired()) { +            return; +        } + +        TInstant stut = GetUpdateTimeOfServiceTickets(); +        try { +            if (IsTimeToUpdateServiceTickets(stut)) { +                UpdateAllServiceTickets(); +                NeedFetchMissingServiceTickets_ = false; +            } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) { +                UpdateMissingServiceTickets(Destinations_); +                NeedFetchMissingServiceTickets_ = false; +            } +            if (AreServicesTicketsOk()) { +                ClearError(EScope::ServiceTickets); +            } +        } catch (const std::exception& e) { +            ProcessError(EType::Retriable, EScope::ServiceTickets, e.what()); +            LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what()); +            if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) { +                LogError("Service tickets have not been refreshed for too long period"); +            } +        } +    } + +    void TThreadedUpdater::UpdateAllServiceTickets() { +        THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit); + +        auto oldCache = GetCachedServiceTickets(); +        if (oldCache) { +            for (const auto& pair : oldCache->ErrorsById) { +                st.TicketsWithErrors.Errors.insert(pair); +            } +        } + +        UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now()); +        if (ServiceTicketsFilepath_) { +            DiskCacheServiceTickets_ = CreateJsonArray(st.Responses); +            TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); +            w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); +        } +    } + +    TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) { +        TServiceTicketsPtr cache = GetCachedServiceTickets(); +        TClientSettings::TDstVector dsts = FindMissingDsts(cache, required); + +        if (dsts.empty()) { +            return cache; +        } + +        THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit); + +        size_t gotTickets = st.TicketsWithErrors.Tickets.size(); + +        for (const auto& pair : cache->TicketsById) { +            st.TicketsWithErrors.Tickets.insert(pair); +        } +        for (const auto& pair : cache->ErrorsById) { +            st.TicketsWithErrors.Errors.insert(pair); +        } +        for (const auto& pair : st.TicketsWithErrors.Tickets) { +            st.TicketsWithErrors.Errors.erase(pair.first); +        } + +        TServiceTicketsPtr c = UpdateServiceTicketsCachePartly( +            std::move(st.TicketsWithErrors), +            gotTickets); +        if (!c) { +            LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?"); +            c = cache; +        } + +        if (!ServiceTicketsFilepath_) { +            return c; +        } + +        DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses); + +        TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get()); +        w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId())); + +        return c; +    } + +    void TThreadedUpdater::UpdatePublicKeys() { +        if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) { +            return; +        } + +        TInstant pkut = GetUpdateTimeOfPublicKeys(); +        if (!IsTimeToUpdatePublicKeys(pkut)) { +            return; +        } + +        try { +            TString publicKeys = GetPublicKeysFromHttp(); + +            UpdatePublicKeysCache(publicKeys, TInstant::Now()); +            if (PublicKeysFilepath_) { +                TDiskWriter w(PublicKeysFilepath_, Logger_.Get()); +                w.Write(publicKeys); +            } +            if (IsServiceContextOk() && IsUserContextOk()) { +                ClearError(EScope::PublicKeys); +            } +        } catch (const std::exception& e) { +            ProcessError(EType::Retriable, EScope::PublicKeys, e.what()); +            LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what()); +            if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) { +                LogError("Public keys have not been refreshed for too long period"); +            } +        } +    } + +    void TThreadedUpdater::UpdateRoles() { +        if (!RolesFetcher_) { +            return; +        } + +        TInstant rut = GetUpdateTimeOfRoles(); +        if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) { +            return; +        } + +        struct TCloser { +            TRolesFetcher* Fetcher; +            ~TCloser() { +                Fetcher->ResetConnection(); +            } +        } closer{RolesFetcher_.get()}; + +        try { +            TServiceTicketsPtr st = GetCachedServiceTickets(); +            Y_ENSURE(st, "No one service ticket in memory: how it possible?"); +            auto it = st->TicketsById.find(Settings_.GetTiroleTvmId()); +            Y_ENSURE(it != st->TicketsById.end(), +                     "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId()); + +            RolesFetcher_->Update( +                FetchWithRetries( +                    [&]() { return RolesFetcher_->FetchActualRoles(it->second); }, +                    EScope::Roles)); +            SetUpdateTimeOfRoles(TInstant::Now()); + +            if (RolesFetcher_->AreRolesOk()) { +                ClearError(EScope::Roles); +            } +        } catch (const std::exception& e) { +            ProcessError(EType::Retriable, EScope::Roles, e.what()); +            LogWarning(TStringBuilder() << "Failed to update roles: " << e.what()); +            if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) { +                LogError("Roles have not been refreshed for too long period"); +            } +        } +    } + +    TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly( +        TAsyncUpdaterBase::TPairTicketsErrors&& tickets, +        size_t got) { +        size_t count = tickets.Tickets.size(); +        TServiceTicketsPtr c = MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), +                                                                   std::move(tickets.Errors), +                                                                   DstAliases_); +        SetServiceTickets(c); + +        LogInfo(TStringBuilder() +                << "Cache was partly updated with " << got +                << " service ticket(s). total: " << count); + +        return c; +    } + +    void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) { +        size_t count = tickets.Tickets.size(); +        SetServiceTickets(MakeIntrusiveConst<TServiceTickets>(std::move(tickets.Tickets), +                                                              std::move(tickets.Errors), +                                                              DstAliases_)); + +        SetUpdateTimeOfServiceTickets(time); + +        if (count > 0) { +            LogInfo(TStringBuilder() << "Cache was updated with " << count << " service ticket(s): " << time); +        } +    } + +    void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) { +        if (publicKeys.empty()) { +            return; +        } + +        if (Settings_.IsServiceTicketCheckingRequired()) { +            SetServiceContext(MakeIntrusiveConst<TServiceContext>( +                TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(), +                                                 publicKeys))); +        } + +        if (Settings_.IsUserTicketCheckingRequired()) { +            SetUserContext(publicKeys); +        } + +        SetUpdateTimeOfPublicKeys(time); + +        LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time); +    } + +    void TThreadedUpdater::ReadStateFromDisk() { +        try { +            TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk(); +            UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate); +            DiskCacheServiceTickets_ = st.FileBody; +        } catch (const std::exception& e) { +            LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what()); +        } + +        try { +            std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk(); +            UpdatePublicKeysCache(pk.first, pk.second); +        } catch (const std::exception& e) { +            LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what()); +        } + +        try { +            TString rs = ReadRetrySettingsFromDisk(); +            UpdateRetrySettings(rs); +        } catch (const std::exception& e) { +            LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what()); +        } + +        try { +            if (RolesFetcher_) { +                SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk()); +            } +        } catch (const std::exception& e) { +            LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what()); +        } +    } + +    TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const { +        if (!ServiceTicketsFilepath_) { +            return {}; +        } + +        TDiskReader r(ServiceTicketsFilepath_, Logger_.Get()); +        if (!r.Read()) { +            return {}; +        } + +        std::pair<TStringBuf, TTvmId> data = ParseTicketsFromDisk(r.Data()); +        if (data.second != Settings_.GetSelfTvmId()) { +            TStringStream s; +            s << "Disk cache is for another tvmId (" << data.second << "). "; +            s << "Self=" << Settings_.GetSelfTvmId(); +            LogWarning(s.Str()); +            return {}; +        } + +        TPairTicketsErrors res; +        ParseTicketsFromResponse(data.first, Destinations_, res); +        if (IsInvalid(TServiceTickets::GetInvalidationTime(res.Tickets), TInstant::Now())) { +            LogWarning("Disk cache (service tickets) is too old"); +            return {}; +        } + +        LogInfo(TStringBuilder() << "Got " << res.Tickets.size() << " service ticket(s) from disk"); +        return {std::move(res), r.Time(), TString(data.first)}; +    } + +    std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const { +        if (!PublicKeysFilepath_) { +            return {}; +        } + +        TDiskReader r(PublicKeysFilepath_, Logger_.Get()); +        if (!r.Read()) { +            return {}; +        } + +        if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) { +            LogWarning("Disk cache (public keys) is too old"); +            return {}; +        } + +        return {r.Data(), r.Time()}; +    } + +    TString TThreadedUpdater::ReadRetrySettingsFromDisk() const { +        if (!RetrySettingsFilepath_) { +            return {}; +        } + +        TDiskReader r(RetrySettingsFilepath_, Logger_.Get()); +        if (!r.Read()) { +            return {}; +        } + +        return r.Data(); +    } + +    template <class Dsts> +    TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const { +        Y_ENSURE(SigningContext_, "Internal error"); + +        TClientSettings::TDstVector part; +        part.reserve(dstLimit); +        THttpResult res; +        res.TicketsWithErrors.Tickets.reserve(dsts.size()); +        res.Responses.reserve(dsts.size() / dstLimit + 1); + +        for (auto it = dsts.begin(); it != dsts.end();) { +            part.clear(); +            for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) { +                part.push_back(*it); +            } + +            TString response = +                FetchWithRetries( +                    [this, &part]() { +                        // create request here to keep 'ts' actual +                        return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets( +                            Settings_.GetSelfTvmId(), +                            *SigningContext_, +                            part, +                            ProcInfo_)); +                    }, +                    EScope::ServiceTickets) +                    .Response; +            ParseTicketsFromResponse(response, part, res.TicketsWithErrors); +            LogDebug(TStringBuilder() +                     << "Response with service tickets for " << part.size() +                     << " destination(s) was successfully fetched from " << TvmUrl_); + +            res.Responses.push_back(response); +        } + +        LogDebug(TStringBuilder() +                 << "Got responses with service tickets with " << res.Responses.size() << " pages for " +                 << dsts.size() << " destination(s)"); +        for (const auto& p : res.TicketsWithErrors.Errors) { +            LogError(TStringBuilder() +                     << "Failed to get service ticket for dst=" << p.first << ": " << p.second); +        } + +        return res; +    } + +    TString TThreadedUpdater::GetPublicKeysFromHttp() const { +        TString publicKeys = +            FetchWithRetries( +                [this]() { return FetchPublicKeysFromHttp(); }, +                EScope::PublicKeys) +                .Response; + +        LogDebug("Public keys were successfully fetched from " + TvmUrl_); + +        return publicKeys; +    } + +    NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const { +        TStringStream s; + +        THttpHeaders outHeaders; +        TKeepAliveHttpClient::THttpCode code = GetClient().DoPost("/2/ticket", body, &s, Headers_, &outHeaders); + +        const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + +        return {code, {}, "/2/ticket", s.Str(), settings ? settings->Value() : ""}; +    } + +    NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const { +        TStringStream s; + +        THttpHeaders outHeaders; +        TKeepAliveHttpClient::THttpCode code = GetClient().DoGet(PublicKeysUrl_, &s, {}, &outHeaders); + +        const THttpInputHeader* settings = outHeaders.FindHeader("X-Ya-Retry-Settings"); + +        return {code, {}, "/2/keys", s.Str(), settings ? settings->Value() : ""}; +    } + +    bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const { +        if (header.empty()) { +            // Probably it is some kind of test? +            return false; +        } + +        try { +            TString raw = NUtils::Base64url2bin(header); +            Y_ENSURE(raw, "Invalid base64url in settings"); + +            retry_settings::v1::Settings proto; +            Y_ENSURE(proto.ParseFromString(raw), "Invalid proto"); + +            // This ugly hack helps to process these settings in any case +            TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this); +            TRetrySettings& res = this_.RetrySettings_; + +            TStringStream diff; +            auto update = [&diff](auto& l, const auto& r, TStringBuf desc) { +                if (l != r) { +                    diff << desc << ":" << l << "->" << r << ";"; +                    l = r; +                } +            }; + +            if (proto.has_exponential_backoff_min_sec()) { +                update(res.BackoffSettings.Min, +                       TDuration::Seconds(proto.exponential_backoff_min_sec()), +                       "exponential_backoff_min"); +            } +            if (proto.has_exponential_backoff_max_sec()) { +                update(res.BackoffSettings.Max, +                       TDuration::Seconds(proto.exponential_backoff_max_sec()), +                       "exponential_backoff_max"); +            } +            if (proto.has_exponential_backoff_factor()) { +                update(res.BackoffSettings.Factor, +                       proto.exponential_backoff_factor(), +                       "exponential_backoff_factor"); +            } +            if (proto.has_exponential_backoff_jitter()) { +                update(res.BackoffSettings.Jitter, +                       proto.exponential_backoff_jitter(), +                       "exponential_backoff_jitter"); +            } +            this_.ExpBackoff_.UpdateSettings(res.BackoffSettings); + +            if (proto.has_max_random_sleep_default()) { +                update(res.MaxRandomSleepDefault, +                       TDuration::MilliSeconds(proto.max_random_sleep_default()), +                       "max_random_sleep_default"); +            } +            if (proto.has_max_random_sleep_when_ok()) { +                update(res.MaxRandomSleepWhenOk, +                       TDuration::MilliSeconds(proto.max_random_sleep_when_ok()), +                       "max_random_sleep_when_ok"); +            } +            if (proto.has_retries_on_start()) { +                Y_ENSURE(proto.retries_on_start(), "retries_on_start==0"); +                update(res.RetriesOnStart, +                       proto.retries_on_start(), +                       "retries_on_start"); +            } +            if (proto.has_retries_in_background()) { +                Y_ENSURE(proto.retries_in_background(), "retries_in_background==0"); +                update(res.RetriesInBackground, +                       proto.retries_in_background(), +                       "retries_in_background"); +            } +            if (proto.has_worker_awaking_period_sec()) { +                update(res.WorkerAwakingPeriod, +                       TDuration::Seconds(proto.worker_awaking_period_sec()), +                       "worker_awaking_period"); +                this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod; +            } +            if (proto.has_dsts_limit()) { +                Y_ENSURE(proto.dsts_limit(), "dsts_limit==0"); +                update(res.DstsLimit, +                       proto.dsts_limit(), +                       "dsts_limit"); +            } + +            if (proto.has_roles_update_period_sec()) { +                Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0"); +                update(res.RolesUpdatePeriod, +                       TDuration::Seconds(proto.roles_update_period_sec()), +                       "roles_update_period_sec"); +            } +            if (proto.has_roles_warn_period_sec()) { +                Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0"); +                update(res.RolesWarnPeriod, +                       TDuration::Seconds(proto.roles_warn_period_sec()), +                       "roles_warn_period_sec"); +            } + +            if (diff.empty()) { +                return false; +            } + +            LogDebug("Retry settings were updated: " + diff.Str()); +            return true; +        } catch (const std::exception& e) { +            LogWarning(TStringBuilder() +                       << "Failed to update retry settings from server, header '" +                       << header << "': " +                       << e.what()); +        } + +        return false; +    } + +    template <typename Func> +    NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const { +        const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground +                                   : RetrySettings_.RetriesOnStart; + +        for (size_t idx = 1;; ++idx) { +            RandomSleep(); + +            try { +                NUtils::TFetchResult result = func(); + +                if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) { +                    TDiskWriter w(RetrySettingsFilepath_, Logger_.Get()); +                    w.Write(result.RetrySettings); +                } + +                if (400 <= result.Code && result.Code <= 499) { +                    throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response); +                } +                if (result.Code < 200 || result.Code >= 399) { +                    throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response); +                } + +                ExpBackoff_.Decrease(); +                return result; +            } catch (const TNonRetriableException& e) { +                LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); +                ExpBackoff_.Increase(); +                throw; +            } catch (const std::exception& e) { +                LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what()); +                ExpBackoff_.Increase(); +                if (idx >= tries) { +                    throw; +                } +            } +        } + +        throw yexception() << "unreachable"; +    } + +    void TThreadedUpdater::RandomSleep() const { +        const TDuration maxSleep = TClientStatus::ECode::Ok == GetState() +                                       ? RetrySettings_.MaxRandomSleepWhenOk +                                       : RetrySettings_.MaxRandomSleepDefault; + +        if (maxSleep) { +            ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds(); +            ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep)); +        } +    } + +    TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src, +                                                              const TServiceContext& ctx, +                                                              const TClientSettings::TDstVector& dsts, +                                                              const NUtils::TProcInfo& procInfo, +                                                              time_t now) { +        TStringStream s; + +        const TString ts = IntToString<10>(now); +        TStringStream dst; +        dst.Reserve(10 * dsts.size()); +        for (const TClientSettings::TDst& d : dsts) { +            if (dst.Str()) { +                dst << ','; +            } +            dst << d.Id; +        } + +        s << "grant_type=client_credentials"; +        s << "&src=" << src; +        s << "&dst=" << dst.Str(); +        s << "&ts=" << ts; +        s << "&sign=" << ctx.SignCgiParamsForTvm(ts, dst.Str()); +        s << "&get_retry_settings=yes"; + +        s << "&"; +        procInfo.AddToRequest(s); + +        return s.Str(); +    } + +    template <class Dsts> +    void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp, +                                                    const Dsts& dsts, +                                                    TPairTicketsErrors& out) const { +        NJson::TJsonValue doc; +        Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp); + +        const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr; +        auto find = [¤tResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool { +            const TString idStr = IntToString<10>(id); +            if (currentResp && currentResp->GetValue(idStr, &obj)) { +                return true; +            } + +            for (const NJson::TJsonValue& val : doc.GetArray()) { +                currentResp = &val; +                if (currentResp->GetValue(idStr, &obj)) { +                    return true; +                } +            } + +            return false; +        }; + +        for (const TClientSettings::TDst& d : dsts) { +            NJson::TJsonValue obj; +            NJson::TJsonValue val; + +            if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) { +                TString err; +                if (obj.GetValue("error", &val)) { +                    err = val.GetString(); +                } else { +                    err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id); +                } + +                TStringStream s; +                s << "Failed to get ServiceTicket for " << d.Id << ": " << err; +                ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str()); + +                out.Errors.insert({d.Id, std::move(err)}); +                continue; +            } + +            out.Tickets.insert({d.Id, val.GetString()}); +        } +    } + +    static const char DELIMETER = '\t'; +    TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) { +        TStringStream s; +        s << tvmResponse << DELIMETER << selfId; +        return s.Str(); +    } + +    std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) { +        TStringBuf tvmId = data.RNextTok(DELIMETER); +        return {data, IntFromString<TTvmId, 10>(tvmId)}; +    } + +    const TDstSet& TThreadedUpdater::GetDsts() const { +        return Destinations_; +    } + +    void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) { +        Destinations_.insert(dst); +    } + +    bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const { +        return TInstant::Now() - lastUpdate > ServiceTicketsDurations_.RefreshPeriod; +    } + +    bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const { +        return TInstant::Now() - lastUpdate > PublicKeysDurations_.RefreshPeriod; +    } + +    bool TThreadedUpdater::AreServicesTicketsOk() const { +        if (!Settings_.IsServiceTicketFetchingRequired()) { +            return true; +        } +        auto c = GetCachedServiceTickets(); +        return c && (!Settings_.IsIncompleteTicketsSetAnError || c->TicketsById.size() == Destinations_.size()); +    } + +    bool TThreadedUpdater::IsServiceContextOk() const { +        if (!Settings_.IsServiceTicketCheckingRequired()) { +            return true; +        } + +        return bool(GetCachedServiceContext()); +    } + +    bool TThreadedUpdater::IsUserContextOk() const { +        if (!Settings_.IsUserTicketCheckingRequired()) { +            return true; +        } +        return bool(GetCachedUserContext()); +    } + +    void TThreadedUpdater::Worker() { +        UpdateServiceTickets(); +        UpdatePublicKeys(); +        UpdateRoles(); +    } + +    TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) { +        TServiceTickets::TMapAliasId res; + +        if (settings.HasDstAliases()) { +            for (const auto& pair : settings.GetDstAliases()) { +                res.insert({pair.first, pair.second.Id}); +            } +        } + +        return res; +    } + +    TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) { +        Y_ENSURE(available); +        TDstSet set; +        // available->TicketsById is not sorted +        for (const auto& pair : available->TicketsById) { +            set.insert(pair.first); +        } +        return FindMissingDsts(set, required); +    } + +    TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) { +        TClientSettings::TDstVector res; +        std::set_difference(required.begin(), required.end(), +                            available.begin(), available.end(), +                            std::inserter(res, res.begin())); +        return res; +    } + +    TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) { +        if (responses.empty()) { +            return "[]"; +        } + +        size_t size = 0; +        for (const TString& r : responses) { +            size += r.size() + 1; +        } + +        TString res; +        res.reserve(size + 2); + +        res.push_back('['); +        for (const TString& r : responses) { +            res.append(r).push_back(','); +        } +        res.back() = ']'; + +        return res; +    } + +    TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) { +        Y_ENSURE(json, "previous body required"); + +        size_t size = 0; +        for (const TString& r : responses) { +            size += r.size() + 1; +        } + +        TString res; +        res.reserve(size + 2 + json.size()); + +        res.push_back('['); +        if (json.StartsWith('[')) { +            Y_ENSURE(json.EndsWith(']'), "array is broken:" << json); +            res.append(TStringBuf(json).Chop(1).Skip(1)); +        } else { +            res.append(json); +        } + +        res.push_back(','); +        for (const TString& r : responses) { +            res.append(r).push_back(','); +        } +        res.back() = ']'; + +        return res; +    } +}  | 
