aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-01-10 22:30:01 +0300
committerkomels <komels@ydb.tech>2023-01-10 22:30:01 +0300
commit642cc455679331d934a4c09d933cb475f7c3277e (patch)
tree989ebc9e0136dba71ef1568c5d5cdee528fe5d7c
parent68b2c4085c9b130da7ed09e32adca681508303b7 (diff)
downloadydb-642cc455679331d934a4c09d933cb475f7c3277e.tar.gz
Set proper database for pq table requests
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h3
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp31
2 files changed, 22 insertions, 12 deletions
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 1c08ee1853c..c2ef61c8f3d 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -160,6 +160,7 @@ private:
ui32 CalculateFirstClassPartition(const TActorContext& ctx);
void SendCreateManagerRequest(const TActorContext& ctx);
void DiscoverPartition(const NActors::TActorContext& ctx);
+ TString GetDatabaseName(const NActors::TActorContext& ctx);
void StartSession(const NActors::TActorContext& ctx);
void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx);
@@ -307,6 +308,8 @@ private:
NKikimr::NPQ::TMultiCounter SLIBigLatency;
TInitRequest InitRequest;
+ NPQ::ESourceIdTableGeneration SrcIdTableGeneration;
+
};
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index de0f4187812..6887857823c 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -210,10 +210,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ct
Y_VERIFY(Request);
//ToDo !! - Set proper table paths.
const auto& pqConfig = AppData(ctx)->PQConfig;
- ESourceIdTableGeneration gen = pqConfig.GetTopicsAreFirstClassCitizen() ?
- ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2;
- SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen);
- UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen);
+ SrcIdTableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
+ : ESourceIdTableGeneration::SrcIdMeta2;
+ SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(),SrcIdTableGeneration);
+ UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(), SrcIdTableGeneration);
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "Select srcid query: " << SelectSourceIdQuery);
Request->GetStreamCtx()->Attach(ctx.SelfID);
@@ -473,12 +473,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
- ESourceIdTableGeneration gen = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ?
- ESourceIdTableGeneration::PartitionMapping
- :
- ESourceIdTableGeneration::SrcIdMeta2;
try {
- EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, gen);
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration);
} catch (yexception& e) {
CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
@@ -659,10 +655,20 @@ void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::
}
template<bool UseMigrationProtocol>
+TString TWriteSessionActor<UseMigrationProtocol>::GetDatabaseName(const NActors::TActorContext& ctx) {
+ switch (SrcIdTableGeneration) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ return NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig);
+ case ESourceIdTableGeneration::PartitionMapping:
+ return AppData(ctx)->TenantName;
+ }
+}
+
+template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::StartSession(const NActors::TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
- ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
State = ES_WAIT_SESSION;
@@ -718,7 +724,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
ev->Record.MutableRequest()->SetQuery(SelectSourceIdQuery);
- ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
+
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
// fill tx settings: set commit tx flag & begin new serializable tx.
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
@@ -875,7 +882,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>:
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
ev->Record.MutableRequest()->SetQuery(UpdateSourceIdQuery);
- ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
// fill tx settings: set commit tx flag & begin new serializable tx.
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
if (KqpSessionId) {