summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp2
-rw-r--r--yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp10
-rw-r--r--yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h3
-rw-r--r--yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp46
-rw-r--r--yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h8
-rw-r--r--yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp48
-rw-r--r--yql/essentials/minikql/runtime_settings/ut/ya.make1
-rw-r--r--yql/essentials/providers/common/config/ut/ya.make5
-rw-r--r--yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp200
-rw-r--r--yql/essentials/providers/common/config/yql_config_qplayer.h148
-rw-r--r--yql/essentials/providers/common/config/yql_dispatch.cpp7
-rw-r--r--yql/essentials/providers/common/config/yql_dispatch.h37
-rw-r--r--yql/essentials/providers/config/yql_config_provider.cpp10
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp4
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_spec.h6
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp2
16 files changed, 424 insertions, 113 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index 3ca85ea29ff..e2f6cc5343b 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -2089,7 +2089,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
typeAnnotationContext->AddRemoteLayersProvider(alias, provider);
}
if (GatewaysConfig_) {
- typeAnnotationContext->RuntimeSettings = CreateRuntimeSettingsFromProto(GatewaysConfig_->GetRuntimeSettings(), username, Credentials_);
+ typeAnnotationContext->RuntimeSettings = CreateRuntimeSettingsFromProto(GatewaysConfig_->GetRuntimeSettings(), username, Credentials_, QContext_);
}
if (UdfIndex_ && UdfIndexPackageSet_) {
// setup default versions at the beginning
diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp
index 21e3e9277d7..0d2f1a4d197 100644
--- a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp
+++ b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp
@@ -10,8 +10,14 @@ TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration()
{
}
-TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TRuntimeSettings& settings)
- : TRuntimeSettings(settings)
+TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TQContext& QContext)
+ : TRuntimeSettingsConfiguration(TRuntimeSettings(), QContext)
+{
+}
+
+TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TRuntimeSettings& settings, const TQContext& QContext)
+ : TSettingDispatcher("runtime_settings", QContext)
+ , TRuntimeSettings(settings)
{
REGISTER_RUNTIME_SETTING(DatumValidation);
REGISTER_RUNTIME_SETTING(TestHostSetting);
diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h
index 64e210a1b0c..735570e71ff 100644
--- a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h
+++ b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h
@@ -14,7 +14,8 @@ public:
using TConstPtr = TSharedPtr<const TRuntimeSettingsConfiguration, TAtomicCounter>;
TRuntimeSettingsConfiguration();
- explicit TRuntimeSettingsConfiguration(const TRuntimeSettings& settings);
+ explicit TRuntimeSettingsConfiguration(const TQContext& QContext);
+ explicit TRuntimeSettingsConfiguration(const TRuntimeSettings& settings, const TQContext& QContext = {});
};
TRuntimeSettingsConfiguration::TConstPtr MakeRuntimeSettingsConfiguration(auto&&... args) {
diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp
index a63eccb3c0b..ad2c87b7a8f 100644
--- a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp
+++ b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp
@@ -10,13 +10,15 @@ namespace {
void FillUdfSettings(
TRuntimeSettingsConfiguration& config,
const google::protobuf::RepeatedPtrField<NProto::TUdfSettings>& udfSettings,
- const std::function<bool(const NProto::TRuntimeSetting&)>& filter)
+ const std::function<bool(const NProto::TRuntimeSetting&)>& filter,
+ const TQContext& qContext)
{
for (const auto& udf : udfSettings) {
- for (const auto& setting : udf.GetRuntimeSettings()) {
- if (filter(setting)) {
- config.SetUdfSetting(udf.GetModule(), setting.GetName(), setting.GetValue());
- }
+ TString activationLabel = TStringBuilder() << "runtime_settings_udf_" << udf.GetModule();
+ auto flags = NCommon::SelectAndSaveActivatedFlags<NProto::TRuntimeSetting>(
+ activationLabel, qContext, udf.GetRuntimeSettings(), filter, /*hasProviderName=*/true);
+ for (const auto& setting : flags) {
+ config.SetUdfSetting(udf.GetModule(), setting.GetName(), setting.GetValue());
}
}
}
@@ -25,27 +27,24 @@ TRuntimeSettings::TPtr CreateRuntimeSettingsFromProtoImpl(
const NProto::TRuntimeSettings& proto,
const TString& userName,
TCredentials::TPtr credentials,
- bool allowActivation)
+ bool allowActivation,
+ const TQContext& qContext)
{
- auto config = MakeRuntimeSettingsConfigurationMutable();
+ auto config = MakeRuntimeSettingsConfigurationMutable(qContext);
auto filter = NConfig::MakeActivationFilter<NProto::TRuntimeSetting>(userName, credentials, [&](const TString&) {
YQL_ENSURE(allowActivation, "Activation is not allowed. "
"Seems you are trying to load runtime settings with activation but all settings must be already activated.");
});
config->Dispatch(proto.GetHostSettings(), filter);
- FillUdfSettings(*config, proto.GetUdfSettings(), filter);
+ FillUdfSettings(*config, proto.GetUdfSettings(), filter, qContext);
return config;
}
-TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl(
- const TString& data,
- const TString& userName,
- TCredentials::TPtr credentials,
- bool allowActivation)
+TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl(const TString& data)
{
NProto::TRuntimeSettings proto;
proto.ParseFromStringOrThrow(data);
- return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, allowActivation);
+ return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false, TQContext());
}
} // namespace
@@ -53,15 +52,16 @@ TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl(
TRuntimeSettings::TPtr CreateRuntimeSettingsFromProto(
const NProto::TRuntimeSettings& proto,
const TString& userName,
- TCredentials::TPtr credentials)
+ TCredentials::TPtr credentials,
+ const TQContext& qContext)
{
- return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, /*allowActivation=*/true);
+ return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, /*allowActivation=*/true, qContext);
}
TRuntimeSettings::TPtr DeserializeRuntimeSettingsFromProto(
const NProto::TRuntimeSettings& proto)
{
- return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false);
+ return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false, TQContext());
}
NProto::TRuntimeSettings SerializeRuntimeSettingsToProto(
@@ -95,19 +95,9 @@ TString SerializeRuntimeSettingsToString(const TRuntimeSettings& config) {
}
TRuntimeSettings::TPtr CreateRuntimeSettingsFromString(
- const TString& data,
- const TString& userName,
- TCredentials::TPtr credentials)
-{
- return CreateRuntimeSettingsFromStringImpl(data, userName, credentials, /*allowActivation=*/true);
-}
-
-TRuntimeSettings::TPtr CreateRuntimeSettingsFromString(
const TString& data)
{
- NProto::TRuntimeSettings proto;
- proto.ParseFromStringOrThrow(data);
- return CreateRuntimeSettingsFromStringImpl(data, "", nullptr, /*allowActivation=*/false);
+ return CreateRuntimeSettingsFromStringImpl(data);
}
} // namespace NYql
diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h
index e358a783248..6375769b6a2 100644
--- a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h
+++ b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h
@@ -13,7 +13,8 @@ namespace NYql {
TRuntimeSettings::TPtr CreateRuntimeSettingsFromProto(
const NProto::TRuntimeSettings& proto,
const TString& userName,
- TCredentials::TPtr credentials);
+ TCredentials::TPtr credentials,
+ const TQContext& qContext);
TRuntimeSettings::TPtr DeserializeRuntimeSettingsFromProto(
const NProto::TRuntimeSettings& proto);
@@ -24,11 +25,6 @@ NProto::TRuntimeSettings SerializeRuntimeSettingsToProto(
TString SerializeRuntimeSettingsToString(const TRuntimeSettings& config);
TRuntimeSettings::TPtr CreateRuntimeSettingsFromString(
- const TString& data,
- const TString& userName,
- TCredentials::TPtr credentials);
-
-TRuntimeSettings::TPtr CreateRuntimeSettingsFromString(
const TString& data);
} // namespace NYql
diff --git a/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp b/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp
index 72e0f17b517..2d3df33bdcb 100644
--- a/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp
+++ b/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp
@@ -1,4 +1,5 @@
#include <yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h>
+#include <yql/essentials/core/qplayer/storage/memory/yql_qstorage_memory.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -50,7 +51,7 @@ Y_UNIT_TEST(Deserialization) {
TString data;
UNIT_ASSERT(proto.SerializeToString(&data));
- auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr);
+ auto config = CreateRuntimeSettingsFromString(data);
UNIT_ASSERT_VALUES_EQUAL(config->DatumValidation.Get(), true);
UNIT_ASSERT_VALUES_EQUAL(config->TestHostSetting.Get(), true);
@@ -67,13 +68,10 @@ Y_UNIT_TEST(HostSettingActivation50Percent) {
hostSetting->SetValue("true");
hostSetting->MutableActivation()->SetPercentage(50);
- TString data;
- UNIT_ASSERT(proto.SerializeToString(&data));
-
constexpr int Iterations = 10000;
int activatedCount = 0;
for (int i = 0; i < Iterations; ++i) {
- auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr);
+ auto config = CreateRuntimeSettingsFromProto(proto, TString{}, nullptr, TQContext());
if (config->DatumValidation.Get()) {
++activatedCount;
}
@@ -92,13 +90,10 @@ Y_UNIT_TEST(UdfSettingActivation50Percent) {
udfSetting->SetValue("Val");
udfSetting->MutableActivation()->SetPercentage(50);
- TString data;
- UNIT_ASSERT(proto.SerializeToString(&data));
-
constexpr int Iterations = 10000;
int activatedCount = 0;
for (int i = 0; i < Iterations; ++i) {
- auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr);
+ auto config = CreateRuntimeSettingsFromProto(proto, TString{}, nullptr, TQContext());
if (!config->GetUdfSetting("MyModule", "Key").empty()) {
++activatedCount;
}
@@ -108,6 +103,41 @@ Y_UNIT_TEST(UdfSettingActivation50Percent) {
UNIT_ASSERT_LE(activatedCount, Iterations * 3 / 4);
}
+Y_UNIT_TEST(ActivationStatePreservedAfterQContextRoundTrip) {
+ NProto::TRuntimeSettings proto;
+
+ auto* hostSetting = proto.AddHostSettings();
+ hostSetting->SetName("TestHostSetting");
+ hostSetting->SetValue("true");
+ hostSetting->MutableActivation()->SetPercentage(50);
+
+ auto* udfSettings = proto.AddUdfSettings();
+ udfSettings->SetModule("MyModule");
+ auto* udfSetting = udfSettings->AddRuntimeSettings();
+ udfSetting->SetName("Key");
+ udfSetting->SetValue("Val");
+ udfSetting->MutableActivation()->SetPercentage(50);
+
+ constexpr int Iterations = 400;
+ for (int i = 0; i < Iterations; ++i) {
+ auto qStorage = MakeMemoryQStorage();
+
+ auto writer = qStorage->MakeWriter("op", {});
+ auto capturedConfig = CreateRuntimeSettingsFromProto(
+ proto, TString{}, nullptr, TQContext(writer));
+ writer->Commit().GetValueSync();
+
+ const bool capturedHostActivated = capturedConfig->TestHostSetting.Get();
+ const bool capturedUdfActivated = !capturedConfig->GetUdfSetting("MyModule", "Key").empty();
+
+ auto replayedConfig = CreateRuntimeSettingsFromProto(
+ proto, TString{}, nullptr, TQContext(qStorage->MakeReader("op", {})));
+
+ UNIT_ASSERT_VALUES_EQUAL(capturedHostActivated, replayedConfig->TestHostSetting.Get());
+ UNIT_ASSERT_VALUES_EQUAL(capturedUdfActivated, !replayedConfig->GetUdfSetting("MyModule", "Key").empty());
+ }
+}
+
} // Y_UNIT_TEST_SUITE(TRuntimeSettingsSerializationTest)
} // namespace NYql
diff --git a/yql/essentials/minikql/runtime_settings/ut/ya.make b/yql/essentials/minikql/runtime_settings/ut/ya.make
index 6c39edb2b5d..7fe0ce37c4a 100644
--- a/yql/essentials/minikql/runtime_settings/ut/ya.make
+++ b/yql/essentials/minikql/runtime_settings/ut/ya.make
@@ -8,6 +8,7 @@ PEERDIR(
yql/essentials/public/udf/service/stub
yql/essentials/public/udf
yql/essentials/core/credentials
+ yql/essentials/core/qplayer/storage/memory
yql/essentials/providers/common/activation
)
diff --git a/yql/essentials/providers/common/config/ut/ya.make b/yql/essentials/providers/common/config/ut/ya.make
index d02b9ebdf0a..0c06da98c57 100644
--- a/yql/essentials/providers/common/config/ut/ya.make
+++ b/yql/essentials/providers/common/config/ut/ya.make
@@ -2,6 +2,11 @@ UNITTEST_FOR(yql/essentials/providers/common/config)
SRCS(
yql_config_ut.cpp
+ yql_config_qplayer_ut.cpp
+)
+
+PEERDIR(
+ yql/essentials/core/qplayer/storage/memory
)
YQL_LAST_ABI_VERSION()
diff --git a/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp b/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp
new file mode 100644
index 00000000000..4d6e8f1507a
--- /dev/null
+++ b/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp
@@ -0,0 +1,200 @@
+#include <yql/essentials/providers/common/config/yql_config_qplayer.h>
+#include <yql/essentials/core/qplayer/storage/memory/yql_qstorage_memory.h>
+
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/split.h>
+
+namespace NYql::NCommon {
+
+namespace {
+
+struct TTestAttr {
+ TString Name;
+ TString Value;
+ bool WithActivation = false;
+ bool MustBeActivated = true;
+
+ const TString& GetName() const {
+ return Name;
+ }
+
+ bool HasActivation() const {
+ return WithActivation;
+ }
+
+ bool SerializeToString(TString* out) const {
+ *out = Name + "|" + Value + "|" + (WithActivation ? "1" : "0");
+ return true;
+ }
+
+ bool ParseFromString(const TString& data) {
+ TVector<TStringBuf> fields;
+ StringSplitter(data).Split('|').AddTo(&fields);
+ if (fields.size() != 3) {
+ return false;
+ }
+ Name = TString(fields[0]);
+ Value = TString(fields[1]);
+ WithActivation = (fields[2] == "1");
+ return true;
+ }
+};
+
+auto MakeActivationFilter() {
+ return [](const TTestAttr& attr) {
+ return attr.MustBeActivated;
+ };
+}
+
+TQContext MakeWriteContext(IQStoragePtr storage) {
+ return TQContext(storage->MakeWriter("op", {}));
+}
+
+TQContext MakeReadContext(IQStoragePtr storage) {
+ return TQContext(storage->MakeReader("op", {}));
+}
+
+struct TExpectedFlag {
+ TString Name;
+ TString Value;
+};
+
+TVector<TExpectedFlag> ToExpected(const TVector<TTestAttr>& flags) {
+ TVector<TExpectedFlag> result;
+ for (const auto& flag : flags) {
+ result.push_back({.Name = flag.Name, .Value = flag.Value});
+ }
+ return result;
+}
+
+} // namespace
+
+Y_UNIT_TEST_SUITE(TActivationFlagsQPlayerTest) {
+
+struct TCaptureReplayCase {
+ TVector<TTestAttr> Source;
+ TVector<TExpectedFlag> Expected;
+};
+
+Y_UNIT_TEST(CaptureAndReplayProduceSameFlags) {
+ const TVector<TCaptureReplayCase> cases = {
+ {
+ .Source = {
+ TTestAttr{.Name = "plain_a", .Value = "val_a", .WithActivation = false, .MustBeActivated = true},
+ TTestAttr{.Name = "activated_b", .Value = "val_b", .WithActivation = true, .MustBeActivated = true},
+ TTestAttr{.Name = "skipped_c", .Value = "val_c", .WithActivation = true, .MustBeActivated = false},
+ TTestAttr{.Name = "plain_d", .Value = "val_d", .WithActivation = false, .MustBeActivated = true},
+ },
+ .Expected = {{.Name = "plain_a", .Value = "val_a"}, {.Name = "activated_b", .Value = "val_b"}, {.Name = "plain_d", .Value = "val_d"}},
+ },
+ {
+ .Source = {
+ TTestAttr{.Name = "plain_a", .Value = "val_a", .WithActivation = false, .MustBeActivated = true},
+ TTestAttr{.Name = "skipped_b", .Value = "val_b", .WithActivation = true, .MustBeActivated = false},
+ },
+ .Expected = {{.Name = "plain_a", .Value = "val_a"}},
+ },
+ {
+ .Source = {
+ TTestAttr{.Name = "only_activated", .Value = "val_x", .WithActivation = true, .MustBeActivated = true},
+ },
+ .Expected = {{.Name = "only_activated", .Value = "val_x"}},
+ },
+ {
+ .Source = {
+ TTestAttr{.Name = "plain_only", .Value = "val_y", .WithActivation = false, .MustBeActivated = true},
+ },
+ .Expected = {{.Name = "plain_only", .Value = "val_y"}},
+ },
+ };
+
+ for (const auto& testCase : cases) {
+ auto storage = MakeMemoryQStorage();
+
+ TVector<TExpectedFlag> capturedExpected;
+ {
+ auto writeContext = MakeWriteContext(storage);
+ auto capturedFlags = SelectAndSaveActivatedFlags<TTestAttr>(
+ "label", writeContext, testCase.Source, MakeActivationFilter(), /*hasProviderName=*/true);
+ capturedExpected = ToExpected(capturedFlags);
+ writeContext.GetWriter()->Commit().GetValueSync();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(testCase.Expected.size(), capturedExpected.size());
+ for (size_t idx = 0; idx < testCase.Expected.size(); ++idx) {
+ UNIT_ASSERT_VALUES_EQUAL(testCase.Expected[idx].Name, capturedExpected[idx].Name);
+ UNIT_ASSERT_VALUES_EQUAL(testCase.Expected[idx].Value, capturedExpected[idx].Value);
+ }
+
+ auto readContext = MakeReadContext(storage);
+ auto replayedFlags = SelectAndSaveActivatedFlags<TTestAttr>(
+ "label", readContext, testCase.Source, MakeActivationFilter(), false);
+ auto replayedActual = ToExpected(replayedFlags);
+
+ UNIT_ASSERT_VALUES_EQUAL(capturedExpected.size(), replayedActual.size());
+ for (size_t idx = 0; idx < capturedExpected.size(); ++idx) {
+ UNIT_ASSERT_VALUES_EQUAL(capturedExpected[idx].Name, replayedActual[idx].Name);
+ UNIT_ASSERT_VALUES_EQUAL(capturedExpected[idx].Value, replayedActual[idx].Value);
+ }
+ }
+}
+
+Y_UNIT_TEST(OldFormatFlagsReturnedAsIs) {
+ auto storage = MakeMemoryQStorage();
+ {
+ TTestAttr plainAttr = {.Name = "plain_a", .Value = "value_a", .WithActivation = false};
+ TTestAttr activatedAttr = {.Name = "activated_b", .Value = "value_b", .WithActivation = true};
+ auto oldFormatNode = NYT::TNode::CreateMap();
+ TString serializedPlain;
+ TString serializedActivated;
+ Y_ENSURE(plainAttr.SerializeToString(&serializedPlain));
+ Y_ENSURE(activatedAttr.SerializeToString(&serializedActivated));
+ oldFormatNode[plainAttr.Name] = serializedPlain;
+ oldFormatNode[activatedAttr.Name] = serializedActivated;
+ auto yson = NYT::NodeToYsonString(oldFormatNode, NYT::NYson::EYsonFormat::Binary);
+ auto writer = storage->MakeWriter("op", {});
+ writer->Put({.Component = TString(NPrivate::QplayerActivationComponent), .Label = "label"}, yson).GetValueSync();
+ writer->Commit().GetValueSync();
+ }
+
+ const TVector<TTestAttr> source = {
+ TTestAttr{.Name = "plain_a", .Value = "value_a", .WithActivation = false},
+ TTestAttr{.Name = "activated_b", .Value = "value_b_other", .WithActivation = true},
+ TTestAttr{.Name = "not_in_store", .Value = "value_x", .WithActivation = true},
+ };
+
+ auto readContext = MakeReadContext(storage);
+ auto replayedFlags = SelectAndSaveActivatedFlags<TTestAttr>(
+ "label", readContext, source, MakeActivationFilter(), false);
+
+ UNIT_ASSERT_VALUES_EQUAL(2U, replayedFlags.size());
+ THashMap<TString, TString> replayedByName;
+ for (const auto& flag : replayedFlags) {
+ replayedByName[flag.Name] = flag.Value;
+ }
+ UNIT_ASSERT_VALUES_EQUAL("value_a", replayedByName["plain_a"]);
+ UNIT_ASSERT_VALUES_EQUAL("value_b", replayedByName["activated_b"]);
+}
+
+Y_UNIT_TEST(FilterCalledOncePerItem) {
+ const TVector<TTestAttr> source = {
+ TTestAttr{.Name = "plain_a", .Value = "value_a", .WithActivation = false},
+ TTestAttr{.Name = "activated_b", .Value = "value_b", .WithActivation = true},
+ TTestAttr{.Name = "activated_c", .Value = "value_c", .WithActivation = true},
+ };
+ int filterCallCount = 0;
+ auto countingFilter = [&](const TTestAttr&) {
+ ++filterCallCount;
+ return true;
+ };
+
+ TQContext emptyContext;
+ SelectAndSaveActivatedFlags<TTestAttr>("label", emptyContext, source, countingFilter, false);
+ UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(source.size()), filterCallCount);
+}
+
+} // Y_UNIT_TEST_SUITE(TActivationFlagsQPlayerTest)
+
+} // namespace NYql::NCommon
diff --git a/yql/essentials/providers/common/config/yql_config_qplayer.h b/yql/essentials/providers/common/config/yql_config_qplayer.h
index 99f5a7be218..71da71d846b 100644
--- a/yql/essentials/providers/common/config/yql_config_qplayer.h
+++ b/yql/essentials/providers/common/config/yql_config_qplayer.h
@@ -6,46 +6,148 @@
#include <library/cpp/yson/node/node_io.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/maybe.h>
#include <util/generic/vector.h>
+#include <util/generic/overloaded.h>
+
+#include <variant>
namespace NYql::NCommon {
-inline const TString ActivationComponent = "Activation";
+namespace NPrivate {
+
+inline constexpr TStringBuf QplayerActivationComponent = "Activation";
+inline constexpr TStringBuf QplayerActivationVersion2 = "v2";
+inline constexpr TStringBuf QplayerActivationVersion1 = "v1";
+
+inline constexpr TStringBuf QPlayerActivationVersionKey = "version";
+inline constexpr TStringBuf QPlayerActivationIndexesKey = "indexes";
+
+enum class EQContextFlagsVersion {
+ V1,
+ V2
+};
+
+inline EQContextFlagsVersion EQContextFlagsVersionFromString(TStringBuf versionStr) {
+ if (versionStr == QplayerActivationVersion1) {
+ return EQContextFlagsVersion::V1;
+ }
+ if (versionStr == QplayerActivationVersion2) {
+ return EQContextFlagsVersion::V2;
+ }
+ ythrow yexception() << "Unknown activation storage version: " << versionStr;
+}
+
+template <typename TAttribute>
+using TActivationLoadResult = std::variant<TVector<TAttribute>, THashSet<size_t>>;
template <typename TAttribute>
-TMaybe<TVector<TAttribute>> LoadActivatedFlagsFromQContext(const TString& activationLabel, const TQContext& qContext) {
- TMaybe<TVector<TAttribute>> loadedFlags;
+TMaybe<TActivationLoadResult<TAttribute>> LoadActivatedFlagsFromQContext(const TString& activationLabel, const TQContext& qContext) {
if (!qContext.CanRead()) {
- return loadedFlags;
+ return Nothing();
+ }
+ auto loaded = qContext.GetReader()->Get({.Component = TString(QplayerActivationComponent), .Label = activationLabel}).GetValueSync();
+ if (!loaded) {
+ return Nothing();
+ }
+ auto flagsNode = NYT::NodeFromYsonString(loaded->Value);
+ EQContextFlagsVersion version = EQContextFlagsVersion::V1;
+ if (flagsNode.HasKey(QPlayerActivationVersionKey)) {
+ version = EQContextFlagsVersionFromString(flagsNode[QPlayerActivationVersionKey].AsString());
}
- if (auto loaded = qContext.GetReader()->Get({.Component = ActivationComponent, .Label = activationLabel}).GetValueSync()) {
- auto flagsNode = NYT::NodeFromYsonString(loaded->Value);
- TVector<TAttribute> flags;
- for (const auto& [flagName, flagValue] : flagsNode.AsMap()) {
- TAttribute flag;
- YQL_ENSURE(flag.ParseFromString(flagValue.AsString()));
- flags.emplace_back(std::move(flag));
+
+ switch (version) {
+ case EQContextFlagsVersion::V1: {
+ TVector<TAttribute> storedFlags;
+ for (const auto& [flagName, flagValue] : flagsNode.AsMap()) {
+ TAttribute flag;
+ YQL_ENSURE(flag.ParseFromString(flagValue.AsString()));
+ storedFlags.emplace_back(std::move(flag));
+ }
+ return {std::move(storedFlags)};
+ }
+
+ case EQContextFlagsVersion::V2: {
+ THashSet<size_t> activatedIndexes;
+ for (const auto& indexNode : flagsNode[QPlayerActivationIndexesKey].AsList()) {
+ activatedIndexes.insert(static_cast<size_t>(indexNode.AsInt64()));
+ }
+ return {std::move(activatedIndexes)};
}
- loadedFlags = std::move(flags);
- YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are loaded at replay mode";
}
- return loadedFlags;
}
-template <typename TAttribute>
-void SaveActivatedFlagsToQContext(const TVector<TAttribute>& flags, const TString& activationLabel, const TQContext& qContext) {
+inline void SaveActivatedFlagsToQContext(const TVector<size_t>& activatedIndexes, const TString& activationLabel, const TQContext& qContext) {
if (!qContext.CanWrite()) {
return;
}
- auto flagsNode = NYT::TNode::CreateMap();
- for (const auto& flag : flags) {
- TString data;
- YQL_ENSURE(flag.SerializeToString(&data));
- flagsNode[flag.GetName()] = std::move(data);
+ auto node = NYT::TNode::CreateMap();
+ node[QPlayerActivationVersionKey] = QplayerActivationVersion2;
+ auto indexList = NYT::TNode::CreateList();
+ for (size_t index : activatedIndexes) {
+ indexList.Add(static_cast<i64>(index));
}
- auto flagsYson = NYT::NodeToYsonString(flagsNode, NYT::NYson::EYsonFormat::Binary);
- qContext.GetWriter()->Put({.Component = ActivationComponent, .Label = activationLabel}, flagsYson).GetValueSync();
+ node[QPlayerActivationIndexesKey] = std::move(indexList);
+ auto yson = NYT::NodeToYsonString(node, NYT::NYson::EYsonFormat::Binary);
+ qContext.GetWriter()->Put({.Component = TString(QplayerActivationComponent), .Label = activationLabel}, yson).GetValueSync();
YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are saved to QStorage";
}
+template <typename TAttribute, typename TContainer>
+TVector<TAttribute> CollectFlagsForReplayV2(const TContainer& source, const THashSet<size_t>& activatedIndexes) {
+ TVector<TAttribute> flags;
+ for (size_t idx = 0; idx < static_cast<size_t>(source.size()); ++idx) {
+ if (!source[idx].HasActivation() || activatedIndexes.contains(idx)) {
+ flags.emplace_back(source[idx]);
+ }
+ }
+ return flags;
+}
+
+template <typename TAttribute, typename TContainer, typename TFilter>
+TVector<TAttribute> FilterSourceAndSaveIndexes(
+ const TContainer& source,
+ const TFilter& filter,
+ const TString& activationLabel,
+ const TQContext& qContext,
+ bool hasProviderName)
+{
+ TVector<TAttribute> flags;
+ TVector<size_t> activatedIndexes;
+ for (size_t idx = 0; idx < static_cast<size_t>(source.size()); ++idx) {
+ if (filter(source[idx])) {
+ flags.emplace_back(source[idx]);
+ activatedIndexes.push_back(idx);
+ }
+ }
+ if (hasProviderName) {
+ SaveActivatedFlagsToQContext(activatedIndexes, activationLabel, qContext);
+ }
+ return flags;
+}
+
+} // namespace NPrivate
+
+template <typename TAttribute, typename TContainer, typename TFilter>
+TVector<TAttribute> SelectAndSaveActivatedFlags(
+ const TString& activationLabel,
+ const TQContext& qContext,
+ const TContainer& source,
+ const TFilter& filter,
+ bool hasProviderName)
+{
+ if (auto loaded = NPrivate::LoadActivatedFlagsFromQContext<TAttribute>(activationLabel, qContext)) {
+ YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are loaded at replay mode";
+ return std::visit(TOverloaded{
+ [](const TVector<TAttribute>& flags) {
+ return flags;
+ },
+ [&](const THashSet<size_t>& indexes) {
+ return NPrivate::CollectFlagsForReplayV2<TAttribute>(source, indexes);
+ }}, *loaded);
+ }
+ return NPrivate::FilterSourceAndSaveIndexes<TAttribute>(source, filter, activationLabel, qContext, hasProviderName);
+}
+
} // namespace NYql::NCommon
diff --git a/yql/essentials/providers/common/config/yql_dispatch.cpp b/yql/essentials/providers/common/config/yql_dispatch.cpp
index 1f4c8083c47..cb36f83046e 100644
--- a/yql/essentials/providers/common/config/yql_dispatch.cpp
+++ b/yql/essentials/providers/common/config/yql_dispatch.cpp
@@ -9,7 +9,7 @@
#include <util/random/random.h>
#include <util/datetime/base.h>
-namespace NYql {
+namespace NYql::NCommon {
namespace NPrivate {
@@ -143,8 +143,6 @@ YQL_CONTAINER_SETTING_PARSER_TYPES(YQL_DEFINE_CONTAINER_SETTING_SERIALIZER)
} // namespace NPrivate
-namespace NCommon {
-
bool TSettingDispatcher::IsRuntime(const TString& name) {
auto normalizedName = NormalizeName(name);
if (auto handler = Handlers_.Value(normalizedName, TSettingHandler::TPtr())) {
@@ -264,5 +262,4 @@ TSettingDispatcher::TErrorCallback TSettingDispatcher::GetErrorCallback(TPositio
};
}
-} // namespace NCommon
-} // namespace NYql
+} // namespace NYql::NCommon
diff --git a/yql/essentials/providers/common/config/yql_dispatch.h b/yql/essentials/providers/common/config/yql_dispatch.h
index 41d2f1cbd38..4cf5aeeb421 100644
--- a/yql/essentials/providers/common/config/yql_dispatch.h
+++ b/yql/essentials/providers/common/config/yql_dispatch.h
@@ -28,7 +28,7 @@
#include <ranges>
#include <utility>
-namespace NYql {
+namespace NYql::NCommon {
namespace NPrivate {
@@ -148,8 +148,6 @@ concept AttributeFilter = std::predicate<TFilter, std::ranges::range_value_t<TCo
} // namespace NPrivate
-namespace NCommon {
-
class TSettingDispatcher: public TThrRefBase {
public:
using TPtr = TIntrusivePtr<TSettingDispatcher>;
@@ -199,8 +197,8 @@ public:
TSettingHandlerImpl(const TString& name, TConfSetting<TType, SettingType>& setting)
: TSettingHandler(name)
, Setting_(setting)
- , Parser_(::NYql::NPrivate::GetDefaultParser<TType>())
- , Serializer_(::NYql::NPrivate::GetDefaultSerializer<TType>())
+ , Parser_(NPrivate::GetDefaultParser<TType>())
+ , Serializer_(NPrivate::GetDefaultSerializer<TType>())
, ValueSetter_([this](const TString& cluster, TType value) {
Setting_[cluster] = value;
})
@@ -337,22 +335,22 @@ public:
return *this;
}
- TSettingHandlerImpl& Parser(::NYql::NPrivate::TParser<TType>&& parser) {
+ TSettingHandlerImpl& Parser(NPrivate::TParser<TType>&& parser) {
Parser_ = std::move(parser);
return *this;
}
- TSettingHandlerImpl& Parser(const ::NYql::NPrivate::TParser<TType>& parser) {
+ TSettingHandlerImpl& Parser(const NPrivate::TParser<TType>& parser) {
Parser_ = parser;
return *this;
}
- TSettingHandlerImpl& Serializer(::NYql::NPrivate::TSerializer<TType>&& serializer) {
+ TSettingHandlerImpl& Serializer(NPrivate::TSerializer<TType>&& serializer) {
Serializer_ = std::move(serializer);
return *this;
}
- TSettingHandlerImpl& Serializer(const ::NYql::NPrivate::TSerializer<TType>& serializer) {
+ TSettingHandlerImpl& Serializer(const NPrivate::TSerializer<TType>& serializer) {
Serializer_ = serializer;
return *this;
}
@@ -415,8 +413,8 @@ public:
private:
TConfSetting<TType, SettingType>& Setting_;
TMaybe<TConfSetting<TType, SettingType>> Default_;
- ::NYql::NPrivate::TParser<TType> Parser_;
- ::NYql::NPrivate::TSerializer<TType> Serializer_;
+ NPrivate::TParser<TType> Parser_;
+ NPrivate::TSerializer<TType> Serializer_;
TValueCallback ValueSetter_;
TVector<TValueCallback> Validators_;
TString Warning_;
@@ -479,26 +477,18 @@ public:
auto errorCallback = GetDefaultErrorCallback();
TString activationLabel = TStringBuilder() << ProviderName_ << "_" << cluster;
- TVector<TAttribute> flags;
- if (auto loadedFlags = NCommon::LoadActivatedFlagsFromQContext<TAttribute>(activationLabel, QContext_)) {
- flags = std::move(*loadedFlags);
- } else {
- CopyIf(clusterValues.begin(), clusterValues.end(), std::back_inserter(flags), filter);
- }
+ const auto flags = NCommon::SelectAndSaveActivatedFlags<TAttribute>(
+ activationLabel, QContext_, clusterValues, filter, !ProviderName_.empty());
for (const auto& flag : flags) {
Dispatch(cluster, flag.GetName(), flag.GetValue(), EStage::CONFIG, errorCallback);
}
-
- if (ProviderName_) {
- NCommon::SaveActivatedFlagsToQContext<TAttribute>(flags, activationLabel, QContext_);
- }
}
template <NPrivate::ConfigFeatureList TContainer>
void Dispatch(const TString& cluster, const TContainer& clusterValues) {
auto errorCallback = GetDefaultErrorCallback();
- for (auto& v : clusterValues) {
+ for (const auto& v : clusterValues) {
Dispatch(cluster, v.GetName(), v.GetValue(), EStage::CONFIG, errorCallback);
}
}
@@ -529,8 +519,7 @@ protected:
const TQContext QContext_;
};
-} // namespace NCommon
-} // namespace NYql
+} // namespace NYql::NCommon
#define REGISTER_SETTING(dispatcher, setting) \
(dispatcher).AddSetting(#setting, setting)
diff --git a/yql/essentials/providers/config/yql_config_provider.cpp b/yql/essentials/providers/config/yql_config_provider.cpp
index d1265c7bb18..f3b07999c91 100644
--- a/yql/essentials/providers/config/yql_config_provider.cpp
+++ b/yql/essentials/providers/config/yql_config_provider.cpp
@@ -167,13 +167,8 @@ public:
};
if (CoreConfig_) {
TPosition pos;
- TVector<TCoreAttr> flags;
- if (auto loadedFlags = NCommon::LoadActivatedFlagsFromQContext<TCoreAttr>(YqlCoreActivationLabel, Types_.QContext)) {
- flags = std::move(*loadedFlags);
- } else {
- const auto& configFlags = CoreConfig_->GetFlags();
- CopyIf(configFlags.begin(), configFlags.end(), std::back_inserter(flags), filter);
- }
+ const auto flags = NCommon::SelectAndSaveActivatedFlags<TCoreAttr>(
+ YqlCoreActivationLabel, Types_.QContext, CoreConfig_->GetFlags(), filter, /*hasProviderName=*/true);
for (const auto& flag : flags) {
const auto& flagArgs = flag.GetArgs();
TVector<TStringBuf> args(flagArgs.begin(), flagArgs.end());
@@ -181,7 +176,6 @@ public:
return false;
}
}
- NCommon::SaveActivatedFlagsToQContext<TCoreAttr>(flags, YqlCoreActivationLabel, Types_.QContext);
}
return true;
}
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 225160ebc27..133bd411f5a 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -4804,7 +4804,7 @@ private:
(execCtx->Config_->HasExecuteUdfLocallyIfPossible()
? execCtx->Config_->GetExecuteUdfLocallyIfPossible() : false);
bool hasLayerPaths = false;
- if constexpr (NPrivate::THasLayersPaths<TRunOptions>::value) {
+ if constexpr (::NYql::NPrivate::THasLayersPaths<TRunOptions>::value) {
hasLayerPaths |= !execCtx->Options_.LayersPaths().empty();
localRun &= execCtx->Options_.LayersPaths().empty();
}
@@ -5516,7 +5516,7 @@ private:
auto tmpFiles = MakeIntrusive<TTempFiles>(execCtx->FileStorage_->GetTemp());
bool localRun = execCtx->Config_->HasExecuteUdfLocallyIfPossible() ? execCtx->Config_->GetExecuteUdfLocallyIfPossible() : false;
bool hasLayerPaths = false;
- if constexpr (NPrivate::THasLayersPaths<decltype(execCtx->Options_)>::value) {
+ if constexpr (::NYql::NPrivate::THasLayersPaths<decltype(execCtx->Options_)>::value) {
hasLayerPaths |= !execCtx->Options_.LayersPaths().empty();
localRun &= execCtx->Options_.LayersPaths().empty();
}
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_spec.h b/yt/yql/providers/yt/gateway/native/yql_yt_spec.h
index 7418016d17f..4f076eb3060 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_spec.h
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_spec.h
@@ -82,15 +82,15 @@ inline void FillSpec(NYT::TNode& spec,
EYtOpProps opProps = 0)
{
TSet<TString> addSecTags = {};
- if constexpr (NPrivate::THasAdditionalSecurityTags<TOptions>::value) {
+ if constexpr (::NYql::NPrivate::THasAdditionalSecurityTags<TOptions>::value) {
addSecTags = execCtx.Options_.AdditionalSecurityTags();
}
- if constexpr (NPrivate::THasLayersPaths<TOptions>::value) {
+ if constexpr (::NYql::NPrivate::THasLayersPaths<TOptions>::value) {
FillSpec(spec, execCtx, execCtx.Options_.Config(), entry, extraCpu, secondExtraCpu, opProps, addSecTags, execCtx.Options_.LayersPaths());
} else {
FillSpec(spec, execCtx, execCtx.Options_.Config(), entry, extraCpu, secondExtraCpu, opProps, addSecTags, {});
}
- if constexpr (NPrivate::THasSecureParams<TOptions>::value) {
+ if constexpr (::NYql::NPrivate::THasSecureParams<TOptions>::value) {
FillSecureVault(spec, execCtx.Options_.SecureParams());
}
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 93c91ea1c52..e79d35184fa 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -406,7 +406,7 @@ public:
}
if (pragma == "pooltrees" && node.ChildrenSize() >= 5) {
- auto pools = NPrivate::GetDefaultParser<TVector<TString>>()(TString{node.Child(4)->Content()});
+ auto pools = NCommon::NPrivate::GetDefaultParser<TVector<TString>>()(TString{node.Child(4)->Content()});
for (const auto& pool : pools) {
if (!POOL_TREES_WHITELIST.contains(pool)) {
AddInfo(ctx, TStringBuilder() << "unsupported pool tree: " << pool, skipIssues);