aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-10-16 17:34:42 +0300
committerkomels <komels@ydb.tech>2023-10-16 18:00:45 +0300
commitee254664ac9703699434b1a36351cebbcac2f9b0 (patch)
treef13465437213405423b1bcc9bb6b433bd053e212
parentf4951205ef53c5cfceaf24eeed20c85991df56eb (diff)
downloadydb-ee254664ac9703699434b1a36351cebbcac2f9b0.tar.gz
BugFix PQv0 partitions mapping
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.cpp19
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.h8
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp4
3 files changed, 16 insertions, 15 deletions
diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp
index 6adab598a43..8e962697bca 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.cpp
+++ b/ydb/core/persqueue/writer/source_id_encoding.cpp
@@ -14,7 +14,7 @@
namespace NKikimr::NPQ {
-TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
+TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
switch (generation) {
case ESourceIdTableGeneration::SrcIdMeta2:
return TStringBuilder() << "--!syntax_v1\n"
@@ -36,12 +36,12 @@ TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGenera
}
}
-TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) {
+TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration generation) {
switch (generation) {
case ESourceIdTableGeneration::SrcIdMeta2:
- return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2", generation);
+ return GetSelectSourceIdQueryFromPath(root + "/SourceIdMeta2", generation);
case ESourceIdTableGeneration::PartitionMapping:
- return GetUpdateIdSelectQueryFromPath(
+ return GetSelectSourceIdQueryFromPath(
NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(),
generation
);
@@ -50,7 +50,7 @@ TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration gen
}
}
-TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
+TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
switch (generation) {
case ESourceIdTableGeneration::SrcIdMeta2:
return TStringBuilder() << "--!syntax_v1\n"
@@ -78,13 +78,14 @@ TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGenera
}
}
-TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration generation) {
+TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration generation) {
switch (generation) {
case ESourceIdTableGeneration::SrcIdMeta2:
- return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2");
+ return GetUpdateSourceIdQueryFromPath(root + "/SourceIdMeta2", generation);
case ESourceIdTableGeneration::PartitionMapping:
- return GetUpdateIdSelectQueryFromPath(
- NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ return GetUpdateSourceIdQueryFromPath(
+ NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(),
+ generation
);
default:
Y_FAIL();
diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h
index 1aa61114d67..99cdef19f65 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.h
+++ b/ydb/core/persqueue/writer/source_id_encoding.h
@@ -13,11 +13,11 @@ enum class ESourceIdTableGeneration {
PartitionMapping
};
-TString GetSourceIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetUpdateIdSelectQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetSourceIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetUpdateIdSelectQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
+TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
namespace NSourceIdEncoding {
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 89b2291f5d1..ceac3dd80a8 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -214,8 +214,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ct
const auto& pqConfig = AppData(ctx)->PQConfig;
SrcIdTableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
: ESourceIdTableGeneration::SrcIdMeta2;
- SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(),SrcIdTableGeneration);
- UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(), SrcIdTableGeneration);
+ SelectSourceIdQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(),SrcIdTableGeneration);
+ UpdateSourceIdQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), SrcIdTableGeneration);
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "Select srcid query: " << SelectSourceIdQuery);
Request->GetStreamCtx()->Attach(ctx.SelfID);