aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-07-19 17:25:35 +0300
committerilnaz <ilnaz@ydb.tech>2022-07-19 17:25:35 +0300
commitedd9f3e1beabc380ef8c4f6d8c4b7ab2d6424241 (patch)
treeb05936e3911fbf81b51662d2734666f289c35654
parentabde364b0acbae7ee6d607413f567ee0a4876a34 (diff)
downloadydb-edd9f3e1beabc380ef8c4f6d8c4b7ab2d6424241.tar.gz
Get rid of 'streamImpl' in responses
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp8
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.cpp13
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h6
-rw-r--r--ydb/services/lib/actors/type_definitions.h1
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp7
5 files changed, 29 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 2a8864352da..00fe8f76d40 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -857,19 +857,27 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto ev = reader->GetEvent(true);
UNIT_ASSERT(ev);
+ TPartitionStream::TPtr pStream;
if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
+ pStream = data->GetPartitionStream();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
UNIT_ASSERT_VALUES_EQUAL(record, item.GetData());
UNIT_ASSERT_VALUES_EQUAL(CalcPartitionKey(record), item.GetPartitionKey());
}
} else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) {
+ pStream = create->GetPartitionStream();
create->Confirm();
} else if (auto* destroy = std::get_if<TReadSessionEvent::TDestroyPartitionStreamEvent>(&*ev)) {
+ pStream = destroy->GetPartitionStream();
destroy->Confirm();
} else if (std::get_if<TSessionClosedEvent>(&*ev)) {
break;
}
+
+ if (pStream) {
+ UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), "/Root/Table/Stream");
+ }
}
// remove consumer
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp
index fe7ff48264e..7ea4d0101d3 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.cpp
+++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp
@@ -245,10 +245,13 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
}
TTopicConverterPtr TDiscoveryConverter::UpgradeToFullConverter(
- const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride
+ const NKikimrPQ::TPQTabletConfig& pqTabletConfig,
+ const TString& ydbDatabaseRootOverride,
+ const TMaybe<TString>& clientsideNameOverride
) {
Y_VERIFY_S(Valid, Reason.c_str());
- auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, ydbDatabaseRootOverride);
+ auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig,
+ ydbDatabaseRootOverride, clientsideNameOverride);
return TTopicConverterPtr(res);
}
@@ -673,12 +676,16 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
TTopicNameConverter::TTopicNameConverter(
bool firstClass, const TString& pqPrefix,
const NKikimrPQ::TPQTabletConfig& pqTabletConfig,
- const TString& ydbDatabaseRootOverride
+ const TString& ydbDatabaseRootOverride,
+ const TMaybe<TString>& clientsideNameOverride
)
: TDiscoveryConverter(firstClass, pqPrefix, pqTabletConfig, ydbDatabaseRootOverride)
{
if (Valid) {
BuildInternals(pqTabletConfig);
+ if (clientsideNameOverride) {
+ ClientsideName = *clientsideNameOverride;
+ }
}
}
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 559bf4f110b..04099477f7b 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -79,7 +79,8 @@ public:
const TString& GetReason() const;
const TString& GetOriginalTopic() const;
TTopicConverterPtr UpgradeToFullConverter(const NKikimrPQ::TPQTabletConfig& pqTabletConfig,
- const TString& ydbDatabaseRootOverride);
+ const TString& ydbDatabaseRootOverride,
+ const TMaybe<TString>& clientsideNameOverride = {});
TString GetPrintableString() const;
@@ -172,7 +173,8 @@ protected:
const TString& pqPrefix,
//const TVector<TString>& rootDatabases,
const NKikimrPQ::TPQTabletConfig& pqTabletConfig,
- const TString& ydbDatabaseRootOverride);
+ const TString& ydbDatabaseRootOverride,
+ const TMaybe<TString>& clientsideNameOverride = {});
public:
static TTopicConverterPtr ForFirstClass(const NKikimrPQ::TPQTabletConfig& pqTabletConfig);
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index e13e5ff9d82..a942b32f8d5 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -16,6 +16,7 @@ namespace NKikimr::NGRpcProxy {
TString FolderId;
NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
NPersQueue::TTopicConverterPtr FullConverter;
+ TMaybe<TString> CdcStreamPath;
TVector<ui32> Groups;
TMap<ui64, ui64> Partitions;
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index 79bed090316..7450d0590f9 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -100,7 +100,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
return false;
}
topicsIter->second.FullConverter = topicsIter->second.DiscoveryConverter->UpgradeToFullConverter(
- pqDescr.GetPQTabletConfig(), AppData(ctx)->PQConfig.GetTestDatabaseRoot()
+ pqDescr.GetPQTabletConfig(),
+ AppData(ctx)->PQConfig.GetTestDatabaseRoot(),
+ topicsIter->second.CdcStreamPath
);
Y_VERIFY(topicsIter->second.FullConverter->IsValid());
return CheckTopicACL(entry, topicsIter->first, ctx);
@@ -122,7 +124,10 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
Y_VERIFY(entry.ListNodeEntry->Children.size() == 1);
const auto& topic = entry.ListNodeEntry->Children.at(0);
+ // primary path used to re-describe
it->second.DiscoveryConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name)));
+ it->second.CdcStreamPath = CanonizePath(entry.Path);
+
// Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second;
// Topics.erase(it);