diff options
author | FloatingCrowbar <komels@ydb.tech> | 2024-10-30 13:23:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-30 13:23:19 +0300 |
commit | 39435d7b60108b2cc00dd20f1dcdcdb1b7b2f08c (patch) | |
tree | 3203c5659d5d7fa95076e62d9662b96620056278 | |
parent | 03e5dfd899db12a8df49b93e492962f48c70ccfb (diff) | |
download | ydb-39435d7b60108b2cc00dd20f1dcdcdb1b7b2f08c.tar.gz |
Fix issue #9461 with altering CDC streams (#11077)
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 64 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.h | 10 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 15 |
3 files changed, 85 insertions, 4 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 191ee25c2c..b7b8cce619 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3127,7 +3127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } - + { auto result = client.ExecuteQuery(R"( SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1; @@ -4737,6 +4737,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CheckDirEntry(kikimr, entriesToCheck); } } + Y_UNIT_TEST(CreateOrDropTopicOverTable) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); @@ -4808,6 +4809,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(AlterCdcTopic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr{serverSettings}; + auto tableClient = kikimr.GetTableClient(); + + { + auto tcSession = tableClient.CreateSession().GetValueSync().GetSession(); + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/TmpTable` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )").GetValueSync().IsSuccess()); + tcSession.Close(); + } + + auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(), + NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin")); + + auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin")); + auto session = client.GetSession().GetValueSync().GetSession(); + { + + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21; + )"); + + RunQuery(query, session); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + const auto& consumers = desc.GetTopicDescription().GetConsumers(); + UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21"); + + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10); + )"); + RunQuery(query, session, false); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1); + } + + } + Y_UNIT_TEST(TableSink_OlapRWQueries) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); @@ -4914,7 +4974,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto it = client.StreamExecuteQuery(R"sql( SELECT r.Col3 FROM `/Root/DataShard` AS r - JOIN `/Root/ColumnShard` AS c + JOIN `/Root/ColumnShard` AS c ON r.Col1 = c.Col1; )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 37795c0cf6..01f7106a11 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -548,6 +548,10 @@ namespace NKikimr::NGRpcProxy::V1 { return path; } + const TMaybe<TString>& GetCdcStreamName() const { + return CdcStreamName; + } + void SendDescribeProposeRequest(bool showPrivate = false) { return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate); } @@ -603,6 +607,10 @@ namespace NKikimr::NGRpcProxy::V1 { if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) { Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1); PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; + + if (response.Self) { + CdcStreamName = response.Self->Info.GetName(); + } SendDescribeProposeRequest(true); return true; } @@ -620,6 +628,8 @@ namespace NKikimr::NGRpcProxy::V1 { TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self; TMaybe<TString> PrivateTopicName; + TMaybe<TString> CdcStreamName; + }; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a7a350f79e..f1bbf16449 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache } TUpdateSchemeBase::HandleCacheNavigateResponse(ev); auto& schemeTx = Response->Response.ModifyScheme; - FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name); + std::pair <TString, TString> pathPair; + try { + pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath()); + } catch (const std::exception &ex) { + Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex)); + RespondWithCode(Ydb::StatusIds::BAD_REQUEST); + return; + } + + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + FillModifyScheme(schemeTx, ActorContext(), workingDir, name); } void TAlterTopicActorInternal::ModifyPersqueueConfig( @@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig( TString error; Y_UNUSED(selfInfo); - auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false); + auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined()); if (!error.empty()) { Response->Response.Issues.AddIssue(error); } |