summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYuriy Kaminskiy <[email protected]>2026-06-26 18:49:58 +0300
committerGitHub <[email protected]>2026-06-26 18:49:58 +0300
commitff211d5f4a451d289ebad8f2ef40498addef3dbc (patch)
tree15deb0159327c417d0173740f7ecf29887e82c11
parentc85a586ab2c1cc6b1e1f2c8410d5c131742b707c (diff)
kqp/ut/...datastreams: add tests for leaking streamlookup actors (#43840)
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp1
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.cpp1
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.h1
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/common.cpp6
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/common.h3
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp25
7 files changed, 38 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 6f7d642dc11..31223c0fc90 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -145,6 +145,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
ServerSettings->Controls = settings.Controls;
ServerSettings->SetEnableForceFollowers(settings.EnableForceFollowers);
ServerSettings->SetEnableScriptExecutionBackgroundChecks(settings.EnableScriptExecutionBackgroundChecks);
+ ServerSettings->SetNeedStatsCollectors(settings.NeedsStatsCollectors);
if (!settings.FeatureFlags.HasEnableOlapCompression()) {
ServerSettings->SetEnableOlapCompression(true);
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index e05138d8630..34e979d1dd0 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -79,6 +79,7 @@ public:
bool UseRealThreads = true;
bool EnableForceFollowers = false;
bool EnableScriptExecutionBackgroundChecks = true;
+ bool NeedsStatsCollectors = false;
TDuration KeepSnapshotTimeout = TDuration::Zero();
IOutputStream* LogStream = nullptr;
TVector<TString> StoragePoolTypes;
@@ -125,6 +126,7 @@ public:
TKikimrSettings& SetQueryReplayBackendFactory(std::shared_ptr<NKqp::IQueryReplayBackendFactory> value) { QueryReplayBackendFactory = std::move(value); return *this; };
TKikimrSettings& SetUseRealThreads(bool value) { UseRealThreads = value; return *this; };
TKikimrSettings& SetEnableForceFollowers(bool value) { EnableForceFollowers = value; return *this; };
+ TKikimrSettings& SetNeedsStatsCollectors(bool value) { NeedsStatsCollectors = value; return *this; };
TKikimrSettings& SetS3ActorsFactory(std::shared_ptr<NYql::NDq::IS3ActorsFactory> value) { S3ActorsFactory = std::move(value); return *this; };
TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) { Controls = value; return *this; }
TKikimrSettings& SetColumnShardReaderClassName(const TString& value) { AppConfig.MutableColumnShardConfig()->SetReaderClassName(value); return *this; }
diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp
index 3e4d843084c..8848f9fd410 100644
--- a/ydb/core/kqp/ut/federated_query/common/common.cpp
+++ b/ydb/core/kqp/ut/federated_query/common/common.cpp
@@ -160,6 +160,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
.SetCheckpointPeriod(options.CheckpointPeriod)
.SetUseLocalCheckpointsInStreamingQueries(options.UseLocalCheckpointsInStreamingQueries)
.SetLogSettings(std::move(logSettings))
+ .SetNeedsStatsCollectors(options.NeedsStatsCollectors)
.SetInitFederatedQuerySetupFactory(options.InternalInitFederatedQuerySetupFactory);
settings.EnableScriptExecutionBackgroundChecks = options.EnableScriptExecutionBackgroundChecks;
diff --git a/ydb/core/kqp/ut/federated_query/common/common.h b/ydb/core/kqp/ut/federated_query/common/common.h
index 0d26959072f..6b093436c8f 100644
--- a/ydb/core/kqp/ut/federated_query/common/common.h
+++ b/ydb/core/kqp/ut/federated_query/common/common.h
@@ -30,6 +30,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
TTestLogSettings LogSettings;
bool UseLocalCheckpointsInStreamingQueries = false;
bool InternalInitFederatedQuerySetupFactory = false;
+ bool NeedsStatsCollectors = false;
TVector<TString> StoragePoolTypes;
};
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
index 7608ee6f621..895ae0ceedf 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
+++ b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
@@ -1,5 +1,6 @@
#include "common.h"
+#include <ydb/core/base/counters.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/kqp/common/kqp_script_executions.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
@@ -115,6 +116,7 @@ std::shared_ptr<TKikimrRunner> TStreamingTestFixture::GetKikimrRunner() {
.LogSettings = LogSettings,
.UseLocalCheckpointsInStreamingQueries = true,
.InternalInitFederatedQuerySetupFactory = InternalInitFederatedQuerySetupFactory,
+ .NeedsStatsCollectors = NeedsStatsCollectors,
.StoragePoolTypes = StoragePoolTypes,
});
@@ -195,6 +197,10 @@ void TStreamingTestFixture::KillTopicPqrbTablet(const std::string& topicPath) {
tabletClient->KillTablet(GetKikimrRunner()->GetTestServer(), persQueueGroup.GetBalancerTabletID());
}
+TIntrusivePtr<NMonitoring::TDynamicCounters> TStreamingTestFixture::GetCounters(const TString& svc, ui32 nodeIdx) {
+ return NKikimr::GetServiceCounters(GetRuntime().GetAppData(nodeIdx).Counters, svc);
+}
+
// External YDB recipe
std::shared_ptr<TDriver> TStreamingTestFixture::GetExternalDriver() {
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.h b/ydb/core/kqp/ut/federated_query/datastreams/common.h
index 339bf36d589..c3d4c638a10 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/common.h
+++ b/ydb/core/kqp/ut/federated_query/datastreams/common.h
@@ -105,6 +105,8 @@ public:
void KillTopicPqrbTablet(const std::string& topicPath);
+ TIntrusivePtr<NMonitoring::TDynamicCounters> GetCounters(const TString& svc = "kqp", ui32 nodeIdx = 0);
+
// External YDB recipe
std::shared_ptr<NYdb::TDriver> GetExternalDriver();
@@ -220,6 +222,7 @@ protected:
TTestLogSettings LogSettings;
bool InternalInitFederatedQuerySetupFactory = false;
TVector<TString> StoragePoolTypes;
+ bool NeedsStatsCollectors = false;
NYdb::NQuery::TClientSettings QueryClientSettings = NYdb::NQuery::TClientSettings().AuthToken(BUILTIN_ACL_ROOT);
NYdb::NTopic::TTopicClientSettings TopicClientSettings = NYdb::NTopic::TTopicClientSettings().AuthToken(BUILTIN_ACL_ROOT);
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
index c4c32632c1b..e0f7d42fc80 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
@@ -1058,6 +1058,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
// legal, but nothing to check
return;
}
+ NeedsStatsCollectors = true;
constexpr ui32 combinations = WithFeatureFlag && !WithFullscanFlag ? 2 : 1;
{
auto& setupAppConfig = SetupAppConfig();
@@ -1187,6 +1188,13 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
readSession->AddDataReceivedEvent(sampleMessages);
writeSession->ExpectMessages({"A-P4", "B-P6", "A-P4"});
+ auto actorsAlive = GetCounters("utils")->GetSubgroup("execpool", "User")->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", "NYql::NDq::(anonymous namespace)::TInputTransformStreamLookupBase");
+ WaitFor(TDuration::Seconds(10), "ActorsAlive", [&](TString& error) {
+ auto val = actorsAlive->Val();
+ error = TStringBuilder() << "InputTransform actors count is " << val << ", expected 1";
+ return val == 1;
+ });
+
CheckScriptExecutionsCount(1, 1);
const auto results = ExecQuery(
"SELECT ast_compressed FROM `.metadata/script_executions`;"
@@ -1198,6 +1206,21 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
UNIT_ASSERT_STRING_CONTAINS(*ast, "DqCnStreamLookup");
});
+ ExecQuery(fmt::format(R"(
+ ALTER STREAMING QUERY `{query_name}` SET (
+ RUN = FALSE
+ );)",
+ "query_name"_a = queryName
+ ));
+
+ CheckScriptExecutionsCount(1, 0);
+
+ WaitFor(TDuration::Seconds(10), "ActorsAlive", [&](TString& error) {
+ auto val = actorsAlive->Val();
+ error = TStringBuilder() << "InputTransform actors count is " << val << ", expected 0";
+ return val == 0;
+ });
+
if (!WithFullscanFlag) {
// extra check that fullscan option without fullscan feature-flag fails
ExecQuery(fmt::format(R"(
@@ -1231,7 +1254,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
EStatus::GENERIC_ERROR,
"EnableDqSourceStreamLookupJoinFullscan disabled, but FullscanLimit is 123");
- CheckScriptExecutionsCount(2, 1);
+ CheckScriptExecutionsCount(2, 0);
}
}