#include "threaded_updater.h"

#include <library/cpp/tvmauth/client/misc/disk_cache.h>
#include <library/cpp/tvmauth/client/misc/utils.h>
#include <library/cpp/tvmauth/client/misc/retry_settings/v1/settings.pb.h>

#include <library/cpp/tvmauth/client/logger.h>

#include <library/cpp/json/json_reader.h>

#include <util/stream/str.h>
#include <util/string/builder.h>
#include <util/string/cast.h>
#include <util/system/thread.h>

namespace NTvmAuth::NTvmApi {
    // Builds the request path (with query string) used to fetch public keys.
    //
    // The query always carries the process info and asks the server for retry settings.
    // "src" is added only for a non-zero self tvm id, "env" only when user tickets
    // have to be checked.
    static TString CreatePublicKeysUrl(const TClientSettings& settings,
                                       const NUtils::TProcInfo& procInfo) {
        TStringStream url;

        url << "/2/keys?";
        procInfo.AddToRequest(url);
        url << "&get_retry_settings=yes";

        const auto selfTvmId = settings.GetSelfTvmId();
        if (selfTvmId != 0) {
            url << "&src=" << selfTvmId;
        }

        if (settings.IsUserTicketCheckingRequired()) {
            const int env = static_cast<int>(settings.GetEnvForUserTickets());
            url << "&env=" << env;
        }

        return url.Str();
    }

    // Factory: constructs the updater, performs the initial synchronous update (Init)
    // and starts the background worker.
    // Throws TNonRetriableException if no logger was provided; Init() may throw as well.
    TAsyncUpdaterPtr TThreadedUpdater::Create(const TClientSettings& settings, TLoggerPtr logger) {
        Y_ENSURE_EX(logger, TNonRetriableException() << "Logger is required");

        // Keep the object in a holder until it is fully initialized,
        // so it is destroyed if Init() or StartWorker() throws.
        THolder<TThreadedUpdater> updater(new TThreadedUpdater(settings, std::move(logger)));
        updater->Init();
        updater->StartWorker();

        return updater.Release();
    }

    // Shuts the updater down: the exponential backoff is disabled and interrupted first,
    // then the background worker is stopped.
    // NOTE(review): presumably the interrupt wakes a worker that is sleeping inside the
    // backoff so StopWorker() does not wait for the whole sleep - confirm against TExponentialBackoff.
    TThreadedUpdater::~TThreadedUpdater() {
        ExpBackoff_.SetEnabled(false);
        ExpBackoff_.Interrupt();
        StopWorker(); // Required here to avoid using of deleted members
    }

    // Returns the current state code together with the last error text.
    // The flag passed to GetLastError() is true when the state is Ok or IncompleteTicketsSet.
    TClientStatus TThreadedUpdater::GetStatus() const {
        const TClientStatus::ECode state = GetState();

        const bool isOkLikeState =
            state == TClientStatus::Ok ||
            state == TClientStatus::IncompleteTicketsSet;

        return TClientStatus(state, GetLastError(isOkLikeState));
    }

    // Returns the roles currently held by the roles fetcher.
    // Throws TBrokenTvmClientSettings if roles fetching was not enabled in settings
    // (the fetcher is created only in that case).
    NRoles::TRolesPtr TThreadedUpdater::GetRoles() const {
        Y_ENSURE_EX(RolesFetcher_, TBrokenTvmClientSettings() << "Roles were not configured in settings");

        return RolesFetcher_->GetCurrentRoles();
    }

    // Computes the health code of the client from the cached data and its age.
    //
    //  * Error   - required service tickets are invalid/absent, the ticket set is incomplete
    //              (when that is configured to be an error), or required public keys are invalid.
    //  * IncompleteTicketsSet - fewer tickets than destinations, and that is tolerated by settings.
    //  * Warning - some required data was not refreshed for longer than its "Expiring" period
    //              (or the roles fetcher says it is time to warn).
    //  * Ok      - otherwise.
    TClientStatus::ECode TThreadedUpdater::GetState() const {
        const TInstant now = TInstant::Now();

        const bool ticketsAreFetched = Settings_.IsServiceTicketFetchingRequired();
        const bool keysAreNeeded =
            Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired();

        // Hard failures first.
        if (ticketsAreFetched) {
            if (AreServiceTicketsInvalid(now)) {
                return TClientStatus::Error;
            }

            const auto cachedTickets = GetCachedServiceTickets();
            if (!cachedTickets) {
                return TClientStatus::Error;
            }

            if (cachedTickets->TicketsById.size() < Destinations_.size()) {
                return Settings_.IsIncompleteTicketsSetAnError
                           ? TClientStatus::Error
                           : TClientStatus::IncompleteTicketsSet;
            }
        }

        if (keysAreNeeded && ArePublicKeysInvalid(now)) {
            return TClientStatus::Error;
        }

        // Data is usable; now check how stale it is.
        const TDuration keysAge = now - GetUpdateTimeOfPublicKeys();
        const TDuration ticketsAge = now - GetUpdateTimeOfServiceTickets();
        const TDuration rolesAge = now - GetUpdateTimeOfRoles();

        const bool ticketsAreExpiring = ticketsAreFetched && ticketsAge > ServiceTicketsDurations_.Expiring;
        const bool keysAreExpiring = keysAreNeeded && keysAge > PublicKeysDurations_.Expiring;
        if (ticketsAreExpiring || keysAreExpiring) {
            return TClientStatus::Warning;
        }

        if (RolesFetcher_ && TRolesFetcher::ShouldWarn(RetrySettings_, rolesAge)) {
            return TClientStatus::Warning;
        }

        return TClientStatus::Ok;
    }

