diff options
author | komels <komels@ydb.tech> | 2023-01-10 22:30:01 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-01-10 22:30:01 +0300 |
commit | 642cc455679331d934a4c09d933cb475f7c3277e (patch) | |
tree | 989ebc9e0136dba71ef1568c5d5cdee528fe5d7c | |
parent | 68b2c4085c9b130da7ed09e32adca681508303b7 (diff) | |
download | ydb-642cc455679331d934a4c09d933cb475f7c3277e.tar.gz |
Set proper database for pq table requests
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 31 |
2 files changed, 22 insertions, 12 deletions
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 1c08ee1853c..c2ef61c8f3d 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -160,6 +160,7 @@ private: ui32 CalculateFirstClassPartition(const TActorContext& ctx); void SendCreateManagerRequest(const TActorContext& ctx); void DiscoverPartition(const NActors::TActorContext& ctx); + TString GetDatabaseName(const NActors::TActorContext& ctx); void StartSession(const NActors::TActorContext& ctx); void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx); @@ -307,6 +308,8 @@ private: NKikimr::NPQ::TMultiCounter SLIBigLatency; TInitRequest InitRequest; + NPQ::ESourceIdTableGeneration SrcIdTableGeneration; + }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index de0f4187812..6887857823c 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -210,10 +210,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ct Y_VERIFY(Request); //ToDo !! - Set proper table paths. const auto& pqConfig = AppData(ctx)->PQConfig; - ESourceIdTableGeneration gen = pqConfig.GetTopicsAreFirstClassCitizen() ? - ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2; - SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen); - UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath(), gen); + SrcIdTableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping + : ESourceIdTableGeneration::SrcIdMeta2; + SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(),SrcIdTableGeneration); + UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(pqConfig.GetSourceIdTablePath(), SrcIdTableGeneration); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "Select srcid query: " << SelectSourceIdQuery); Request->GetStreamCtx()->Attach(ctx.SelfID); @@ -473,12 +473,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) { - ESourceIdTableGeneration gen = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? - ESourceIdTableGeneration::PartitionMapping - : - ESourceIdTableGeneration::SrcIdMeta2; try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, gen); + EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration); } catch (yexception& e) { CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); return; @@ -659,10 +655,20 @@ void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors:: } template<bool UseMigrationProtocol> +TString TWriteSessionActor<UseMigrationProtocol>::GetDatabaseName(const NActors::TActorContext& ctx) { + switch (SrcIdTableGeneration) { + case ESourceIdTableGeneration::SrcIdMeta2: + return NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig); + case ESourceIdTableGeneration::PartitionMapping: + return AppData(ctx)->TenantName; + } +} + +template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::StartSession(const NActors::TActorContext& ctx) { auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); - ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); State = ES_WAIT_SESSION; @@ -718,7 +724,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); ev->Record.MutableRequest()->SetQuery(SelectSourceIdQuery); - ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); + + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); // fill tx settings: set commit tx flag & begin new serializable tx. ev->Record.MutableRequest()->SetSessionId(KqpSessionId); ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); @@ -875,7 +882,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>: ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); ev->Record.MutableRequest()->SetQuery(UpdateSourceIdQuery); - ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); // fill tx settings: set commit tx flag & begin new serializable tx. ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); if (KqpSessionId) { |