summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2025-05-15 23:30:38 +0300
committeraneporada <[email protected]>2025-05-15 23:45:15 +0300
commitb112eec6052c726ba4123cb18d4ceefecbb1535f (patch)
tree0527b09c64b8a64e48780d9b3a4c8876c38ba1e5
parentfe4b7acaa919cb7e6343a40bcb5a7bff9445dc4a (diff)
Support per-cluster yt.ExternalTx
commit_hash:3168025bdc4040710c7679109c4dd80cb5097b14
-rw-r--r--yql/essentials/providers/common/config/yql_dispatch.h28
-rw-r--r--yql/essentials/providers/common/config/yql_setting.h7
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.h6
-rw-r--r--yt/yql/providers/yt/gateway/lib/transaction_cache.cpp9
-rw-r--r--yt/yql/providers/yt/gateway/lib/transaction_cache.h3
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp2
7 files changed, 37 insertions, 20 deletions
diff --git a/yql/essentials/providers/common/config/yql_dispatch.h b/yql/essentials/providers/common/config/yql_dispatch.h
index 0b7a9077dd4..97630e857de 100644
--- a/yql/essentials/providers/common/config/yql_dispatch.h
+++ b/yql/essentials/providers/common/config/yql_dispatch.h
@@ -77,12 +77,22 @@ YQL_PRIMITIVE_SETTING_PARSER_TYPES(YQL_DECLARE_SETTING_PARSER)
YQL_CONTAINER_SETTING_PARSER_TYPES(YQL_DECLARE_SETTING_PARSER)
template<typename TType>
-TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true>& setting, const TString& cluster) {
+TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true, false>& setting, const TString& cluster) {
return setting.Get(cluster);
}
template<typename TType>
-TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false>& setting, const TString& cluster) {
+TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, true, true>& setting, const TString& cluster) {
+ return setting.Get(cluster);
+}
+
+template<typename TType>
+TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false, true>& setting, const TString& cluster) {
+ return setting.Get(cluster);
+}
+
+template<typename TType>
+TMaybe<TType> GetValue(const NCommon::TConfSetting<TType, false, false>& setting, const TString& cluster) {
Y_UNUSED(cluster);
return setting.Get();
}
@@ -125,7 +135,7 @@ public:
TString Name_;
};
- template <typename TType, bool RUNTIME>
+ template <typename TType, bool RUNTIME, bool PERCLUSTER>
class TSettingHandlerImpl: public TSettingHandler {
public:
using TValueCallback = std::function<void(const TString&, TType)>;
@@ -133,7 +143,7 @@ public:
private:
friend class TSettingDispatcher;
- TSettingHandlerImpl(const TString& name, TConfSetting<TType, RUNTIME>& setting)
+ TSettingHandlerImpl(const TString& name, TConfSetting<TType, RUNTIME, PERCLUSTER>& setting)
: TSettingHandler(name)
, Setting_(setting)
, Parser_(::NYql::NPrivate::GetDefaultParser<TType>())
@@ -312,8 +322,8 @@ public:
}
private:
- TConfSetting<TType, RUNTIME>& Setting_;
- TMaybe<TConfSetting<TType, RUNTIME>> Defaul_;
+ TConfSetting<TType, RUNTIME, PERCLUSTER>& Setting_;
+ TMaybe<TConfSetting<TType, RUNTIME, PERCLUSTER>> Defaul_;
::NYql::NPrivate::TParser<TType> Parser_;
TValueCallback ValueSetter_;
TVector<TValueCallback> Validators_;
@@ -339,9 +349,9 @@ public:
ValidClusters.insert(cluster);
}
- template <typename TType, bool RUNTIME>
- TSettingHandlerImpl<TType, RUNTIME>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME>& setting) {
- TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME>> handler = new TSettingHandlerImpl<TType, RUNTIME>(name, setting);
+ template <typename TType, bool RUNTIME, bool PERCLUSTER>
+ TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>& AddSetting(const TString& name, TConfSetting<TType, RUNTIME, PERCLUSTER>& setting) {
+ TIntrusivePtr<TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>> handler = new TSettingHandlerImpl<TType, RUNTIME, PERCLUSTER>(name, setting);
if (!Handlers.insert({NormalizeName(name), handler}).second) {
ythrow yexception() << "Duplicate configuration setting name " << name.Quote();
}
diff --git a/yql/essentials/providers/common/config/yql_setting.h b/yql/essentials/providers/common/config/yql_setting.h
index 148b211bb7b..78e90ca0c5a 100644
--- a/yql/essentials/providers/common/config/yql_setting.h
+++ b/yql/essentials/providers/common/config/yql_setting.h
@@ -12,7 +12,8 @@ namespace NCommon {
const TString ALL_CLUSTERS = "$all";
-template <typename TType, bool RUNTIME = true>
+// TODO: replace RUNTIME/PERCLUSTER with enum
+template <typename TType, bool RUNTIME = true, bool PERCLUSTER = false>
class TConfSetting {
public:
TConfSetting() = default;
@@ -80,7 +81,7 @@ private:
};
template <typename TType>
-class TConfSetting<TType, false> {
+class TConfSetting<TType, false, false> {
public:
TConfSetting() = default;
TConfSetting(const TType& value)
@@ -97,7 +98,7 @@ public:
TType& operator[](const TString& cluster) {
if (cluster != ALL_CLUSTERS) {
- ythrow yexception() << "Static setting cannot be set for specific cluster";
+ ythrow yexception() << "Global static setting cannot be set for specific cluster";
}
Value.ConstructInPlace();
return Value.GetRef();
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index ebe96a0dd50..acca4d22e90 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -86,8 +86,12 @@ struct TYtSettings {
using TConstPtr = std::shared_ptr<const TYtSettings>;
// should be static, because are used on earlier stages
+
+ // static per-cluster
+ NCommon::TConfSetting<TGUID, false, true> ExternalTx;
+
+ // static global
NCommon::TConfSetting<TString, false> Auth;
- NCommon::TConfSetting<TGUID, false> ExternalTx;
NCommon::TConfSetting<TString, false> TmpFolder;
NCommon::TConfSetting<TString, false> TablesTmpFolder;
NCommon::TConfSetting<TDuration, false> TempTablesTtl;
diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
index 2b3d4342e72..0538db57846 100644
--- a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
+++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
@@ -433,11 +433,11 @@ TTransactionCache::TEntry::TPtr TTransactionCache::TryGetEntry(const TString& se
return {};
}
-TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TString& server, const TString& token,
+TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TString& cluster, const TString& server, const TString& token,
const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics)
{
TEntry::TPtr createdEntry = nullptr;
- NYT::TTransactionId externalTx = config->ExternalTx.Get().GetOrElse(TGUID());
+ NYT::TTransactionId externalTx = config->ExternalTx.Get(cluster).GetOrElse(TGUID());
with_lock(Lock_) {
auto it = TxMap_.find(server);
if (it != TxMap_.end()) {
@@ -447,6 +447,7 @@ TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TStrin
TString tmpFolder = GetTablesTmpFolder(*config);
createdEntry = MakeIntrusive<TEntry>();
+ createdEntry->Cluster = cluster;
createdEntry->Server = server;
auto createClientOptions = TCreateClientOptions().Token(token);
if (impersonationUser) {
@@ -485,9 +486,9 @@ TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TStrin
TxMap_.emplace(server, createdEntry);
}
if (externalTx) {
- YQL_CLOG(INFO, ProviderYt) << "Attached to external tx " << GetGuidAsString(externalTx);
+ YQL_CLOG(INFO, ProviderYt) << "Attached to external tx " << GetGuidAsString(externalTx) << " on cluster " << cluster;
}
- YQL_CLOG(INFO, ProviderYt) << "Created tx " << GetGuidAsString(createdEntry->Tx->GetId()) << " on " << server;
+ YQL_CLOG(INFO, ProviderYt) << "Created tx " << GetGuidAsString(createdEntry->Tx->GetId()) << " on " << server << " cluster " << cluster;
return createdEntry;
}
diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.h b/yt/yql/providers/yt/gateway/lib/transaction_cache.h
index 61fc1254e05..1bfcefef6a4 100644
--- a/yt/yql/providers/yt/gateway/lib/transaction_cache.h
+++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.h
@@ -32,6 +32,7 @@ public:
using TSpecProvider = std::function<NYT::TNode()>;
struct TEntry : public TThrRefBase {
+ TString Cluster;
TString Server;
NYT::IClientPtr Client;
NYT::ITransactionPtr Tx;
@@ -166,7 +167,7 @@ public:
TTransactionCache(const TString& userName);
TEntry::TPtr GetEntry(const TString& server);
- TEntry::TPtr GetOrCreateEntry(const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics);
+ TEntry::TPtr GetOrCreateEntry(const TString& cluster, const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics);
TEntry::TPtr TryGetEntry(const TString& server);
void Commit(const TString& server);
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
index 4f239d9be21..87e0c8eda00 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
@@ -404,7 +404,7 @@ TTransactionCache::TEntry::TPtr TExecContextBase::GetOrCreateEntry(const TYtSett
"Accessing YT cluster " << Cluster_.Quote() << " without OAuth token is not allowed";
}
- return Session_->TxCache_.GetOrCreateEntry(YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings, Metrics);
+ return Session_->TxCache_.GetOrCreateEntry(Cluster_, YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings, Metrics);
}
TExpressionResorceUsage TExecContextBase::ScanExtraResourceUsageImpl(const TExprNode& node, const TYtSettings::TConstPtr& config, bool withInput) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 5ffc4cf1ef6..5612a85adf9 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -889,7 +889,7 @@ public:
if (res.ExternalTransactionId) {
param("external_tx", *res.ExternalTransactionId);
}
- } else if (auto externalTx = State_->Configuration->ExternalTx.Get().GetOrElse(TGUID())) {
+ } else if (auto externalTx = State_->Configuration->ExternalTx.Get(cluster).GetOrElse(TGUID())) {
param("external_tx", GetGuidAsString(externalTx));
}
TString tokenName;