diff options
| author | atarasov5 <[email protected]> | 2026-05-20 11:09:21 +0300 |
|---|---|---|
| committer | atarasov5 <[email protected]> | 2026-05-20 11:36:16 +0300 |
| commit | 7610b2d650b671e194c00a54ec5a7950ae81e957 (patch) | |
| tree | 52e7a96d2c18e42ab9ee44d95b292b653750c563 /yql | |
| parent | 22fa9f58b3d741847907a1263cb5f0ad30106a18 (diff) | |
YQL-19813: Save and load qplayer
**Режим capture** (qContext.CanWrite())
1. Проходим по всем элементам source с индексом idx.
2. Для каждого элемента вызываем filter(item) ровно один раз (фильтр может иметь побочные эффекты — например, запись статистики об активации).
3. Элементы, прошедшие фильтр, добавляются в возвращаемый результат.
4. Сохраняем индексы всех прошедших фильтр элементов в QStorage в формате V2:
\{"version": "v2", "indexes": \[0, 2, 5, ...\]\}
---
**Режим replay** (qContext.CanRead())
1. Читаем YSON-запись из QStorage по ключу \{Component="Activation", Label=label\}.
2. Определяем версию формата по наличию ключа "version":
**Формат V1** (старый, обратная совместимость) — ключ "version" отсутствует:
- Структура: \{"имя\_флага": сериализованный\_proto, ...\}
- Десериализуем каждый флаг через ParseFromString, возвращаем список напрямую.
- Фильтр не вызывается. Список source игнорируется.
**Формат V2 (текущий)** — "version": "v2":
- Структура: \{"version": "v2", "indexes": \[0, 2, 5, ...\]\}
- Читаем сохранённое множество индексов.
- Проходим по source с *индексом* idx:
- Элемент без условия активации (!HasActivation()) → всегда включается (такие элементы не зависят от активации).
- Элемент с условием активации (HasActivation()) → включается только если его idx есть в сохранённом множестве.
- Фильтр не вызывается.
commit_hash:306113465bcd614cd3c0cd311ab20542d1f8e254
Diffstat (limited to 'yql')
13 files changed, 418 insertions, 107 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp index 3ca85ea29ff..e2f6cc5343b 100644 --- a/yql/essentials/core/facade/yql_facade.cpp +++ b/yql/essentials/core/facade/yql_facade.cpp @@ -2089,7 +2089,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us typeAnnotationContext->AddRemoteLayersProvider(alias, provider); } if (GatewaysConfig_) { - typeAnnotationContext->RuntimeSettings = CreateRuntimeSettingsFromProto(GatewaysConfig_->GetRuntimeSettings(), username, Credentials_); + typeAnnotationContext->RuntimeSettings = CreateRuntimeSettingsFromProto(GatewaysConfig_->GetRuntimeSettings(), username, Credentials_, QContext_); } if (UdfIndex_ && UdfIndexPackageSet_) { // setup default versions at the beginning diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp index 21e3e9277d7..0d2f1a4d197 100644 --- a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp +++ b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.cpp @@ -10,8 +10,14 @@ TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration() { } -TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TRuntimeSettings& settings) - : TRuntimeSettings(settings) +TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TQContext& QContext) + : TRuntimeSettingsConfiguration(TRuntimeSettings(), QContext) +{ +} + +TRuntimeSettingsConfiguration::TRuntimeSettingsConfiguration(const TRuntimeSettings& settings, const TQContext& QContext) + : TSettingDispatcher("runtime_settings", QContext) + , TRuntimeSettings(settings) { REGISTER_RUNTIME_SETTING(DatumValidation); REGISTER_RUNTIME_SETTING(TestHostSetting); diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h index 64e210a1b0c..735570e71ff 100644 --- a/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h +++ b/yql/essentials/minikql/runtime_settings/runtime_settings_configuration.h @@ -14,7 +14,8 @@ public: using TConstPtr = TSharedPtr<const TRuntimeSettingsConfiguration, TAtomicCounter>; TRuntimeSettingsConfiguration(); - explicit TRuntimeSettingsConfiguration(const TRuntimeSettings& settings); + explicit TRuntimeSettingsConfiguration(const TQContext& QContext); + explicit TRuntimeSettingsConfiguration(const TRuntimeSettings& settings, const TQContext& QContext = {}); }; TRuntimeSettingsConfiguration::TConstPtr MakeRuntimeSettingsConfiguration(auto&&... args) { diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp index a63eccb3c0b..ad2c87b7a8f 100644 --- a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp +++ b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.cpp @@ -10,13 +10,15 @@ namespace { void FillUdfSettings( TRuntimeSettingsConfiguration& config, const google::protobuf::RepeatedPtrField<NProto::TUdfSettings>& udfSettings, - const std::function<bool(const NProto::TRuntimeSetting&)>& filter) + const std::function<bool(const NProto::TRuntimeSetting&)>& filter, + const TQContext& qContext) { for (const auto& udf : udfSettings) { - for (const auto& setting : udf.GetRuntimeSettings()) { - if (filter(setting)) { - config.SetUdfSetting(udf.GetModule(), setting.GetName(), setting.GetValue()); - } + TString activationLabel = TStringBuilder() << "runtime_settings_udf_" << udf.GetModule(); + auto flags = NCommon::SelectAndSaveActivatedFlags<NProto::TRuntimeSetting>( + activationLabel, qContext, udf.GetRuntimeSettings(), filter, /*hasProviderName=*/true); + for (const auto& setting : flags) { + config.SetUdfSetting(udf.GetModule(), setting.GetName(), setting.GetValue()); } } } @@ -25,27 +27,24 @@ TRuntimeSettings::TPtr CreateRuntimeSettingsFromProtoImpl( const NProto::TRuntimeSettings& proto, const TString& userName, TCredentials::TPtr credentials, - bool allowActivation) + bool allowActivation, + const TQContext& qContext) { - auto config = MakeRuntimeSettingsConfigurationMutable(); + auto config = MakeRuntimeSettingsConfigurationMutable(qContext); auto filter = NConfig::MakeActivationFilter<NProto::TRuntimeSetting>(userName, credentials, [&](const TString&) { YQL_ENSURE(allowActivation, "Activation is not allowed. " "Seems you are trying to load runtime settings with activation but all settings must be already activated."); }); config->Dispatch(proto.GetHostSettings(), filter); - FillUdfSettings(*config, proto.GetUdfSettings(), filter); + FillUdfSettings(*config, proto.GetUdfSettings(), filter, qContext); return config; } -TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl( - const TString& data, - const TString& userName, - TCredentials::TPtr credentials, - bool allowActivation) +TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl(const TString& data) { NProto::TRuntimeSettings proto; proto.ParseFromStringOrThrow(data); - return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, allowActivation); + return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false, TQContext()); } } // namespace @@ -53,15 +52,16 @@ TRuntimeSettings::TPtr CreateRuntimeSettingsFromStringImpl( TRuntimeSettings::TPtr CreateRuntimeSettingsFromProto( const NProto::TRuntimeSettings& proto, const TString& userName, - TCredentials::TPtr credentials) + TCredentials::TPtr credentials, + const TQContext& qContext) { - return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, /*allowActivation=*/true); + return CreateRuntimeSettingsFromProtoImpl(proto, userName, credentials, /*allowActivation=*/true, qContext); } TRuntimeSettings::TPtr DeserializeRuntimeSettingsFromProto( const NProto::TRuntimeSettings& proto) { - return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false); + return CreateRuntimeSettingsFromProtoImpl(proto, "", nullptr, /*allowActivation=*/false, TQContext()); } NProto::TRuntimeSettings SerializeRuntimeSettingsToProto( @@ -95,19 +95,9 @@ TString SerializeRuntimeSettingsToString(const TRuntimeSettings& config) { } TRuntimeSettings::TPtr CreateRuntimeSettingsFromString( - const TString& data, - const TString& userName, - TCredentials::TPtr credentials) -{ - return CreateRuntimeSettingsFromStringImpl(data, userName, credentials, /*allowActivation=*/true); -} - -TRuntimeSettings::TPtr CreateRuntimeSettingsFromString( const TString& data) { - NProto::TRuntimeSettings proto; - proto.ParseFromStringOrThrow(data); - return CreateRuntimeSettingsFromStringImpl(data, "", nullptr, /*allowActivation=*/false); + return CreateRuntimeSettingsFromStringImpl(data); } } // namespace NYql diff --git a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h index e358a783248..6375769b6a2 100644 --- a/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h +++ b/yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h @@ -13,7 +13,8 @@ namespace NYql { TRuntimeSettings::TPtr CreateRuntimeSettingsFromProto( const NProto::TRuntimeSettings& proto, const TString& userName, - TCredentials::TPtr credentials); + TCredentials::TPtr credentials, + const TQContext& qContext); TRuntimeSettings::TPtr DeserializeRuntimeSettingsFromProto( const NProto::TRuntimeSettings& proto); @@ -24,11 +25,6 @@ NProto::TRuntimeSettings SerializeRuntimeSettingsToProto( TString SerializeRuntimeSettingsToString(const TRuntimeSettings& config); TRuntimeSettings::TPtr CreateRuntimeSettingsFromString( - const TString& data, - const TString& userName, - TCredentials::TPtr credentials); - -TRuntimeSettings::TPtr CreateRuntimeSettingsFromString( const TString& data); } // namespace NYql diff --git a/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp b/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp index 72e0f17b517..2d3df33bdcb 100644 --- a/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp +++ b/yql/essentials/minikql/runtime_settings/ut/runtime_settings_serialization_ut.cpp @@ -1,4 +1,5 @@ #include <yql/essentials/minikql/runtime_settings/runtime_settings_serialization.h> +#include <yql/essentials/core/qplayer/storage/memory/yql_qstorage_memory.h> #include <library/cpp/testing/unittest/registar.h> @@ -50,7 +51,7 @@ Y_UNIT_TEST(Deserialization) { TString data; UNIT_ASSERT(proto.SerializeToString(&data)); - auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr); + auto config = CreateRuntimeSettingsFromString(data); UNIT_ASSERT_VALUES_EQUAL(config->DatumValidation.Get(), true); UNIT_ASSERT_VALUES_EQUAL(config->TestHostSetting.Get(), true); @@ -67,13 +68,10 @@ Y_UNIT_TEST(HostSettingActivation50Percent) { hostSetting->SetValue("true"); hostSetting->MutableActivation()->SetPercentage(50); - TString data; - UNIT_ASSERT(proto.SerializeToString(&data)); - constexpr int Iterations = 10000; int activatedCount = 0; for (int i = 0; i < Iterations; ++i) { - auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr); + auto config = CreateRuntimeSettingsFromProto(proto, TString{}, nullptr, TQContext()); if (config->DatumValidation.Get()) { ++activatedCount; } @@ -92,13 +90,10 @@ Y_UNIT_TEST(UdfSettingActivation50Percent) { udfSetting->SetValue("Val"); udfSetting->MutableActivation()->SetPercentage(50); - TString data; - UNIT_ASSERT(proto.SerializeToString(&data)); - constexpr int Iterations = 10000; int activatedCount = 0; for (int i = 0; i < Iterations; ++i) { - auto config = CreateRuntimeSettingsFromString(data, TString{}, nullptr); + auto config = CreateRuntimeSettingsFromProto(proto, TString{}, nullptr, TQContext()); if (!config->GetUdfSetting("MyModule", "Key").empty()) { ++activatedCount; } @@ -108,6 +103,41 @@ Y_UNIT_TEST(UdfSettingActivation50Percent) { UNIT_ASSERT_LE(activatedCount, Iterations * 3 / 4); } +Y_UNIT_TEST(ActivationStatePreservedAfterQContextRoundTrip) { + NProto::TRuntimeSettings proto; + + auto* hostSetting = proto.AddHostSettings(); + hostSetting->SetName("TestHostSetting"); + hostSetting->SetValue("true"); + hostSetting->MutableActivation()->SetPercentage(50); + + auto* udfSettings = proto.AddUdfSettings(); + udfSettings->SetModule("MyModule"); + auto* udfSetting = udfSettings->AddRuntimeSettings(); + udfSetting->SetName("Key"); + udfSetting->SetValue("Val"); + udfSetting->MutableActivation()->SetPercentage(50); + + constexpr int Iterations = 400; + for (int i = 0; i < Iterations; ++i) { + auto qStorage = MakeMemoryQStorage(); + + auto writer = qStorage->MakeWriter("op", {}); + auto capturedConfig = CreateRuntimeSettingsFromProto( + proto, TString{}, nullptr, TQContext(writer)); + writer->Commit().GetValueSync(); + + const bool capturedHostActivated = capturedConfig->TestHostSetting.Get(); + const bool capturedUdfActivated = !capturedConfig->GetUdfSetting("MyModule", "Key").empty(); + + auto replayedConfig = CreateRuntimeSettingsFromProto( + proto, TString{}, nullptr, TQContext(qStorage->MakeReader("op", {}))); + + UNIT_ASSERT_VALUES_EQUAL(capturedHostActivated, replayedConfig->TestHostSetting.Get()); + UNIT_ASSERT_VALUES_EQUAL(capturedUdfActivated, !replayedConfig->GetUdfSetting("MyModule", "Key").empty()); + } +} + } // Y_UNIT_TEST_SUITE(TRuntimeSettingsSerializationTest) } // namespace NYql diff --git a/yql/essentials/minikql/runtime_settings/ut/ya.make b/yql/essentials/minikql/runtime_settings/ut/ya.make index 6c39edb2b5d..7fe0ce37c4a 100644 --- a/yql/essentials/minikql/runtime_settings/ut/ya.make +++ b/yql/essentials/minikql/runtime_settings/ut/ya.make @@ -8,6 +8,7 @@ PEERDIR( yql/essentials/public/udf/service/stub yql/essentials/public/udf yql/essentials/core/credentials + yql/essentials/core/qplayer/storage/memory yql/essentials/providers/common/activation ) diff --git a/yql/essentials/providers/common/config/ut/ya.make b/yql/essentials/providers/common/config/ut/ya.make index d02b9ebdf0a..0c06da98c57 100644 --- a/yql/essentials/providers/common/config/ut/ya.make +++ b/yql/essentials/providers/common/config/ut/ya.make @@ -2,6 +2,11 @@ UNITTEST_FOR(yql/essentials/providers/common/config) SRCS( yql_config_ut.cpp + yql_config_qplayer_ut.cpp +) + +PEERDIR( + yql/essentials/core/qplayer/storage/memory ) YQL_LAST_ABI_VERSION() diff --git a/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp b/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp new file mode 100644 index 00000000000..4d6e8f1507a --- /dev/null +++ b/yql/essentials/providers/common/config/ut/yql_config_qplayer_ut.cpp @@ -0,0 +1,200 @@ +#include <yql/essentials/providers/common/config/yql_config_qplayer.h> +#include <yql/essentials/core/qplayer/storage/memory/yql_qstorage_memory.h> + +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/string/split.h> + +namespace NYql::NCommon { + +namespace { + +struct TTestAttr { + TString Name; + TString Value; + bool WithActivation = false; + bool MustBeActivated = true; + + const TString& GetName() const { + return Name; + } + + bool HasActivation() const { + return WithActivation; + } + + bool SerializeToString(TString* out) const { + *out = Name + "|" + Value + "|" + (WithActivation ? "1" : "0"); + return true; + } + + bool ParseFromString(const TString& data) { + TVector<TStringBuf> fields; + StringSplitter(data).Split('|').AddTo(&fields); + if (fields.size() != 3) { + return false; + } + Name = TString(fields[0]); + Value = TString(fields[1]); + WithActivation = (fields[2] == "1"); + return true; + } +}; + +auto MakeActivationFilter() { + return [](const TTestAttr& attr) { + return attr.MustBeActivated; + }; +} + +TQContext MakeWriteContext(IQStoragePtr storage) { + return TQContext(storage->MakeWriter("op", {})); +} + +TQContext MakeReadContext(IQStoragePtr storage) { + return TQContext(storage->MakeReader("op", {})); +} + +struct TExpectedFlag { + TString Name; + TString Value; +}; + +TVector<TExpectedFlag> ToExpected(const TVector<TTestAttr>& flags) { + TVector<TExpectedFlag> result; + for (const auto& flag : flags) { + result.push_back({.Name = flag.Name, .Value = flag.Value}); + } + return result; +} + +} // namespace + +Y_UNIT_TEST_SUITE(TActivationFlagsQPlayerTest) { + +struct TCaptureReplayCase { + TVector<TTestAttr> Source; + TVector<TExpectedFlag> Expected; +}; + +Y_UNIT_TEST(CaptureAndReplayProduceSameFlags) { + const TVector<TCaptureReplayCase> cases = { + { + .Source = { + TTestAttr{.Name = "plain_a", .Value = "val_a", .WithActivation = false, .MustBeActivated = true}, + TTestAttr{.Name = "activated_b", .Value = "val_b", .WithActivation = true, .MustBeActivated = true}, + TTestAttr{.Name = "skipped_c", .Value = "val_c", .WithActivation = true, .MustBeActivated = false}, + TTestAttr{.Name = "plain_d", .Value = "val_d", .WithActivation = false, .MustBeActivated = true}, + }, + .Expected = {{.Name = "plain_a", .Value = "val_a"}, {.Name = "activated_b", .Value = "val_b"}, {.Name = "plain_d", .Value = "val_d"}}, + }, + { + .Source = { + TTestAttr{.Name = "plain_a", .Value = "val_a", .WithActivation = false, .MustBeActivated = true}, + TTestAttr{.Name = "skipped_b", .Value = "val_b", .WithActivation = true, .MustBeActivated = false}, + }, + .Expected = {{.Name = "plain_a", .Value = "val_a"}}, + }, + { + .Source = { + TTestAttr{.Name = "only_activated", .Value = "val_x", .WithActivation = true, .MustBeActivated = true}, + }, + .Expected = {{.Name = "only_activated", .Value = "val_x"}}, + }, + { + .Source = { + TTestAttr{.Name = "plain_only", .Value = "val_y", .WithActivation = false, .MustBeActivated = true}, + }, + .Expected = {{.Name = "plain_only", .Value = "val_y"}}, + }, + }; + + for (const auto& testCase : cases) { + auto storage = MakeMemoryQStorage(); + + TVector<TExpectedFlag> capturedExpected; + { + auto writeContext = MakeWriteContext(storage); + auto capturedFlags = SelectAndSaveActivatedFlags<TTestAttr>( + "label", writeContext, testCase.Source, MakeActivationFilter(), /*hasProviderName=*/true); + capturedExpected = ToExpected(capturedFlags); + writeContext.GetWriter()->Commit().GetValueSync(); + } + + UNIT_ASSERT_VALUES_EQUAL(testCase.Expected.size(), capturedExpected.size()); + for (size_t idx = 0; idx < testCase.Expected.size(); ++idx) { + UNIT_ASSERT_VALUES_EQUAL(testCase.Expected[idx].Name, capturedExpected[idx].Name); + UNIT_ASSERT_VALUES_EQUAL(testCase.Expected[idx].Value, capturedExpected[idx].Value); + } + + auto readContext = MakeReadContext(storage); + auto replayedFlags = SelectAndSaveActivatedFlags<TTestAttr>( + "label", readContext, testCase.Source, MakeActivationFilter(), false); + auto replayedActual = ToExpected(replayedFlags); + + UNIT_ASSERT_VALUES_EQUAL(capturedExpected.size(), replayedActual.size()); + for (size_t idx = 0; idx < capturedExpected.size(); ++idx) { + UNIT_ASSERT_VALUES_EQUAL(capturedExpected[idx].Name, replayedActual[idx].Name); + UNIT_ASSERT_VALUES_EQUAL(capturedExpected[idx].Value, replayedActual[idx].Value); + } + } +} + +Y_UNIT_TEST(OldFormatFlagsReturnedAsIs) { + auto storage = MakeMemoryQStorage(); + { + TTestAttr plainAttr = {.Name = "plain_a", .Value = "value_a", .WithActivation = false}; + TTestAttr activatedAttr = {.Name = "activated_b", .Value = "value_b", .WithActivation = true}; + auto oldFormatNode = NYT::TNode::CreateMap(); + TString serializedPlain; + TString serializedActivated; + Y_ENSURE(plainAttr.SerializeToString(&serializedPlain)); + Y_ENSURE(activatedAttr.SerializeToString(&serializedActivated)); + oldFormatNode[plainAttr.Name] = serializedPlain; + oldFormatNode[activatedAttr.Name] = serializedActivated; + auto yson = NYT::NodeToYsonString(oldFormatNode, NYT::NYson::EYsonFormat::Binary); + auto writer = storage->MakeWriter("op", {}); + writer->Put({.Component = TString(NPrivate::QplayerActivationComponent), .Label = "label"}, yson).GetValueSync(); + writer->Commit().GetValueSync(); + } + + const TVector<TTestAttr> source = { + TTestAttr{.Name = "plain_a", .Value = "value_a", .WithActivation = false}, + TTestAttr{.Name = "activated_b", .Value = "value_b_other", .WithActivation = true}, + TTestAttr{.Name = "not_in_store", .Value = "value_x", .WithActivation = true}, + }; + + auto readContext = MakeReadContext(storage); + auto replayedFlags = SelectAndSaveActivatedFlags<TTestAttr>( + "label", readContext, source, MakeActivationFilter(), false); + + UNIT_ASSERT_VALUES_EQUAL(2U, replayedFlags.size()); + THashMap<TString, TString> replayedByName; + for (const auto& flag : replayedFlags) { + replayedByName[flag.Name] = flag.Value; + } + UNIT_ASSERT_VALUES_EQUAL("value_a", replayedByName["plain_a"]); + UNIT_ASSERT_VALUES_EQUAL("value_b", replayedByName["activated_b"]); +} + +Y_UNIT_TEST(FilterCalledOncePerItem) { + const TVector<TTestAttr> source = { + TTestAttr{.Name = "plain_a", .Value = "value_a", .WithActivation = false}, + TTestAttr{.Name = "activated_b", .Value = "value_b", .WithActivation = true}, + TTestAttr{.Name = "activated_c", .Value = "value_c", .WithActivation = true}, + }; + int filterCallCount = 0; + auto countingFilter = [&](const TTestAttr&) { + ++filterCallCount; + return true; + }; + + TQContext emptyContext; + SelectAndSaveActivatedFlags<TTestAttr>("label", emptyContext, source, countingFilter, false); + UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(source.size()), filterCallCount); +} + +} // Y_UNIT_TEST_SUITE(TActivationFlagsQPlayerTest) + +} // namespace NYql::NCommon diff --git a/yql/essentials/providers/common/config/yql_config_qplayer.h b/yql/essentials/providers/common/config/yql_config_qplayer.h index 99f5a7be218..71da71d846b 100644 --- a/yql/essentials/providers/common/config/yql_config_qplayer.h +++ b/yql/essentials/providers/common/config/yql_config_qplayer.h @@ -6,46 +6,148 @@ #include <library/cpp/yson/node/node_io.h> +#include <util/generic/hash_set.h> +#include <util/generic/maybe.h> #include <util/generic/vector.h> +#include <util/generic/overloaded.h> + +#include <variant> namespace NYql::NCommon { -inline const TString ActivationComponent = "Activation"; +namespace NPrivate { + +inline constexpr TStringBuf QplayerActivationComponent = "Activation"; +inline constexpr TStringBuf QplayerActivationVersion2 = "v2"; +inline constexpr TStringBuf QplayerActivationVersion1 = "v1"; + +inline constexpr TStringBuf QPlayerActivationVersionKey = "version"; +inline constexpr TStringBuf QPlayerActivationIndexesKey = "indexes"; + +enum class EQContextFlagsVersion { + V1, + V2 +}; + +inline EQContextFlagsVersion EQContextFlagsVersionFromString(TStringBuf versionStr) { + if (versionStr == QplayerActivationVersion1) { + return EQContextFlagsVersion::V1; + } + if (versionStr == QplayerActivationVersion2) { + return EQContextFlagsVersion::V2; + } + ythrow yexception() << "Unknown activation storage version: " << versionStr; +} + +template <typename TAttribute> +using TActivationLoadResult = std::variant<TVector<TAttribute>, THashSet<size_t>>; template <typename TAttribute> -TMaybe<TVector<TAttribute>> LoadActivatedFlagsFromQContext(const TString& activationLabel, const TQContext& qContext) { - TMaybe<TVector<TAttribute>> loadedFlags; +TMaybe<TActivationLoadResult<TAttribute>> LoadActivatedFlagsFromQContext(const TString& activationLabel, const TQContext& qContext) { if (!qContext.CanRead()) { - return loadedFlags; + return Nothing(); + } + auto loaded = qContext.GetReader()->Get({.Component = TString(QplayerActivationComponent), .Label = activationLabel}).GetValueSync(); + if (!loaded) { + return Nothing(); + } + auto flagsNode = NYT::NodeFromYsonString(loaded->Value); + EQContextFlagsVersion version = EQContextFlagsVersion::V1; + if (flagsNode.HasKey(QPlayerActivationVersionKey)) { + version = EQContextFlagsVersionFromString(flagsNode[QPlayerActivationVersionKey].AsString()); } - if (auto loaded = qContext.GetReader()->Get({.Component = ActivationComponent, .Label = activationLabel}).GetValueSync()) { - auto flagsNode = NYT::NodeFromYsonString(loaded->Value); - TVector<TAttribute> flags; - for (const auto& [flagName, flagValue] : flagsNode.AsMap()) { - TAttribute flag; - YQL_ENSURE(flag.ParseFromString(flagValue.AsString())); - flags.emplace_back(std::move(flag)); + + switch (version) { + case EQContextFlagsVersion::V1: { + TVector<TAttribute> storedFlags; + for (const auto& [flagName, flagValue] : flagsNode.AsMap()) { + TAttribute flag; + YQL_ENSURE(flag.ParseFromString(flagValue.AsString())); + storedFlags.emplace_back(std::move(flag)); + } + return {std::move(storedFlags)}; + } + + case EQContextFlagsVersion::V2: { + THashSet<size_t> activatedIndexes; + for (const auto& indexNode : flagsNode[QPlayerActivationIndexesKey].AsList()) { + activatedIndexes.insert(static_cast<size_t>(indexNode.AsInt64())); + } + return {std::move(activatedIndexes)}; } - loadedFlags = std::move(flags); - YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are loaded at replay mode"; } - return loadedFlags; } -template <typename TAttribute> -void SaveActivatedFlagsToQContext(const TVector<TAttribute>& flags, const TString& activationLabel, const TQContext& qContext) { +inline void SaveActivatedFlagsToQContext(const TVector<size_t>& activatedIndexes, const TString& activationLabel, const TQContext& qContext) { if (!qContext.CanWrite()) { return; } - auto flagsNode = NYT::TNode::CreateMap(); - for (const auto& flag : flags) { - TString data; - YQL_ENSURE(flag.SerializeToString(&data)); - flagsNode[flag.GetName()] = std::move(data); + auto node = NYT::TNode::CreateMap(); + node[QPlayerActivationVersionKey] = QplayerActivationVersion2; + auto indexList = NYT::TNode::CreateList(); + for (size_t index : activatedIndexes) { + indexList.Add(static_cast<i64>(index)); } - auto flagsYson = NYT::NodeToYsonString(flagsNode, NYT::NYson::EYsonFormat::Binary); - qContext.GetWriter()->Put({.Component = ActivationComponent, .Label = activationLabel}, flagsYson).GetValueSync(); + node[QPlayerActivationIndexesKey] = std::move(indexList); + auto yson = NYT::NodeToYsonString(node, NYT::NYson::EYsonFormat::Binary); + qContext.GetWriter()->Put({.Component = TString(QplayerActivationComponent), .Label = activationLabel}, yson).GetValueSync(); YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are saved to QStorage"; } +template <typename TAttribute, typename TContainer> +TVector<TAttribute> CollectFlagsForReplayV2(const TContainer& source, const THashSet<size_t>& activatedIndexes) { + TVector<TAttribute> flags; + for (size_t idx = 0; idx < static_cast<size_t>(source.size()); ++idx) { + if (!source[idx].HasActivation() || activatedIndexes.contains(idx)) { + flags.emplace_back(source[idx]); + } + } + return flags; +} + +template <typename TAttribute, typename TContainer, typename TFilter> +TVector<TAttribute> FilterSourceAndSaveIndexes( + const TContainer& source, + const TFilter& filter, + const TString& activationLabel, + const TQContext& qContext, + bool hasProviderName) +{ + TVector<TAttribute> flags; + TVector<size_t> activatedIndexes; + for (size_t idx = 0; idx < static_cast<size_t>(source.size()); ++idx) { + if (filter(source[idx])) { + flags.emplace_back(source[idx]); + activatedIndexes.push_back(idx); + } + } + if (hasProviderName) { + SaveActivatedFlagsToQContext(activatedIndexes, activationLabel, qContext); + } + return flags; +} + +} // namespace NPrivate + +template <typename TAttribute, typename TContainer, typename TFilter> +TVector<TAttribute> SelectAndSaveActivatedFlags( + const TString& activationLabel, + const TQContext& qContext, + const TContainer& source, + const TFilter& filter, + bool hasProviderName) +{ + if (auto loaded = NPrivate::LoadActivatedFlagsFromQContext<TAttribute>(activationLabel, qContext)) { + YQL_CLOG(INFO, ProviderConfig) << activationLabel << " activated flags are loaded at replay mode"; + return std::visit(TOverloaded{ + [](const TVector<TAttribute>& flags) { + return flags; + }, + [&](const THashSet<size_t>& indexes) { + return NPrivate::CollectFlagsForReplayV2<TAttribute>(source, indexes); + }}, *loaded); + } + return NPrivate::FilterSourceAndSaveIndexes<TAttribute>(source, filter, activationLabel, qContext, hasProviderName); +} + } // namespace NYql::NCommon diff --git a/yql/essentials/providers/common/config/yql_dispatch.cpp b/yql/essentials/providers/common/config/yql_dispatch.cpp index 1f4c8083c47..cb36f83046e 100644 --- a/yql/essentials/providers/common/config/yql_dispatch.cpp +++ b/yql/essentials/providers/common/config/yql_dispatch.cpp @@ -9,7 +9,7 @@ #include <util/random/random.h> #include <util/datetime/base.h> -namespace NYql { +namespace NYql::NCommon { namespace NPrivate { @@ -143,8 +143,6 @@ YQL_CONTAINER_SETTING_PARSER_TYPES(YQL_DEFINE_CONTAINER_SETTING_SERIALIZER) } // namespace NPrivate -namespace NCommon { - bool TSettingDispatcher::IsRuntime(const TString& name) { auto normalizedName = NormalizeName(name); if (auto handler = Handlers_.Value(normalizedName, TSettingHandler::TPtr())) { @@ -264,5 +262,4 @@ TSettingDispatcher::TErrorCallback TSettingDispatcher::GetErrorCallback(TPositio }; } -} // namespace NCommon -} // namespace NYql +} // namespace NYql::NCommon diff --git a/yql/essentials/providers/common/config/yql_dispatch.h b/yql/essentials/providers/common/config/yql_dispatch.h index 41d2f1cbd38..4cf5aeeb421 100644 --- a/yql/essentials/providers/common/config/yql_dispatch.h +++ b/yql/essentials/providers/common/config/yql_dispatch.h @@ -28,7 +28,7 @@ #include <ranges> #include <utility> -namespace NYql { +namespace NYql::NCommon { namespace NPrivate { @@ -148,8 +148,6 @@ concept AttributeFilter = std::predicate<TFilter, std::ranges::range_value_t<TCo } // namespace NPrivate -namespace NCommon { - class TSettingDispatcher: public TThrRefBase { public: using TPtr = TIntrusivePtr<TSettingDispatcher>; @@ -199,8 +197,8 @@ public: TSettingHandlerImpl(const TString& name, TConfSetting<TType, SettingType>& setting) : TSettingHandler(name) , Setting_(setting) - , Parser_(::NYql::NPrivate::GetDefaultParser<TType>()) - , Serializer_(::NYql::NPrivate::GetDefaultSerializer<TType>()) + , Parser_(NPrivate::GetDefaultParser<TType>()) + , Serializer_(NPrivate::GetDefaultSerializer<TType>()) , ValueSetter_([this](const TString& cluster, TType value) { Setting_[cluster] = value; }) @@ -337,22 +335,22 @@ public: return *this; } - TSettingHandlerImpl& Parser(::NYql::NPrivate::TParser<TType>&& parser) { + TSettingHandlerImpl& Parser(NPrivate::TParser<TType>&& parser) { Parser_ = std::move(parser); return *this; } - TSettingHandlerImpl& Parser(const ::NYql::NPrivate::TParser<TType>& parser) { + TSettingHandlerImpl& Parser(const NPrivate::TParser<TType>& parser) { Parser_ = parser; return *this; } - TSettingHandlerImpl& Serializer(::NYql::NPrivate::TSerializer<TType>&& serializer) { + TSettingHandlerImpl& Serializer(NPrivate::TSerializer<TType>&& serializer) { Serializer_ = std::move(serializer); return *this; } - TSettingHandlerImpl& Serializer(const ::NYql::NPrivate::TSerializer<TType>& serializer) { + TSettingHandlerImpl& Serializer(const NPrivate::TSerializer<TType>& serializer) { Serializer_ = serializer; return *this; } @@ -415,8 +413,8 @@ public: private: TConfSetting<TType, SettingType>& Setting_; TMaybe<TConfSetting<TType, SettingType>> Default_; - ::NYql::NPrivate::TParser<TType> Parser_; - ::NYql::NPrivate::TSerializer<TType> Serializer_; + NPrivate::TParser<TType> Parser_; + NPrivate::TSerializer<TType> Serializer_; TValueCallback ValueSetter_; TVector<TValueCallback> Validators_; TString Warning_; @@ -479,26 +477,18 @@ public: auto errorCallback = GetDefaultErrorCallback(); TString activationLabel = TStringBuilder() << ProviderName_ << "_" << cluster; - TVector<TAttribute> flags; - if (auto loadedFlags = NCommon::LoadActivatedFlagsFromQContext<TAttribute>(activationLabel, QContext_)) { - flags = std::move(*loadedFlags); - } else { - CopyIf(clusterValues.begin(), clusterValues.end(), std::back_inserter(flags), filter); - } + const auto flags = NCommon::SelectAndSaveActivatedFlags<TAttribute>( + activationLabel, QContext_, clusterValues, filter, !ProviderName_.empty()); for (const auto& flag : flags) { Dispatch(cluster, flag.GetName(), flag.GetValue(), EStage::CONFIG, errorCallback); } - - if (ProviderName_) { - NCommon::SaveActivatedFlagsToQContext<TAttribute>(flags, activationLabel, QContext_); - } } template <NPrivate::ConfigFeatureList TContainer> void Dispatch(const TString& cluster, const TContainer& clusterValues) { auto errorCallback = GetDefaultErrorCallback(); - for (auto& v : clusterValues) { + for (const auto& v : clusterValues) { Dispatch(cluster, v.GetName(), v.GetValue(), EStage::CONFIG, errorCallback); } } @@ -529,8 +519,7 @@ protected: const TQContext QContext_; }; -} // namespace NCommon -} // namespace NYql +} // namespace NYql::NCommon #define REGISTER_SETTING(dispatcher, setting) \ (dispatcher).AddSetting(#setting, setting) diff --git a/yql/essentials/providers/config/yql_config_provider.cpp b/yql/essentials/providers/config/yql_config_provider.cpp index d1265c7bb18..f3b07999c91 100644 --- a/yql/essentials/providers/config/yql_config_provider.cpp +++ b/yql/essentials/providers/config/yql_config_provider.cpp @@ -167,13 +167,8 @@ public: }; if (CoreConfig_) { TPosition pos; - TVector<TCoreAttr> flags; - if (auto loadedFlags = NCommon::LoadActivatedFlagsFromQContext<TCoreAttr>(YqlCoreActivationLabel, Types_.QContext)) { - flags = std::move(*loadedFlags); - } else { - const auto& configFlags = CoreConfig_->GetFlags(); - CopyIf(configFlags.begin(), configFlags.end(), std::back_inserter(flags), filter); - } + const auto flags = NCommon::SelectAndSaveActivatedFlags<TCoreAttr>( + YqlCoreActivationLabel, Types_.QContext, CoreConfig_->GetFlags(), filter, /*hasProviderName=*/true); for (const auto& flag : flags) { const auto& flagArgs = flag.GetArgs(); TVector<TStringBuf> args(flagArgs.begin(), flagArgs.end()); @@ -181,7 +176,6 @@ public: return false; } } - NCommon::SaveActivatedFlagsToQContext<TCoreAttr>(flags, YqlCoreActivationLabel, Types_.QContext); } return true; } |
