diff options
| author | Yuriy Kaminskiy <[email protected]> | 2026-06-26 18:49:58 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-26 18:49:58 +0300 |
| commit | ff211d5f4a451d289ebad8f2ef40498addef3dbc (patch) | |
| tree | 15deb0159327c417d0173740f7ecf29887e82c11 | |
| parent | c85a586ab2c1cc6b1e1f2c8410d5c131742b707c (diff) | |
kqp/ut/...datastreams: add tests for leaking streamlookup actors (#43840)
7 files changed, 38 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 6f7d642dc11..31223c0fc90 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -145,6 +145,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { ServerSettings->Controls = settings.Controls; ServerSettings->SetEnableForceFollowers(settings.EnableForceFollowers); ServerSettings->SetEnableScriptExecutionBackgroundChecks(settings.EnableScriptExecutionBackgroundChecks); + ServerSettings->SetNeedStatsCollectors(settings.NeedsStatsCollectors); if (!settings.FeatureFlags.HasEnableOlapCompression()) { ServerSettings->SetEnableOlapCompression(true); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index e05138d8630..34e979d1dd0 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -79,6 +79,7 @@ public: bool UseRealThreads = true; bool EnableForceFollowers = false; bool EnableScriptExecutionBackgroundChecks = true; + bool NeedsStatsCollectors = false; TDuration KeepSnapshotTimeout = TDuration::Zero(); IOutputStream* LogStream = nullptr; TVector<TString> StoragePoolTypes; @@ -125,6 +126,7 @@ public: TKikimrSettings& SetQueryReplayBackendFactory(std::shared_ptr<NKqp::IQueryReplayBackendFactory> value) { QueryReplayBackendFactory = std::move(value); return *this; }; TKikimrSettings& SetUseRealThreads(bool value) { UseRealThreads = value; return *this; }; TKikimrSettings& SetEnableForceFollowers(bool value) { EnableForceFollowers = value; return *this; }; + TKikimrSettings& SetNeedsStatsCollectors(bool value) { NeedsStatsCollectors = value; return *this; }; TKikimrSettings& SetS3ActorsFactory(std::shared_ptr<NYql::NDq::IS3ActorsFactory> value) { S3ActorsFactory = std::move(value); return *this; }; TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) { Controls = value; return *this; } TKikimrSettings& SetColumnShardReaderClassName(const TString& value) { AppConfig.MutableColumnShardConfig()->SetReaderClassName(value); return *this; } diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index 3e4d843084c..8848f9fd410 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -160,6 +160,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest { .SetCheckpointPeriod(options.CheckpointPeriod) .SetUseLocalCheckpointsInStreamingQueries(options.UseLocalCheckpointsInStreamingQueries) .SetLogSettings(std::move(logSettings)) + .SetNeedsStatsCollectors(options.NeedsStatsCollectors) .SetInitFederatedQuerySetupFactory(options.InternalInitFederatedQuerySetupFactory); settings.EnableScriptExecutionBackgroundChecks = options.EnableScriptExecutionBackgroundChecks; diff --git a/ydb/core/kqp/ut/federated_query/common/common.h b/ydb/core/kqp/ut/federated_query/common/common.h index 0d26959072f..6b093436c8f 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.h +++ b/ydb/core/kqp/ut/federated_query/common/common.h @@ -30,6 +30,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest { TTestLogSettings LogSettings; bool UseLocalCheckpointsInStreamingQueries = false; bool InternalInitFederatedQuerySetupFactory = false; + bool NeedsStatsCollectors = false; TVector<TString> StoragePoolTypes; }; diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp index 7608ee6f621..895ae0ceedf 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp @@ -1,5 +1,6 @@ #include "common.h" +#include <ydb/core/base/counters.h> #include <ydb/core/cms/console/console.h> #include <ydb/core/kqp/common/kqp_script_executions.h> #include <ydb/core/kqp/proxy_service/kqp_script_executions.h> @@ -115,6 +116,7 @@ std::shared_ptr<TKikimrRunner> TStreamingTestFixture::GetKikimrRunner() { .LogSettings = LogSettings, .UseLocalCheckpointsInStreamingQueries = true, .InternalInitFederatedQuerySetupFactory = InternalInitFederatedQuerySetupFactory, + .NeedsStatsCollectors = NeedsStatsCollectors, .StoragePoolTypes = StoragePoolTypes, }); @@ -195,6 +197,10 @@ void TStreamingTestFixture::KillTopicPqrbTablet(const std::string& topicPath) { tabletClient->KillTablet(GetKikimrRunner()->GetTestServer(), persQueueGroup.GetBalancerTabletID()); } +TIntrusivePtr<NMonitoring::TDynamicCounters> TStreamingTestFixture::GetCounters(const TString& svc, ui32 nodeIdx) { + return NKikimr::GetServiceCounters(GetRuntime().GetAppData(nodeIdx).Counters, svc); +} + // External YDB recipe std::shared_ptr<TDriver> TStreamingTestFixture::GetExternalDriver() { diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.h b/ydb/core/kqp/ut/federated_query/datastreams/common.h index 339bf36d589..c3d4c638a10 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/common.h +++ b/ydb/core/kqp/ut/federated_query/datastreams/common.h @@ -105,6 +105,8 @@ public: void KillTopicPqrbTablet(const std::string& topicPath); + TIntrusivePtr<NMonitoring::TDynamicCounters> GetCounters(const TString& svc = "kqp", ui32 nodeIdx = 0); + // External YDB recipe std::shared_ptr<NYdb::TDriver> GetExternalDriver(); @@ -220,6 +222,7 @@ protected: TTestLogSettings LogSettings; bool InternalInitFederatedQuerySetupFactory = false; TVector<TString> StoragePoolTypes; + bool NeedsStatsCollectors = false; NYdb::NQuery::TClientSettings QueryClientSettings = NYdb::NQuery::TClientSettings().AuthToken(BUILTIN_ACL_ROOT); NYdb::NTopic::TTopicClientSettings TopicClientSettings = NYdb::NTopic::TTopicClientSettings().AuthToken(BUILTIN_ACL_ROOT); diff --git a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp index c4c32632c1b..e0f7d42fc80 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp @@ -1058,6 +1058,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { // legal, but nothing to check return; } + NeedsStatsCollectors = true; constexpr ui32 combinations = WithFeatureFlag && !WithFullscanFlag ? 2 : 1; { auto& setupAppConfig = SetupAppConfig(); @@ -1187,6 +1188,13 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { readSession->AddDataReceivedEvent(sampleMessages); writeSession->ExpectMessages({"A-P4", "B-P6", "A-P4"}); + auto actorsAlive = GetCounters("utils")->GetSubgroup("execpool", "User")->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", "NYql::NDq::(anonymous namespace)::TInputTransformStreamLookupBase"); + WaitFor(TDuration::Seconds(10), "ActorsAlive", [&](TString& error) { + auto val = actorsAlive->Val(); + error = TStringBuilder() << "InputTransform actors count is " << val << ", expected 1"; + return val == 1; + }); + CheckScriptExecutionsCount(1, 1); const auto results = ExecQuery( "SELECT ast_compressed FROM `.metadata/script_executions`;" @@ -1198,6 +1206,21 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { UNIT_ASSERT_STRING_CONTAINS(*ast, "DqCnStreamLookup"); }); + ExecQuery(fmt::format(R"( + ALTER STREAMING QUERY `{query_name}` SET ( + RUN = FALSE + );)", + "query_name"_a = queryName + )); + + CheckScriptExecutionsCount(1, 0); + + WaitFor(TDuration::Seconds(10), "ActorsAlive", [&](TString& error) { + auto val = actorsAlive->Val(); + error = TStringBuilder() << "InputTransform actors count is " << val << ", expected 0"; + return val == 0; + }); + if (!WithFullscanFlag) { // extra check that fullscan option without fullscan feature-flag fails ExecQuery(fmt::format(R"( @@ -1231,7 +1254,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { EStatus::GENERIC_ERROR, "EnableDqSourceStreamLookupJoinFullscan disabled, but FullscanLimit is 123"); - CheckScriptExecutionsCount(2, 1); + CheckScriptExecutionsCount(2, 0); } } |
