diff options
author | aneporada <[email protected]> | 2025-05-15 23:30:38 +0300 |
---|---|---|
committer | aneporada <[email protected]> | 2025-05-15 23:45:15 +0300 |
commit | b112eec6052c726ba4123cb18d4ceefecbb1535f (patch) | |
tree | 0527b09c64b8a64e48780d9b3a4c8876c38ba1e5 | |
parent | fe4b7acaa919cb7e6343a40bcb5a7bff9445dc4a (diff) |
Support per-cluster yt.ExternalTx
commit_hash:3168025bdc4040710c7679109c4dd80cb5097b14
7 files changed, 37 insertions, 20 deletions
diff --git a/yql/essentials/providers/common/config/yql_dispatch.h b/yql/essentials/providers/common/config/yql_dispatch.h index 0b7a9077dd4..97630e857de 100644 --- a/yql/essentials/providers/common/config/yql_dispatch.h +++ b/yql/essentials/providers/common/config/yql_dispatch.h @@ -77,12 +77,22 @@ YQL_PRIMITIVE_SETTING_PARSER_TYPES(YQL_DECLARE_SETTING_PARSER) YQL_CONTAINER_SETTING_PARSER_TYPES(YQL_DECLARE_SETTING_PARSER) template<typename TType> -TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true>& setting, const TString& cluster) { +TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true, false>& setting, const TString& cluster) { return setting.Get(cluster); } template<typename TType> -TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false>& setting, const TString& cluster) { +TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true, true>& setting, const TString& cluster) { + return setting.Get(cluster); +} + +template<typename TType> +TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false, true>& setting, const TString& cluster) { + return setting.Get(cluster); +} + +template<typename TType> +TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false, false>& setting, const TString& cluster) { Y_UNUSED(cluster); return setting.Get(); } @@ -125,7 +135,7 @@ public: TString Name_; }; - template <typename TType, bool RUNTIME> + template <typename TType, bool RUNTIME, bool PERCLUSTER> class TSettingHandlerImpl: public TSettingHandler { public: using TValueCallback = std::function<void(const TString&, TType)>; @@ -133,7 +143,7 @@ public: private: friend class TSettingDispatcher; - TSettingHandlerImpl(const TString& name, TConfSetting<TType, RUNTIME>& setting) + TSettingHandlerImpl(const TString& name, TConfSetting<TType, RUNTIME, PERCLUSTER>& setting) : TSettingHandler(name) , Setting_(setting) , Parser_(::NYql::NPrivate::GetDefaultParser<TType>()) @@ -312,8 +322,8 @@ public: } private: - TConfSetting<TType, RUNTIME>& Setting_; - TMaybe<TConfSetting<TType, RUNTIME>> Defaul_; + TConfSetting<TType, RUNTIME, PERCLUSTER>& Setting_; + TMaybe<TConfSetting<TType, RUNTIME, PERCLUSTER>> Defaul_; ::NYql::NPrivate::TParser<TType> Parser_; TValueCallback ValueSetter_; TVector<TValueCallback> Validators_; @@ -339,9 +349,9 @@ public: ValidClusters.insert(cluster); } - template <typename TType, bool RUNTIME> - TSettingHandlerImpl<TType, RUNTIME>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME>& setting) { - TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME>> handler = new TSettingHandlerImpl<TType, RUNTIME>(name, setting); + template <typename TType, bool RUNTIME, bool PERCLUSTER> + TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME, PERCLUSTER>& setting) { + TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>> handler = new TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>(name, setting); if (!Handlers.insert({NormalizeName(name), handler}).second) { ythrow yexception() << "Duplicate configuration setting name " << name.Quote(); } diff --git a/yql/essentials/providers/common/config/yql_setting.h b/yql/essentials/providers/common/config/yql_setting.h index 148b211bb7b..78e90ca0c5a 100644 --- a/yql/essentials/providers/common/config/yql_setting.h +++ b/yql/essentials/providers/common/config/yql_setting.h @@ -12,7 +12,8 @@ namespace NCommon { const TString ALL_CLUSTERS = "$all"; -template <typename TType, bool RUNTIME = true> +// TODO: replace RUNTIME/PERCLUSTER with enum +template <typename TType, bool RUNTIME = true, bool PERCLUSTER = false> class TConfSetting { public: TConfSetting() = default; @@ -80,7 +81,7 @@ private: }; template <typename TType> -class TConfSetting<TType, false> { +class TConfSetting<TType, false, false> { public: TConfSetting() = default; TConfSetting(const TType& value) @@ -97,7 +98,7 @@ public: TType& operator[](const TString& cluster) { if (cluster != ALL_CLUSTERS) { - ythrow yexception() << "Static setting cannot be set for specific cluster"; + ythrow yexception() << "Global static setting cannot be set for specific cluster"; } Value.ConstructInPlace(); return Value.GetRef(); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index ebe96a0dd50..acca4d22e90 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -86,8 +86,12 @@ struct TYtSettings { using TConstPtr = std::shared_ptr<const TYtSettings>; // should be static, because are used on earlier stages + + // static per-cluster + NCommon::TConfSetting<TGUID, false, true> ExternalTx; + + // static global NCommon::TConfSetting<TString, false> Auth; - NCommon::TConfSetting<TGUID, false> ExternalTx; NCommon::TConfSetting<TString, false> TmpFolder; NCommon::TConfSetting<TString, false> TablesTmpFolder; NCommon::TConfSetting<TDuration, false> TempTablesTtl; diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp index 2b3d4342e72..0538db57846 100644 --- a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp +++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp @@ -433,11 +433,11 @@ TTransactionCache::TEntry::TPtr TTransactionCache::TryGetEntry(const TString& se return {}; } -TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TString& server, const TString& token, +TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TString& cluster, const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics) { TEntry::TPtr createdEntry = nullptr; - NYT::TTransactionId externalTx = config->ExternalTx.Get().GetOrElse(TGUID()); + NYT::TTransactionId externalTx = config->ExternalTx.Get(cluster).GetOrElse(TGUID()); with_lock(Lock_) { auto it = TxMap_.find(server); if (it != TxMap_.end()) { @@ -447,6 +447,7 @@ TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TStrin TString tmpFolder = GetTablesTmpFolder(*config); createdEntry = MakeIntrusive<TEntry>(); + createdEntry->Cluster = cluster; createdEntry->Server = server; auto createClientOptions = TCreateClientOptions().Token(token); if (impersonationUser) { @@ -485,9 +486,9 @@ TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TStrin TxMap_.emplace(server, createdEntry); } if (externalTx) { - YQL_CLOG(INFO, ProviderYt) << "Attached to external tx " << GetGuidAsString(externalTx); + YQL_CLOG(INFO, ProviderYt) << "Attached to external tx " << GetGuidAsString(externalTx) << " on cluster " << cluster; } - YQL_CLOG(INFO, ProviderYt) << "Created tx " << GetGuidAsString(createdEntry->Tx->GetId()) << " on " << server; + YQL_CLOG(INFO, ProviderYt) << "Created tx " << GetGuidAsString(createdEntry->Tx->GetId()) << " on " << server << " cluster " << cluster; return createdEntry; } diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.h b/yt/yql/providers/yt/gateway/lib/transaction_cache.h index 61fc1254e05..1bfcefef6a4 100644 --- a/yt/yql/providers/yt/gateway/lib/transaction_cache.h +++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.h @@ -32,6 +32,7 @@ public: using TSpecProvider = std::function<NYT::TNode()>; struct TEntry : public TThrRefBase { + TString Cluster; TString Server; NYT::IClientPtr Client; NYT::ITransactionPtr Tx; @@ -166,7 +167,7 @@ public: TTransactionCache(const TString& userName); TEntry::TPtr GetEntry(const TString& server); - TEntry::TPtr GetOrCreateEntry(const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics); + TEntry::TPtr GetOrCreateEntry(const TString& cluster, const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics); TEntry::TPtr TryGetEntry(const TString& server); void Commit(const TString& server); diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp index 4f239d9be21..87e0c8eda00 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp @@ -404,7 +404,7 @@ TTransactionCache::TEntry::TPtr TExecContextBase::GetOrCreateEntry(const TYtSett "Accessing YT cluster " << Cluster_.Quote() << " without OAuth token is not allowed"; } - return Session_->TxCache_.GetOrCreateEntry(YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings, Metrics); + return Session_->TxCache_.GetOrCreateEntry(Cluster_, YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings, Metrics); } TExpressionResorceUsage TExecContextBase::ScanExtraResourceUsageImpl(const TExprNode& node, const TYtSettings::TConstPtr& config, bool withInput) { diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 5ffc4cf1ef6..5612a85adf9 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -889,7 +889,7 @@ public: if (res.ExternalTransactionId) { param("external_tx", *res.ExternalTransactionId); } - } else if (auto externalTx = State_->Configuration->ExternalTx.Get().GetOrElse(TGUID())) { + } else if (auto externalTx = State_->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) { param("external_tx", GetGuidAsString(externalTx)); } TString tokenName; |