    // Prepares everything that does not require network access:
    // signing context and destination set (when service tickets are fetched),
    // refresh periods, blackbox env, roles fetcher and disk cache file paths.
    // Network requests are done later in Init().
    TThreadedUpdater::TThreadedUpdater(const TClientSettings& settings, TLoggerPtr logger)
        : TThreadedUpdaterBase(
              TRetrySettings{}.WorkerAwakingPeriod,
              std::move(logger),
              settings.GetTvmHost(),
              settings.GetTvmPort(),
              settings.TvmSocketTimeout,
              settings.TvmConnectTimeout)
        , ExpBackoff_(RetrySettings_.BackoffSettings)
        , Settings_(settings.CloneNormalized())
        , ProcInfo_(NUtils::TProcInfo::Create(Settings_.GetLibVersionPrefix()))
        , PublicKeysUrl_(CreatePublicKeysUrl(Settings_, ProcInfo_))
        , DstAliases_(MakeAliasMap(Settings_))
        , Headers_({{"Content-Type", "application/x-www-form-urlencoded"}})
        , Random_(TInstant::Now().MicroSeconds())
    {
        // From here on only the normalized copy (Settings_) is used, not the ctor argument.
        if (Settings_.IsServiceTicketFetchingRequired()) {
            SigningContext_ = TServiceContext::SigningFactory(Settings_.GetSelfSecret());
            Destinations_ = {Settings_.GetDestinations().begin(), Settings_.GetDestinations().end()};
        }

        PublicKeysDurations_.RefreshPeriod = TDuration::Days(1);
        ServiceTicketsDurations_.RefreshPeriod = TDuration::Hours(1);

        if (Settings_.IsUserTicketCheckingRequired()) {
            SetBbEnv(Settings_.GetEnvForUserTickets());
        }

        if (Settings_.IsRolesFetchingEnabled()) {
            RolesFetcher_ = std::make_unique<TRolesFetcher>(
                TRolesFetcherSettings{
                    Settings_.GetTiroleHost(),
                    Settings_.GetTirolePort(),
                    Settings_.GetDiskCacheDir(),
                    ProcInfo_,
                    Settings_.GetSelfTvmId(),
                    Settings_.GetIdmSystemSlug(),
                },
                Logger_);
        }

        if (!Settings_.IsDiskCacheUsed()) {
            LogInfo("Disk cache disabled. Please set disk cache directory in settings for best reliability");
            return;
        }

        // Disk cache is enabled: compute file names inside the cache directory.
        TString cacheDir = Settings_.GetDiskCacheDir();
        if (cacheDir.back() != '/') {
            cacheDir.push_back('/');
        }

        if (Settings_.IsServiceTicketFetchingRequired()) {
            ServiceTicketsFilepath_ = cacheDir + "service_tickets";
        }

        if (Settings_.IsServiceTicketCheckingRequired() || Settings_.IsUserTicketCheckingRequired()) {
            PublicKeysFilepath_ = cacheDir + "public_keys";
        }

        RetrySettingsFilepath_ = cacheDir + "retry_settings";
    }

    // Performs the initial, synchronous update of all required data.
    //
    // State is first restored from disk, accumulated errors are cleared and the exponential
    // backoff is disabled for the duration of the startup. Then service tickets, public keys
    // and roles are updated in that order; after each step the last error is thrown
    // (ThrowLastError) if the corresponding data is still not usable.
    // On success the updater is marked as inited and the backoff is enabled again.
    void TThreadedUpdater::Init() {
        ReadStateFromDisk();
        ClearErrors();
        ExpBackoff_.SetEnabled(false);

        // First of all try to get tickets: there are a lot of reasons for this request to fail.
        // As the disk cache is usually disabled, the client will not fetch keys before failing on every ctor call.
        UpdateServiceTickets();
        if (!AreServicesTicketsOk()) {
            ThrowLastError();
        }

        UpdatePublicKeys();
        if (!IsServiceContextOk() || !IsUserContextOk()) {
            ThrowLastError();
        }

        UpdateRoles();
        if (RolesFetcher_ && !RolesFetcher_->AreRolesOk()) {
            ThrowLastError();
        }

        // From now on FetchWithRetries() uses the "background" retry count.
        Inited_ = true;
        ExpBackoff_.SetEnabled(true);
    }

    // Refreshes service tickets if fetching is required by settings.
    //
    // A full refresh is done when the refresh period has elapsed since the last update.
    // Otherwise, if NeedFetchMissingServiceTickets_ is set and the cache holds fewer tickets
    // than there are destinations, only the missing tickets are requested.
    // Exceptions are not propagated: they are recorded as a retriable error for the
    // ServiceTickets scope and logged.
    void TThreadedUpdater::UpdateServiceTickets() {
        if (!Settings_.IsServiceTicketFetchingRequired()) {
            return;
        }

        // Update time before this attempt; also used below to detect "too long without refresh".
        TInstant stut = GetUpdateTimeOfServiceTickets();
        try {
            if (IsTimeToUpdateServiceTickets(stut)) {
                UpdateAllServiceTickets();
                NeedFetchMissingServiceTickets_ = false;
            } else if (NeedFetchMissingServiceTickets_ && GetCachedServiceTickets()->TicketsById.size() < Destinations_.size()) {
                UpdateMissingServiceTickets(Destinations_);
                NeedFetchMissingServiceTickets_ = false;
            }
            // The error is cleared only when the resulting cache is acceptable.
            if (AreServicesTicketsOk()) {
                ClearError(EScope::ServiceTickets);
            }
        } catch (const std::exception& e) {
            ProcessError(EType::Retriable, EScope::ServiceTickets, e.what());
            LogWarning(TStringBuilder() << "Failed to update service tickets: " << e.what());
            if (TInstant::Now() - stut > ServiceTicketsDurations_.Expiring) {
                LogError("Service tickets have not been refreshed for too long period");
            }
        }
    }

