aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <komels@ydb.tech>2024-10-30 13:23:19 +0300
committerGitHub <noreply@github.com>2024-10-30 13:23:19 +0300
commit39435d7b60108b2cc00dd20f1dcdcdb1b7b2f08c (patch)
tree3203c5659d5d7fa95076e62d9662b96620056278
parent03e5dfd899db12a8df49b93e492962f48c70ccfb (diff)
downloadydb-39435d7b60108b2cc00dd20f1dcdcdb1b7b2f08c.tar.gz
Fix issue #9461 with altering CDC streams (#11077)
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp64
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h10
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp15
3 files changed, 85 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index 191ee25c2c..b7b8cce619 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -3127,7 +3127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
-
+
{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1;
@@ -4737,6 +4737,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CheckDirEntry(kikimr, entriesToCheck);
}
}
+
Y_UNIT_TEST(CreateOrDropTopicOverTable) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
@@ -4808,6 +4809,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}
+ Y_UNIT_TEST(AlterCdcTopic) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr{serverSettings};
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ auto tcSession = tableClient.CreateSession().GetValueSync().GetSession();
+ UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
+ CREATE TABLE `/Root/TmpTable` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync().IsSuccess());
+
+ UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
+ ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON'
+ );
+ )").GetValueSync().IsSuccess());
+ tcSession.Close();
+ }
+
+ auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(),
+ NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin"));
+
+ auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin"));
+ auto session = client.GetSession().GetValueSync().GetSession();
+ {
+
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21;
+ )");
+
+ RunQuery(query, session);
+ auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
+ const auto& consumers = desc.GetTopicDescription().GetConsumers();
+ UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21");
+
+ }
+ {
+ const auto query = Q_(R"(
+ --!syntax_v1
+ ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10);
+ )");
+ RunQuery(query, session, false);
+ auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1);
+ }
+
+ }
+
Y_UNIT_TEST(TableSink_OlapRWQueries) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
@@ -4914,7 +4974,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto it = client.StreamExecuteQuery(R"sql(
SELECT r.Col3
FROM `/Root/DataShard` AS r
- JOIN `/Root/ColumnShard` AS c
+ JOIN `/Root/ColumnShard` AS c
ON r.Col1 = c.Col1;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 37795c0cf6..01f7106a11 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -548,6 +548,10 @@ namespace NKikimr::NGRpcProxy::V1 {
return path;
}
+ const TMaybe<TString>& GetCdcStreamName() const {
+ return CdcStreamName;
+ }
+
void SendDescribeProposeRequest(bool showPrivate = false) {
return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate);
}
@@ -603,6 +607,10 @@ namespace NKikimr::NGRpcProxy::V1 {
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1);
PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
+
+ if (response.Self) {
+ CdcStreamName = response.Self->Info.GetName();
+ }
SendDescribeProposeRequest(true);
return true;
}
@@ -620,6 +628,8 @@ namespace NKikimr::NGRpcProxy::V1 {
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
TMaybe<TString> PrivateTopicName;
+ TMaybe<TString> CdcStreamName;
+
};
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index a7a350f79e..f1bbf16449 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache
}
TUpdateSchemeBase::HandleCacheNavigateResponse(ev);
auto& schemeTx = Response->Response.ModifyScheme;
- FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name);
+ std::pair <TString, TString> pathPair;
+ try {
+ pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath());
+ } catch (const std::exception &ex) {
+ Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex));
+ RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+ FillModifyScheme(schemeTx, ActorContext(), workingDir, name);
}
void TAlterTopicActorInternal::ModifyPersqueueConfig(
@@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig(
TString error;
Y_UNUSED(selfInfo);
- auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false);
+ auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined());
if (!error.empty()) {
Response->Response.Issues.AddIssue(error);
}