diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-07-19 17:25:35 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-07-19 17:25:35 +0300 |
commit | edd9f3e1beabc380ef8c4f6d8c4b7ab2d6424241 (patch) | |
tree | b05936e3911fbf81b51662d2734666f289c35654 | |
parent | abde364b0acbae7ee6d607413f567ee0a4876a34 (diff) | |
download | ydb-edd9f3e1beabc380ef8c4f6d8c4b7ab2d6424241.tar.gz |
Get rid of 'streamImpl' in responses
5 files changed, 29 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 2a8864352da..00fe8f76d40 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -857,19 +857,27 @@ Y_UNIT_TEST_SUITE(Cdc) { auto ev = reader->GetEvent(true); UNIT_ASSERT(ev); + TPartitionStream::TPtr pStream; if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) { + pStream = data->GetPartitionStream(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); UNIT_ASSERT_VALUES_EQUAL(record, item.GetData()); UNIT_ASSERT_VALUES_EQUAL(CalcPartitionKey(record), item.GetPartitionKey()); } } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) { + pStream = create->GetPartitionStream(); create->Confirm(); } else if (auto* destroy = std::get_if<TReadSessionEvent::TDestroyPartitionStreamEvent>(&*ev)) { + pStream = destroy->GetPartitionStream(); destroy->Confirm(); } else if (std::get_if<TSessionClosedEvent>(&*ev)) { break; } + + if (pStream) { + UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), "/Root/Table/Stream"); + } } // remove consumer diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp index fe7ff48264e..7ea4d0101d3 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.cpp +++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp @@ -245,10 +245,13 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr } TTopicConverterPtr TDiscoveryConverter::UpgradeToFullConverter( - const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride + const NKikimrPQ::TPQTabletConfig& pqTabletConfig, + const TString& ydbDatabaseRootOverride, + const TMaybe<TString>& clientsideNameOverride ) { Y_VERIFY_S(Valid, Reason.c_str()); - auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, ydbDatabaseRootOverride); + auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, + ydbDatabaseRootOverride, clientsideNameOverride); return TTopicConverterPtr(res); } @@ -673,12 +676,16 @@ TTopicConverterPtr TTopicNameConverter::ForFederation( TTopicNameConverter::TTopicNameConverter( bool firstClass, const TString& pqPrefix, const NKikimrPQ::TPQTabletConfig& pqTabletConfig, - const TString& ydbDatabaseRootOverride + const TString& ydbDatabaseRootOverride, + const TMaybe<TString>& clientsideNameOverride ) : TDiscoveryConverter(firstClass, pqPrefix, pqTabletConfig, ydbDatabaseRootOverride) { if (Valid) { BuildInternals(pqTabletConfig); + if (clientsideNameOverride) { + ClientsideName = *clientsideNameOverride; + } } } diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 559bf4f110b..04099477f7b 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -79,7 +79,8 @@ public: const TString& GetReason() const; const TString& GetOriginalTopic() const; TTopicConverterPtr UpgradeToFullConverter(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, - const TString& ydbDatabaseRootOverride); + const TString& ydbDatabaseRootOverride, + const TMaybe<TString>& clientsideNameOverride = {}); TString GetPrintableString() const; @@ -172,7 +173,8 @@ protected: const TString& pqPrefix, //const TVector<TString>& rootDatabases, const NKikimrPQ::TPQTabletConfig& pqTabletConfig, - const TString& ydbDatabaseRootOverride); + const TString& ydbDatabaseRootOverride, + const TMaybe<TString>& clientsideNameOverride = {}); public: static TTopicConverterPtr ForFirstClass(const NKikimrPQ::TPQTabletConfig& pqTabletConfig); diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index e13e5ff9d82..a942b32f8d5 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -16,6 +16,7 @@ namespace NKikimr::NGRpcProxy { TString FolderId; NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; NPersQueue::TTopicConverterPtr FullConverter; + TMaybe<TString> CdcStreamPath; TVector<ui32> Groups; TMap<ui64, ui64> Partitions; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 79bed090316..7450d0590f9 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -100,7 +100,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( return false; } topicsIter->second.FullConverter = topicsIter->second.DiscoveryConverter->UpgradeToFullConverter( - pqDescr.GetPQTabletConfig(), AppData(ctx)->PQConfig.GetTestDatabaseRoot() + pqDescr.GetPQTabletConfig(), + AppData(ctx)->PQConfig.GetTestDatabaseRoot(), + topicsIter->second.CdcStreamPath ); Y_VERIFY(topicsIter->second.FullConverter->IsValid()); return CheckTopicACL(entry, topicsIter->first, ctx); @@ -122,7 +124,10 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); const auto& topic = entry.ListNodeEntry->Children.at(0); + // primary path used to re-describe it->second.DiscoveryConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name))); + it->second.CdcStreamPath = CanonizePath(entry.Path); + // Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second; // Topics.erase(it); |