    void TThreadedUpdater::UpdateAllServiceTickets() {
        THttpResult st = GetServiceTicketsFromHttp(Destinations_, RetrySettings_.DstsLimit);

        auto oldCache = GetCachedServiceTickets();
        if (oldCache) {
            for (const auto& pair : oldCache->ErrorsById) {
                st.TicketsWithErrors.Errors.insert(pair);
            }
        }

        UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), TInstant::Now());
        if (ServiceTicketsFilepath_) {
            DiskCacheServiceTickets_ = CreateJsonArray(st.Responses);
            TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
            w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId()));
        }
    }

    // Fetches tickets only for those destinations from `required` that are absent in the
    // current cache, merges them with the cached tickets/errors and publishes the new cache.
    //
    // Returns the resulting cache (the old one if nothing was missing).
    // Throws if there is no cache at all (see FindMissingDsts) or if the request fails.
    // When the disk cache file is configured, the new responses are appended to the stored body.
    TServiceTicketsPtr TThreadedUpdater::UpdateMissingServiceTickets(const TDstSet& required) {
        TServiceTicketsPtr cache = GetCachedServiceTickets();
        TClientSettings::TDstVector dsts = FindMissingDsts(cache, required);

        if (dsts.empty()) {
            return cache;
        }

        THttpResult st = GetServiceTicketsFromHttp(dsts, RetrySettings_.DstsLimit);

        // Number of freshly fetched tickets - used only for logging.
        size_t gotTickets = st.TicketsWithErrors.Tickets.size();

        // Merge the old cache in: insert() keeps freshly fetched entries on key collision.
        for (const auto& pair : cache->TicketsById) {
            st.TicketsWithErrors.Tickets.insert(pair);
        }
        for (const auto& pair : cache->ErrorsById) {
            st.TicketsWithErrors.Errors.insert(pair);
        }
        // A destination that has a ticket now must not keep an error.
        for (const auto& pair : st.TicketsWithErrors.Tickets) {
            st.TicketsWithErrors.Errors.erase(pair.first);
        }

        TServiceTicketsPtr c = UpdateServiceTicketsCachePartly(
            std::move(st.TicketsWithErrors),
            gotTickets);
        if (!c) {
            LogWarning("UpdateMissingServiceTickets: new cache is NULL. BUG?");
            c = cache;
        }

        if (!ServiceTicketsFilepath_) {
            return c;
        }

        // Only TicketsWithErrors was moved out above; Responses is still intact.
        DiskCacheServiceTickets_ = AppendToJsonArray(DiskCacheServiceTickets_, st.Responses);

        TDiskWriter w(ServiceTicketsFilepath_, Logger_.Get());
        w.Write(PrepareTicketsForDisk(DiskCacheServiceTickets_, Settings_.GetSelfTvmId()));

        return c;
    }

    void TThreadedUpdater::UpdatePublicKeys() {
        if (!Settings_.IsServiceTicketCheckingRequired() && !Settings_.IsUserTicketCheckingRequired()) {
            return;
        }

        TInstant pkut = GetUpdateTimeOfPublicKeys();
        if (!IsTimeToUpdatePublicKeys(pkut)) {
            return;
        }

        try {
            TString publicKeys = GetPublicKeysFromHttp();

            UpdatePublicKeysCache(publicKeys, TInstant::Now());
            if (PublicKeysFilepath_) {
                TDiskWriter w(PublicKeysFilepath_, Logger_.Get());
                w.Write(publicKeys);
            }
            if (IsServiceContextOk() && IsUserContextOk()) {
                ClearError(EScope::PublicKeys);
            }
        } catch (const std::exception& e) {
            ProcessError(EType::Retriable, EScope::PublicKeys, e.what());
            LogWarning(TStringBuilder() << "Failed to update public keys: " << e.what());
            if (TInstant::Now() - pkut > PublicKeysDurations_.Expiring) {
                LogError("Public keys have not been refreshed for too long period");
            }
        }
    }

    // Refreshes roles if the roles fetcher exists and it is time to update.
    //
    // The request is authorized with the cached service ticket for the tirole tvm id.
    // Exceptions are not propagated: they are recorded as a retriable error for the
    // Roles scope and logged.
    void TThreadedUpdater::UpdateRoles() {
        if (!RolesFetcher_) {
            return;
        }

        // Update time before this attempt; also used below to decide whether to log an error.
        TInstant rut = GetUpdateTimeOfRoles();
        if (!TRolesFetcher::IsTimeToUpdate(RetrySettings_, TInstant::Now() - rut)) {
            return;
        }

        // Scope guard: resets the fetcher connection on every exit path below.
        struct TCloser {
            TRolesFetcher* Fetcher;
            ~TCloser() {
                Fetcher->ResetConnection();
            }
        } closer{RolesFetcher_.get()};

        try {
            TServiceTicketsPtr st = GetCachedServiceTickets();
            Y_ENSURE(st, "No one service ticket in memory: how it possible?");
            auto it = st->TicketsById.find(Settings_.GetTiroleTvmId());
            Y_ENSURE(it != st->TicketsById.end(),
                     "Missing tvmid for tirole in cache: " << Settings_.GetTiroleTvmId());

            RolesFetcher_->Update(
                FetchWithRetries(
                    [&]() { return RolesFetcher_->FetchActualRoles(it->second); },
                    EScope::Roles));
            SetUpdateTimeOfRoles(TInstant::Now());

            if (RolesFetcher_->AreRolesOk()) {
                ClearError(EScope::Roles);
            }
        } catch (const std::exception& e) {
            ProcessError(EType::Retriable, EScope::Roles, e.what());
            LogWarning(TStringBuilder() << "Failed to update roles: " << e.what());
            if (TRolesFetcher::ShouldWarn(RetrySettings_, TInstant::Now() - rut)) {
                LogError("Roles have not been refreshed for too long period");
            }
        }
    }

    // Publishes a new in-memory cache built from `tickets` without touching the update time.
    // `got` is the number of freshly fetched tickets; it is used only in the log message.
    // Returns the newly created cache.
    TServiceTicketsPtr TThreadedUpdater::UpdateServiceTicketsCachePartly(
        TAsyncUpdaterBase::TPairTicketsErrors&& tickets,
        size_t got) {
        // Remember the size before the containers are moved away.
        const size_t totalTickets = tickets.Tickets.size();

        TServiceTicketsPtr newCache = MakeIntrusiveConst<TServiceTickets>(
            std::move(tickets.Tickets),
            std::move(tickets.Errors),
            DstAliases_);
        SetServiceTickets(newCache);

        TStringBuilder message;
        message << "Cache was partly updated with " << got
                << " service ticket(s). total: " << totalTickets;
        LogInfo(message);

        return newCache;
    }

    // Replaces the in-memory service tickets cache and records `time` as its update time.
    // A log line is written only when the new cache contains at least one ticket.
    void TThreadedUpdater::UpdateServiceTicketsCache(TPairTicketsErrors&& tickets, TInstant time) {
        // Remember the size before the containers are moved away.
        const size_t ticketCount = tickets.Tickets.size();

        TServiceTicketsPtr freshCache = MakeIntrusiveConst<TServiceTickets>(
            std::move(tickets.Tickets),
            std::move(tickets.Errors),
            DstAliases_);
        SetServiceTickets(std::move(freshCache));

        SetUpdateTimeOfServiceTickets(time);

        if (ticketCount == 0) {
            return;
        }

        LogInfo(TStringBuilder() << "Cache was updated with " << ticketCount << " service ticket(s): " << time);
    }

    void TThreadedUpdater::UpdatePublicKeysCache(const TString& publicKeys, TInstant time) {
        if (publicKeys.empty()) {
            return;
        }

        if (Settings_.IsServiceTicketCheckingRequired()) {
            SetServiceContext(MakeIntrusiveConst<TServiceContext>(
                TServiceContext::CheckingFactory(Settings_.GetSelfTvmId(),
                                                 publicKeys)));
        }

        if (Settings_.IsUserTicketCheckingRequired()) {
            SetUserContext(publicKeys);
        }

        SetUpdateTimeOfPublicKeys(time);

        LogInfo(TStringBuilder() << "Cache was updated with public keys: " << time);
    }

    // Restores cached state from disk: service tickets, public keys, retry settings and roles.
    // Every part is read independently; a failure of one part is only logged as a warning
    // and does not prevent reading of the others.
    void TThreadedUpdater::ReadStateFromDisk() {
        try {
            TServiceTicketsFromDisk st = ReadServiceTicketsFromDisk();
            UpdateServiceTicketsCache(std::move(st.TicketsWithErrors), st.BornDate);
            // Keep the raw body: it is extended later when missing tickets are fetched.
            DiskCacheServiceTickets_ = st.FileBody;
        } catch (const std::exception& e) {
            LogWarning(TStringBuilder() << "Failed to read service tickets from disk: " << e.what());
        }

        try {
            std::pair<TString, TInstant> pk = ReadPublicKeysFromDisk();
            UpdatePublicKeysCache(pk.first, pk.second);
        } catch (const std::exception& e) {
            LogWarning(TStringBuilder() << "Failed to read public keys from disk: " << e.what());
        }

        try {
            TString rs = ReadRetrySettingsFromDisk();
            UpdateRetrySettings(rs);
        } catch (const std::exception& e) {
            LogWarning(TStringBuilder() << "Failed to read retry settings from disk: " << e.what());
        }

        try {
            if (RolesFetcher_) {
                SetUpdateTimeOfRoles(RolesFetcher_->ReadFromDisk());
            }
        } catch (const std::exception& e) {
            LogWarning(TStringBuilder() << "Failed to read roles from disk: " << e.what());
        }
    }

    // Reads service tickets from the disk cache file.
    // Returns a default-constructed result when the cache is not configured, cannot be read,
    // belongs to another tvm id or is too old. Parsing errors are propagated as exceptions.
    TThreadedUpdater::TServiceTicketsFromDisk TThreadedUpdater::ReadServiceTicketsFromDisk() const {
        if (!ServiceTicketsFilepath_) {
            return {};
        }

        TDiskReader reader(ServiceTicketsFilepath_, Logger_.Get());
        if (!reader.Read()) {
            return {};
        }

        // The file body is "<tvm response><delimiter><tvm id of the owner>".
        const std::pair<TStringBuf, TTvmId> parsed = ParseTicketsFromDisk(reader.Data());
        const TStringBuf body = parsed.first;
        const TTvmId ownerTvmId = parsed.second;

        if (ownerTvmId != Settings_.GetSelfTvmId()) {
            TStringStream message;
            message << "Disk cache is for another tvmId (" << ownerTvmId << "). "
                    << "Self=" << Settings_.GetSelfTvmId();
            LogWarning(message.Str());
            return {};
        }

        TPairTicketsErrors ticketsWithErrors;
        ParseTicketsFromResponse(body, Destinations_, ticketsWithErrors);

        const bool isTooOld = IsInvalid(TServiceTickets::GetInvalidationTime(ticketsWithErrors.Tickets), TInstant::Now());
        if (isTooOld) {
            LogWarning("Disk cache (service tickets) is too old");
            return {};
        }

        LogInfo(TStringBuilder() << "Got " << ticketsWithErrors.Tickets.size() << " service ticket(s) from disk");
        return {std::move(ticketsWithErrors), reader.Time(), TString(body)};
    }

    std::pair<TString, TInstant> TThreadedUpdater::ReadPublicKeysFromDisk() const {
        if (!PublicKeysFilepath_) {
            return {};
        }

        TDiskReader r(PublicKeysFilepath_, Logger_.Get());
        if (!r.Read()) {
            return {};
        }

        if (TInstant::Now() - r.Time() > PublicKeysDurations_.Invalid) {
            LogWarning("Disk cache (public keys) is too old");
            return {};
        }

        return {r.Data(), r.Time()};
    }

    // Reads the stored retry settings from disk.
    // Returns an empty string when the cache is not configured or the file cannot be read.
    TString TThreadedUpdater::ReadRetrySettingsFromDisk() const {
        if (RetrySettingsFilepath_) {
            TDiskReader reader(RetrySettingsFilepath_, Logger_.Get());
            if (reader.Read()) {
                return reader.Data();
            }
        }

        return {};
    }

    // Fetches service tickets for `dsts` from tvm-api, at most `dstLimit` destinations per request.
    //
    // Every page is requested with retries (FetchWithRetries); its body is parsed into
    // res.TicketsWithErrors and stored as is in res.Responses.
    // Per-destination errors returned by the server are logged but do not fail the call.
    // Throws if the signing context is absent, a request finally fails or a response is not valid JSON.
    // NOTE(review): `dstLimit` must be non-zero (it is used as a divisor and as the page size);
    // UpdateRetrySettings() rejects a zero value from the server - confirm the default is non-zero too.
    template <class Dsts>
    TThreadedUpdater::THttpResult TThreadedUpdater::GetServiceTicketsFromHttp(const Dsts& dsts, const size_t dstLimit) const {
        Y_ENSURE(SigningContext_, "Internal error");

        TClientSettings::TDstVector part;
        part.reserve(dstLimit);
        THttpResult res;
        res.TicketsWithErrors.Tickets.reserve(dsts.size());
        res.Responses.reserve(dsts.size() / dstLimit + 1);

        for (auto it = dsts.begin(); it != dsts.end();) {
            // Collect the next page of destinations; `it` is advanced by the inner loop.
            part.clear();
            for (size_t count = 0; it != dsts.end() && count < dstLimit; ++count, ++it) {
                part.push_back(*it);
            }

            TString response =
                FetchWithRetries(
                    [this, &part]() {
                        // create request here to keep 'ts' actual
                        return FetchServiceTicketsFromHttp(PrepareRequestForServiceTickets(
                            Settings_.GetSelfTvmId(),
                            *SigningContext_,
                            part,
                            ProcInfo_));
                    },
                    EScope::ServiceTickets)
                    .Response;
            ParseTicketsFromResponse(response, part, res.TicketsWithErrors);
            LogDebug(TStringBuilder()
                     << "Response with service tickets for " << part.size()
                     << " destination(s) was successfully fetched from " << TvmUrl_);

            res.Responses.push_back(response);
        }

        LogDebug(TStringBuilder()
                 << "Got responses with service tickets with " << res.Responses.size() << " pages for "
                 << dsts.size() << " destination(s)");
        for (const auto& p : res.TicketsWithErrors.Errors) {
            LogError(TStringBuilder()
                     << "Failed to get service ticket for dst=" << p.first << ": " << p.second);
        }

        return res;
    }

    // Fetches public keys from tvm-api (with retries) and returns the response body.
    // Exceptions from FetchWithRetries() are propagated to the caller.
    TString TThreadedUpdater::GetPublicKeysFromHttp() const {
        const auto fetchKeys = [this]() {
            return FetchPublicKeysFromHttp();
        };

        TString keys = FetchWithRetries(fetchKeys, EScope::PublicKeys).Response;

        LogDebug("Public keys were successfully fetched from " + TvmUrl_);

        return keys;
    }

    // Sends one POST request to "/2/ticket" with the prepared form body.
    // The result carries the http code, the response body and the value of the
    // "X-Ya-Retry-Settings" response header (empty string if the header is absent).
    NUtils::TFetchResult TThreadedUpdater::FetchServiceTicketsFromHttp(const TString& body) const {
        TStringStream responseBody;
        THttpHeaders responseHeaders;

        const TKeepAliveHttpClient::THttpCode httpCode =
            GetClient().DoPost("/2/ticket", body, &responseBody, Headers_, &responseHeaders);

        const THttpInputHeader* retrySettingsHeader = responseHeaders.FindHeader("X-Ya-Retry-Settings");

        return {
            httpCode,
            {},
            "/2/ticket",
            responseBody.Str(),
            retrySettingsHeader ? retrySettingsHeader->Value() : "",
        };
    }

    // Sends one GET request for public keys using the url prepared in the constructor.
    // The result carries the http code, the response body and the value of the
    // "X-Ya-Retry-Settings" response header (empty string if the header is absent).
    NUtils::TFetchResult TThreadedUpdater::FetchPublicKeysFromHttp() const {
        TStringStream responseBody;
        THttpHeaders responseHeaders;

        const TKeepAliveHttpClient::THttpCode httpCode =
            GetClient().DoGet(PublicKeysUrl_, &responseBody, {}, &responseHeaders);

        const THttpInputHeader* retrySettingsHeader = responseHeaders.FindHeader("X-Ya-Retry-Settings");

        return {
            httpCode,
            {},
            "/2/keys",
            responseBody.Str(),
            retrySettingsHeader ? retrySettingsHeader->Value() : "",
        };
    }

    // Applies retry settings received from the server (or restored from disk).
    //
    // `header` is a base64url-encoded serialized retry_settings::v1::Settings message.
    // Only fields present in the message are applied; every changed value is appended to a
    // textual diff which is logged at debug level.
    //
    // Returns true if at least one setting was changed, false otherwise
    // (including an empty header and any decoding/validation failure, which is only logged).
    // NOTE: fields are applied one by one, so a validation failure in the middle
    // leaves the settings processed before it already updated.
    bool TThreadedUpdater::UpdateRetrySettings(const TString& header) const {
        if (header.empty()) {
            // Probably it is some kind of test?
            return false;
        }

        try {
            TString raw = NUtils::Base64url2bin(header);
            Y_ENSURE(raw, "Invalid base64url in settings");

            retry_settings::v1::Settings proto;
            Y_ENSURE(proto.ParseFromString(raw), "Invalid proto");

            // This ugly hack helps to process these settings in any case
            // (the method is const, but it has to modify RetrySettings_ and ExpBackoff_).
            TThreadedUpdater& this_ = *const_cast<TThreadedUpdater*>(this);
            TRetrySettings& res = this_.RetrySettings_;

            TStringStream diff;
            // Assigns r to l if they differ and records "desc:old->new;" in the diff.
            auto update = [&diff](auto& l, const auto& r, TStringBuf desc) {
                if (l != r) {
                    diff << desc << ":" << l << "->" << r << ";";
                    l = r;
                }
            };

            if (proto.has_exponential_backoff_min_sec()) {
                update(res.BackoffSettings.Min,
                       TDuration::Seconds(proto.exponential_backoff_min_sec()),
                       "exponential_backoff_min");
            }
            if (proto.has_exponential_backoff_max_sec()) {
                update(res.BackoffSettings.Max,
                       TDuration::Seconds(proto.exponential_backoff_max_sec()),
                       "exponential_backoff_max");
            }
            if (proto.has_exponential_backoff_factor()) {
                update(res.BackoffSettings.Factor,
                       proto.exponential_backoff_factor(),
                       "exponential_backoff_factor");
            }
            if (proto.has_exponential_backoff_jitter()) {
                update(res.BackoffSettings.Jitter,
                       proto.exponential_backoff_jitter(),
                       "exponential_backoff_jitter");
            }
            // Pushed to the backoff object unconditionally, even if nothing has changed.
            this_.ExpBackoff_.UpdateSettings(res.BackoffSettings);

            if (proto.has_max_random_sleep_default()) {
                update(res.MaxRandomSleepDefault,
                       TDuration::MilliSeconds(proto.max_random_sleep_default()),
                       "max_random_sleep_default");
            }
            if (proto.has_max_random_sleep_when_ok()) {
                update(res.MaxRandomSleepWhenOk,
                       TDuration::MilliSeconds(proto.max_random_sleep_when_ok()),
                       "max_random_sleep_when_ok");
            }
            if (proto.has_retries_on_start()) {
                Y_ENSURE(proto.retries_on_start(), "retries_on_start==0");
                update(res.RetriesOnStart,
                       proto.retries_on_start(),
                       "retries_on_start");
            }
            if (proto.has_retries_in_background()) {
                Y_ENSURE(proto.retries_in_background(), "retries_in_background==0");
                update(res.RetriesInBackground,
                       proto.retries_in_background(),
                       "retries_in_background");
            }
            if (proto.has_worker_awaking_period_sec()) {
                update(res.WorkerAwakingPeriod,
                       TDuration::Seconds(proto.worker_awaking_period_sec()),
                       "worker_awaking_period");
                // Mirror the value into the member that holds the worker period.
                this_.WorkerAwakingPeriod_ = res.WorkerAwakingPeriod;
            }
            if (proto.has_dsts_limit()) {
                // Zero is rejected: the limit is used as a page size and as a divisor.
                Y_ENSURE(proto.dsts_limit(), "dsts_limit==0");
                update(res.DstsLimit,
                       proto.dsts_limit(),
                       "dsts_limit");
            }

            if (proto.has_roles_update_period_sec()) {
                Y_ENSURE(proto.roles_update_period_sec(), "roles_update_period==0");
                update(res.RolesUpdatePeriod,
                       TDuration::Seconds(proto.roles_update_period_sec()),
                       "roles_update_period_sec");
            }
            if (proto.has_roles_warn_period_sec()) {
                Y_ENSURE(proto.roles_warn_period_sec(), "roles_warn_period_sec==0");
                update(res.RolesWarnPeriod,
                       TDuration::Seconds(proto.roles_warn_period_sec()),
                       "roles_warn_period_sec");
            }

            if (diff.empty()) {
                return false;
            }

            LogDebug("Retry settings were updated: " + diff.Str());
            return true;
        } catch (const std::exception& e) {
            LogWarning(TStringBuilder()
                       << "Failed to update retry settings from server, header '"
                       << header << "': "
                       << e.what());
        }

        return false;
    }

    // Calls `func` until it succeeds or the retry budget is exhausted.
    //
    // The number of tries is RetriesInBackground after Init() has completed and
    // RetriesOnStart before that. Every attempt is preceded by RandomSleep().
    // Retry settings returned with a response are applied (and stored on disk if they
    // have changed) before the http code is checked, i.e. even for failed responses.
    //
    // Http codes 400..499 throw TNonRetriableException and are never retried.
    // Codes below 200 or from 399 upwards throw yexception and are retried.
    // The exponential backoff is increased on every failure and decreased on success.
    // After the last failed attempt the exception is rethrown.
    template <typename Func>
    NUtils::TFetchResult TThreadedUpdater::FetchWithRetries(Func func, EScope scope) const {
        const ui32 tries = Inited_ ? RetrySettings_.RetriesInBackground
                                   : RetrySettings_.RetriesOnStart;

        for (size_t idx = 1;; ++idx) {
            RandomSleep();

            try {
                NUtils::TFetchResult result = func();

                if (UpdateRetrySettings(result.RetrySettings) && RetrySettingsFilepath_) {
                    TDiskWriter w(RetrySettingsFilepath_, Logger_.Get());
                    w.Write(result.RetrySettings);
                }

                if (400 <= result.Code && result.Code <= 499) {
                    throw TNonRetriableException() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
                }
                if (result.Code < 200 || result.Code >= 399) {
                    throw yexception() << ProcessHttpError(scope, result.Path, result.Code, result.Response);
                }

                ExpBackoff_.Decrease();
                return result;
            } catch (const TNonRetriableException& e) {
                // Retrying cannot help here: give up immediately.
                LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
                ExpBackoff_.Increase();
                throw;
            } catch (const std::exception& e) {
                LogWarning(TStringBuilder() << "Failed to get " << scope << ": " << e.what());
                ExpBackoff_.Increase();
                if (idx >= tries) {
                    throw;
                }
            }
        }

        // The loop above exits only via return or throw.
        throw yexception() << "unreachable";
    }

    void TThreadedUpdater::RandomSleep() const {
        const TDuration maxSleep = TClientStatus::ECode::Ok == GetState()
                                       ? RetrySettings_.MaxRandomSleepWhenOk
                                       : RetrySettings_.MaxRandomSleepDefault;

        if (maxSleep) {
            ui32 toSleep = Random_.GenRand() % maxSleep.MilliSeconds();
            ExpBackoff_.Sleep(TDuration::MilliSeconds(toSleep));
        }
    }

    // Builds the form-encoded body of the "/2/ticket" request.
    //
    // src      - tvm id of the requesting service
    // ctx      - context used to sign the request (timestamp + destination list)
    // dsts     - destinations to get tickets for; their ids are joined with ','
    // procInfo - process info appended to the request
    // now      - timestamp put into "ts" and into the signature
    TString TThreadedUpdater::PrepareRequestForServiceTickets(TTvmId src,
                                                              const TServiceContext& ctx,
                                                              const TClientSettings::TDstVector& dsts,
                                                              const NUtils::TProcInfo& procInfo,
                                                              time_t now) {
        const TString ts = IntToString<10>(now);

        // Comma-separated list of destination ids.
        TStringStream dstStream;
        dstStream.Reserve(10 * dsts.size());
        bool isFirst = true;
        for (const TClientSettings::TDst& d : dsts) {
            if (!isFirst) {
                dstStream << ',';
            }
            isFirst = false;
            dstStream << d.Id;
        }
        const TString& dstList = dstStream.Str();

        TStringStream request;
        request << "grant_type=client_credentials"
                << "&src=" << src
                << "&dst=" << dstList
                << "&ts=" << ts
                << "&sign=" << ctx.SignCgiParamsForTvm(ts, dstList)
                << "&get_retry_settings=yes"
                << "&";
        procInfo.AddToRequest(request);

        return request.Str();
    }

    // Extracts tickets (or errors) for every destination in `dsts` from a tvm-api response.
    //
    // `resp` is either a single JSON object keyed by tvm id, or a JSON array of such objects
    // (the form produced for the disk cache). Results are added to `out`: a ticket goes to
    // out.Tickets, anything else goes to out.Errors and is also reported via ProcessError()
    // as a non-retriable error.
    // Throws if `resp` is not valid JSON.
    template <class Dsts>
    void TThreadedUpdater::ParseTicketsFromResponse(TStringBuf resp,
                                                    const Dsts& dsts,
                                                    TPairTicketsErrors& out) const {
        NJson::TJsonValue doc;
        Y_ENSURE(NJson::ReadJsonTree(resp, &doc), "Invalid json from tvm-api: " << resp);

        // Looks `id` up in the object that was checked last (currentResp) and, if it is not
        // there, scans all elements of the array, leaving currentResp at the last element checked.
        const NJson::TJsonValue* currentResp = doc.IsMap() ? &doc : nullptr;
        auto find = [&currentResp, &doc](TTvmId id, NJson::TJsonValue& obj) -> bool {
            const TString idStr = IntToString<10>(id);
            if (currentResp && currentResp->GetValue(idStr, &obj)) {
                return true;
            }

            for (const NJson::TJsonValue& val : doc.GetArray()) {
                currentResp = &val;
                if (currentResp->GetValue(idStr, &obj)) {
                    return true;
                }
            }

            return false;
        };

        for (const TClientSettings::TDst& d : dsts) {
            NJson::TJsonValue obj;
            NJson::TJsonValue val;

            // No entry for the destination at all, or an entry without "ticket".
            if (!find(d.Id, obj) || !obj.GetValue("ticket", &val)) {
                TString err;
                if (obj.GetValue("error", &val)) {
                    err = val.GetString();
                } else {
                    err = "Missing tvm_id in response, should never happend: " + IntToString<10>(d.Id);
                }

                TStringStream s;
                s << "Failed to get ServiceTicket for " << d.Id << ": " << err;
                ProcessError(EType::NonRetriable, EScope::ServiceTickets, s.Str());

                out.Errors.insert({d.Id, std::move(err)});
                continue;
            }

            out.Tickets.insert({d.Id, val.GetString()});
        }
    }

    // Separates the tvm response from the owner tvm id in the service tickets cache file.
    static const char DELIMETER = '\t';

    // Produces the disk cache body: "<tvmResponse><DELIMETER><selfId>".
    TString TThreadedUpdater::PrepareTicketsForDisk(TStringBuf tvmResponse, TTvmId selfId) {
        TStringStream body;
        body << tvmResponse;
        body << DELIMETER;
        body << selfId;
        return body.Str();
    }

    // Splits the disk cache body into the tvm response and the tvm id stored after the
    // last DELIMETER. The returned buffer points into `data`.
    // Throws if the trailing token is not a decimal number.
    std::pair<TStringBuf, TTvmId> TThreadedUpdater::ParseTicketsFromDisk(TStringBuf data) {
        // RNextTok() cuts the trailing token off `data`, leaving the response part in it.
        const TStringBuf tvmIdToken = data.RNextTok(DELIMETER);
        const TStringBuf response = data;

        return {response, IntFromString<TTvmId, 10>(tvmIdToken)};
    }

    // Returns the current set of destinations for which service tickets are fetched.
    const TDstSet& TThreadedUpdater::GetDsts() const {
        return Destinations_;
    }

    // Adds `dst` to the set of destinations. No request is made here.
    void TThreadedUpdater::AddDstToSettings(const TClientSettings::TDst& dst) {
        Destinations_.insert(dst);
    }

    // True when more than the refresh period has passed since `lastUpdate`.
    bool TThreadedUpdater::IsTimeToUpdateServiceTickets(TInstant lastUpdate) const {
        const TDuration sinceLastUpdate = TInstant::Now() - lastUpdate;
        return sinceLastUpdate > ServiceTicketsDurations_.RefreshPeriod;
    }

    // True when more than the refresh period has passed since `lastUpdate`.
    bool TThreadedUpdater::IsTimeToUpdatePublicKeys(TInstant lastUpdate) const {
        const TDuration sinceLastUpdate = TInstant::Now() - lastUpdate;
        return sinceLastUpdate > PublicKeysDurations_.RefreshPeriod;
    }

    // Tells whether the service tickets cache is good enough to work with.
    // Always true when tickets are not fetched at all. Otherwise a cache must exist and,
    // if an incomplete set is configured to be an error, it must hold exactly as many
    // tickets as there are destinations.
    bool TThreadedUpdater::AreServicesTicketsOk() const {
        if (!Settings_.IsServiceTicketFetchingRequired()) {
            return true;
        }

        const auto cache = GetCachedServiceTickets();
        if (!cache) {
            return false;
        }

        if (!Settings_.IsIncompleteTicketsSetAnError) {
            return true;
        }

        return cache->TicketsById.size() == Destinations_.size();
    }

    // True when service tickets are not checked, or the checking context is available.
    bool TThreadedUpdater::IsServiceContextOk() const {
        return !Settings_.IsServiceTicketCheckingRequired() || bool(GetCachedServiceContext());
    }

    // True when user tickets are not checked, or the checking context is available.
    bool TThreadedUpdater::IsUserContextOk() const {
        return !Settings_.IsUserTicketCheckingRequired() || bool(GetCachedUserContext());
    }

    // One iteration of the background work: refreshes service tickets, public keys and roles,
    // in that order. Each step decides on its own whether an update is due and handles
    // its own exceptions.
    void TThreadedUpdater::Worker() {
        UpdateServiceTickets();
        UpdatePublicKeys();
        UpdateRoles();
    }

    // Builds the "alias -> tvm id" map from destination aliases in settings.
    // The map is empty when settings have no aliases.
    TServiceTickets::TMapAliasId TThreadedUpdater::MakeAliasMap(const TClientSettings& settings) {
        TServiceTickets::TMapAliasId aliasToId;

        if (!settings.HasDstAliases()) {
            return aliasToId;
        }

        for (const auto& aliasAndDst : settings.GetDstAliases()) {
            const auto& alias = aliasAndDst.first;
            const auto& dst = aliasAndDst.second;
            aliasToId.insert({alias, dst.Id});
        }

        return aliasToId;
    }

    // Returns destinations from `required` that have no ticket in the `available` cache.
    // Throws if `available` is null.
    TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(TServiceTicketsPtr available, const TDstSet& required) {
        Y_ENSURE(available);

        // available->TicketsById is not sorted, so its keys are collected into a set first.
        TDstSet availableDsts;
        for (const auto& idAndTicket : available->TicketsById) {
            availableDsts.insert(idAndTicket.first);
        }

        return FindMissingDsts(availableDsts, required);
    }

    // Returns the set difference `required \ available`, in the order of `required`.
    TClientSettings::TDstVector TThreadedUpdater::FindMissingDsts(const TDstSet& available, const TDstSet& required) {
        TClientSettings::TDstVector missing;

        std::set_difference(
            required.begin(), required.end(),
            available.begin(), available.end(),
            std::back_inserter(missing));

        return missing;
    }

    // Joins raw responses into a JSON array body: "[r1,r2,...]".
    // Responses are inserted as is (they are expected to be JSON already).
    // An empty list gives "[]".
    TString TThreadedUpdater::CreateJsonArray(const TSmallVec<TString>& responses) {
        // Brackets plus one separator slot per response.
        size_t capacity = 2;
        for (const TString& response : responses) {
            capacity += response.size() + 1;
        }

        TString result;
        result.reserve(capacity);

        result.push_back('[');
        bool isFirst = true;
        for (const TString& response : responses) {
            if (!isFirst) {
                result.push_back(',');
            }
            isFirst = false;
            result.append(response);
        }
        result.push_back(']');

        return result;
    }

    // Appends raw responses to a previously stored JSON body and returns a JSON array.
    //
    // `json` is either an array ("[a,b]") - then its elements are kept - or a single
    // value ("a") which becomes the first element. Responses are appended as is.
    // An empty previous array ("[]") contributes no elements, so no leading comma is produced.
    //
    // Throws if `json` is empty, or if it starts with '[' but does not end with ']'.
    TString TThreadedUpdater::AppendToJsonArray(const TString& json, const TSmallVec<TString>& responses) {
        Y_ENSURE(json, "previous body required");

        size_t size = 0;
        for (const TString& r : responses) {
            size += r.size() + 1;
        }

        TString res;
        res.reserve(size + 2 + json.size());

        res.push_back('[');
        if (json.StartsWith('[')) {
            Y_ENSURE(json.EndsWith(']'), "array is broken:" << json);
            // Strip the outer brackets, keep the elements.
            res.append(TStringBuf(json).Chop(1).Skip(1));
        } else {
            res.append(json);
        }

        for (const TString& r : responses) {
            // Separator is needed only if something already follows the opening bracket:
            // otherwise "[]" + "r" would turn into the invalid "[,r]".
            if (res.size() > 1) {
                res.push_back(',');
            }
            res.append(r);
        }
        res.push_back(']');

        return res;
    }
}