diff options
author | AlexSm <alex@ydb.tech> | 2024-01-23 17:40:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-23 17:40:54 +0100 |
commit | 4295d15342518cd2f871859c10116bac7f0f1695 (patch) | |
tree | 4ffdf2023528f4ee3e9697034b4fbd623cf438b6 | |
parent | 9692b5b65c92c840426b78e5c350427e12e5e6d9 (diff) | |
download | ydb-4295d15342518cd2f871859c10116bac7f0f1695.tar.gz |
Revert "Choose partition for topic split/merge" (#1243)
34 files changed, 760 insertions, 2130 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index c6dbd10747..428f3e9b36 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -169,8 +169,6 @@ struct TEvPQ { EvCacheProxyForgetRead, EvGetFullDirectReadData, EvProvideDirectReadInfo, - EvCheckPartitionStatusRequest, - EvCheckPartitionStatusResponse, EvEnd }; @@ -493,13 +491,12 @@ struct TEvPQ { }; struct TEvChangeOwner : public TEventLocal<TEvChangeOwner, EvChangeOwner> { - explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists) + explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force) : Cookie(cookie) , Owner(owner) , PipeClient(pipeClient) , Sender(sender) , Force(force) - , RegisterIfNotExists(registerIfNotExists) {} ui64 Cookie; @@ -507,7 +504,6 @@ struct TEvPQ { TActorId PipeClient; TActorId Sender; bool Force; - bool RegisterIfNotExists; }; struct TEvPipeDisconnected : public TEventLocal<TEvPipeDisconnected, EvPipeDisconnected> { @@ -993,18 +989,6 @@ struct TEvPQ { struct TEvProvideDirectReadInfo : public TEventLocal<TEvProvideDirectReadInfo, EvProvideDirectReadInfo> { }; - struct TEvCheckPartitionStatusRequest : public TEventPB<TEvCheckPartitionStatusRequest, NKikimrPQ::TEvCheckPartitionStatusRequest, EvCheckPartitionStatusRequest> { - TEvCheckPartitionStatusRequest() = default; - - TEvCheckPartitionStatusRequest(ui32 partitionId) { - Record.SetPartition(partitionId); - } - }; - - struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> { - }; - - }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b73c10d7ab..b996667d53 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf { Config = config; PartitionConfig = GetPartitionConfig(Config, Partition); - PartitionGraph = MakePartitionGraph(Config); + PartitionGraph.Rebuild(Config); TopicConverter = topicConverter; NewPartition = false; @@ -2616,23 +2616,4 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext Send(ev->Sender, response.Release()); } -void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - - if (Partition != record.GetPartition()) { - LOG_INFO_S( - ctx, NKikimrServices::PERSQUEUE, - "TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." << - " Topic: \"" << TopicName() << "\"." << - " Partition: " << Partition << "." - ); - return; - } - - auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>(); - response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); - - Send(ev->Sender, response.Release()); -} - } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 264b78275a..dd0fa7e36e 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -100,7 +100,7 @@ private: void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp); void ReplyOk(const TActorContext& ctx, const ui64 dst); - void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo); + void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie); void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime); @@ -344,7 +344,6 @@ private: // void DestroyReadSession(const TReadSessionKey& key); void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); TString LogPrefix() const; @@ -482,7 +481,6 @@ private: HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); - HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); @@ -540,7 +538,6 @@ private: HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); - HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index fcab0f62f3..913812a063 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon case NKikimrProto::NODATA: Partition()->Config = Partition()->TabletConfig; Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition); - Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config); + Partition()->PartitionGraph.Rebuild(Partition()->Config); break; case NKikimrProto::ERROR: diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index ac5eff21be..0e8f6d7812 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NPQ { IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId); -bool IsResearchRequires(const TPartitionGraph::Node* node); +bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node); // // TPartitionSourceManager @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() { PendingSourceIds = std::move(UnknownSourceIds); - for(const auto* parent : node->HierarhicalParents) { + for(const auto* parent : node.value()->HierarhicalParents) { PendingCookies.insert(++Cookie); TActorId actorId = PartitionRequester(parent->Id, parent->TabletId); @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const } } -const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const { +TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const { return Partition.PartitionGraph.GetPartition(Partition.Partition); } @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { bool TPartitionSourceManager::HasParents() const { auto node = Partition.PartitionGraph.GetPartition(Partition.Partition); - return node && !node->Parents.empty(); + return node && !node.value()->Parents.empty(); } TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) { @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p return new TSourceIdRequester(parent, partition, tabletId); } -bool IsResearchRequires(const TPartitionGraph::Node* node) { - return node && !node->Parents.empty(); +bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) { + return node && !node.value()->Parents.empty(); } NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) { diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index 6a304e04bf..a05a51aa97 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -12,7 +12,7 @@ class TPartition; class TPartitionSourceManager { private: - using TPartitionNode = TPartitionGraph::Node; + using TPartitionNode = std::optional<const TPartitionGraph::Node *>; public: using TPartitionId = ui32; @@ -96,7 +96,7 @@ public: private: TPartitionSourceManager& Manager; - const TPartitionNode* Node; + TPartitionNode Node; TSourceIdWriter SourceIdWriter; THeartbeatEmitter HeartbeatEmitter; }; @@ -125,7 +125,7 @@ private: void FinishBatch(const TActorContext& ctx); bool RequireEnqueue(const TString& sourceId); - const TPartitionNode* GetPartitionNode() const; + TPartitionNode GetPartitionNode() const; TSourceIdStorage& GetSourceIdStorage() const; bool HasParents() const; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 4c3a78ea08..164e21643b 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -31,18 +31,14 @@ static const ui32 MAX_INLINE_SIZE = 1000; static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL; -void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) { +void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition); THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); NKikimrClient::TResponse& resp = *response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); resp.SetErrorCode(NPersQueue::NErrorCode::OK); - auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult(); - r->SetOwnerCookie(cookie); - r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); - r->SetSeqNo(seqNo); - + resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie); ctx.Send(Tablet, response.Release()); } @@ -150,12 +146,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c auto &owner = ev->Owner; auto it = Owners.find(owner); if (it == Owners.end()) { - if (ev->RegisterIfNotExists) { - Owners[owner]; - it = Owners.find(owner); - } else { - return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered"); - } + Owners[owner]; + it = Owners.find(owner); } if (it->second.NeedResetOwner || ev->Force) { //change owner Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize); @@ -354,13 +346,10 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion) ++offset; } else if (response.IsOwnership()) { - const auto& r = response.GetOwnership(); - const TString& ownerCookie = r.OwnerCookie; + const TString& ownerCookie = response.GetOwnership().OwnerCookie; auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie)); if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) { - auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first)); - auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo; - ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo); + ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie); } else { ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index aa4653b99f..a26eef6602 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1230,7 +1230,6 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c } ProcessSourceIdRequests(partitionId); - ProcessCheckPartitionStatusRequests(partitionId); if (allInitialized) { SourceIdRequests.clear(); } @@ -2049,8 +2048,7 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors); InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP); - THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, - req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists()); + THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce()); ctx.Send(partActor, event.Release()); } @@ -3917,37 +3915,6 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) { } } -void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - auto it = Partitions.find(record.GetPartition()); - if (it == Partitions.end()) { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition()); - - auto response = THolder<TEvPQ::TEvCheckPartitionStatusResponse>(); - response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted); - Send(ev->Sender, response.Release()); - - return; - } - - if (it->second.InitDone) { - Forward(ev, it->second.Actor); - } else { - CheckPartitionStatusRequests[record.GetPartition()].push_back(ev); - } -} - -void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) { - auto sit = CheckPartitionStatusRequests.find(partitionId); - if (sit != CheckPartitionStatusRequests.end()) { - auto it = Partitions.find(partitionId); - for (auto& r : sit->second) { - Forward(r, it->second.Actor); - } - CheckPartitionStatusRequests.erase(partitionId); - } -} - TString TPersQueue::LogPrefix() const { return TStringBuilder() << SelfId() << " "; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 61eabfaa0b..dd78e89add 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -168,9 +168,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); void ProcessSourceIdRequests(ui32 partitionId); - void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); - void ProcessCheckPartitionStatusRequests(ui32 partitionId); - TString LogPrefix() const; static constexpr const char * KeyConfig() { return "_config"; } @@ -408,7 +405,6 @@ private: bool UseMediatorTimeCast = true; THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests; - THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests; TMaybe<ui64> TabletGeneration; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index b613822c8e..86a1839e4b 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -147,7 +147,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); - TPartitionGraph graph = MakePartitionGraph(TabletConfig); + TPartitionGraph graph; + graph.Rebuild(TabletConfig); for (const auto& p : TabletConfig.GetPartitions()) { auto node = graph.GetPartition(p.GetPartitionId()); @@ -155,15 +156,15 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans // Old configuration format without AllPartitions. Split/Merge is not supported. continue; } - if (node->Children.empty()) { - for (const auto* r : node->Parents) { + if (node.value()->Children.empty()) { + for (const auto* r : node.value()->Parents) { if (extractTabletId != r->TabletId) { Senders.insert(r->TabletId); } } } - for (const auto* r : node->Children) { + for (const auto* r : node.value()->Children) { if (r->Children.empty()) { if (extractTabletId != r->TabletId) { Receivers.insert(r->TabletId); diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp index a66e345dd6..e43f99ad85 100644 --- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp +++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp @@ -5,43 +5,17 @@ #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> -#include <ydb/core/persqueue/writer/pipe_utils.h> using namespace NKikimr::NPQ; static constexpr bool SMEnabled = true; static constexpr bool SMDisabled = false; -using namespace NKikimr; -using namespace NActors; -using namespace NKikimrPQ; - -void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf, - ui32 id, - const std::optional<TString>&& boundaryFrom, - const std::optional<TString>&& boundaryTo, - std::vector<ui32> children = {}) { - auto* p = conf.AddPartitions(); - p->SetPartitionId(id); - p->SetTabletId(1000 + id); - p->SetStatus(children.empty() ? NKikimrPQ::ETopicPartitionStatus::Active : NKikimrPQ::ETopicPartitionStatus::Inactive); - if (boundaryFrom) { - p->MutableKeyRange()->SetFromBound(boundaryFrom.value()); - } - if (boundaryTo) { - p->MutableKeyRange()->SetToBound(boundaryTo.value()); - } - for(ui32 c : children) { - p->AddChildPartitionIds(c); - } -} - -NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled) { +NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) { + Cerr << ">>>>> SplitMergeEnabled=" << SplitMergeEnabled << Endl; NKikimrSchemeOp::TPersQueueGroupDescription result; NKikimrPQ::TPQTabletConfig* config = result.MutablePQTabletConfig(); - result.SetBalancerTabletID(999); - auto* partitionStrategy = config->MutablePartitionStrategy(); partitionStrategy->SetMinPartitionCount(3); partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 3); @@ -49,17 +23,27 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled config->SetTopicName("/Root/topic-1"); config->SetTopicPath("/Root"); - return result; -} + auto* p0 = result.AddPartitions(); + p0->SetPartitionId(0); + p0->SetTabletId(1000); + p0->MutableKeyRange()->SetToBound("C"); -NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) { - NKikimrSchemeOp::TPersQueueGroupDescription result = CreateConfig0(SplitMergeEnabled); + auto* p1 = result.AddPartitions(); + p1->SetPartitionId(1); + p1->SetTabletId(1001); + p1->MutableKeyRange()->SetFromBound("C"); + p1->MutableKeyRange()->SetToBound("F"); + + auto* p2 = result.AddPartitions(); + p2->SetPartitionId(2); + p2->SetTabletId(1002); + p2->MutableKeyRange()->SetFromBound("F"); - AddPartition(result, 0, {}, "C"); - AddPartition(result, 1, "C", "F"); - AddPartition(result, 2, "F", "Z"); - AddPartition(result, 3, "Z", {}, {4}); - AddPartition(result, 4, "Z", {}); + auto* p3 = result.AddPartitions(); + p3->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive); + p3->SetPartitionId(3); + p3->SetTabletId(1003); + p3->MutableKeyRange()->SetFromBound("D"); return result; } @@ -188,17 +172,14 @@ NPersQueue::TTopicConverterPtr CreateTopicConverter() { return NPersQueue::TTopicNameConverter::ForFirstClass(CreateConfig(SMDisabled).GetPQTabletConfig()); } -TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - const TString& sourceId, - std::optional<ui32> preferedPartition = std::nullopt) { - +TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional<ui32> preferedPartition = std::nullopt) { NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); + TWriteSessionMock* mock = new TWriteSessionMock(); NActors::TActorId parentId = server.GetRuntime()->Register(mock); - server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId, - config, + server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActor(parentId, + CreateConfig(spliMergeEnabled), fullConverter, sourceId, preferedPartition, @@ -207,55 +188,15 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, mock->Promise.GetFuture().GetValueSync(); return mock; - -} - -TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional<ui32> preferedPartition = std::nullopt) { - auto config = CreateConfig(spliMergeEnabled); - return ChoosePartition(server, config, sourceId, preferedPartition); } -void InitTable(NPersQueue::TTestServer& server) { - class Initializer: public TActorBootstrapped<Initializer> { - public: - Initializer(NThreading::TPromise<void>& promise) - : Promise(promise) {} - - void Bootstrap(const TActorContext& ctx) { - Become(&TThis::StateWork); - ctx.Send( - NKikimr::NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), - new NKikimr::NMetadata::NProvider::TEvPrepareManager(NKikimr::NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) - ); - } - - private: - void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext&) { - Promise.SetValue(); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); - } - } - - private: - NThreading::TPromise<void>& Promise; - }; - - NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); - server.GetRuntime()->Register(new Initializer(promise)); - promise.GetFuture().GetValueSync(); -} - -void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo = 0) { - InitTable(server); - +void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId) { const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig; auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2; + Cerr << ">>>>> pqConfig.GetTopicsAreFirstClassCitizen()=" << pqConfig.GetTopicsAreFirstClassCitizen() << Endl; + NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId( fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration @@ -264,361 +205,64 @@ void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 TString query; if (pqConfig.GetTopicsAreFirstClassCitizen()) { query = TStringBuilder() << "--!syntax_v1\n" - "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES " + "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES " "(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \"" << encoded.EscapedSourceId << "\", "<< TInstant::Now().MilliSeconds() << ", " - << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << ");"; + << TInstant::Now().MilliSeconds() << ", " << partitionId << ");"; } else { query = TStringBuilder() << "--!syntax_v1\n" - "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES (" + "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES (" << encoded.Hash << ", \"" << fullConverter->GetClientsideName() << "\", \"" << encoded.EscapedSourceId << "\", " - << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << "); "; + << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << "); "; } Cerr << "Run query:\n" << query << Endl; auto scResult = server.AnnoyingClient->RunYqlDataQuery(query); } -TMaybe<NYdb::TResultSet> SelectTable(NPersQueue::TTestServer& server, const TString& sourceId) { - const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig; - auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping - : ESourceIdTableGeneration::SrcIdMeta2; - - NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); - NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId( - fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration - ); +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) { + NPersQueue::TTestServer server{}; - TString query; - if (pqConfig.GetTopicsAreFirstClassCitizen()) { - query = TStringBuilder() << "--!syntax_v1\n" - "SELECT Partition, SeqNo " - "FROM `//Root/.metadata/TopicPartitionsMapping` " - "WHERE Hash = " << encoded.KeysHash << - " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" << - " AND ProducerId = \"" << encoded.EscapedSourceId << "\""; - } else { - query = TStringBuilder() << "--!syntax_v1\n" - "SELECT Partition, SeqNo " - "FROM `/Root/PQ/SourceIdMeta2` " - "WHERE Hash = " << encoded.KeysHash << - " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" << - " AND SourceId = \"" << encoded.EscapedSourceId << "\""; + { + auto r = ChoosePartition(server, SMEnabled, "A_Source"); + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + } + { + auto r = ChoosePartition(server, SMEnabled, "Y_Source"); + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); } - Cerr << "Run query:\n" << query << Endl; - return server.AnnoyingClient->RunYqlDataQuery(query); -} - -void AssertTableEmpty(NPersQueue::TTestServer& server, const TString& sourceId) { - auto result = SelectTable(server, sourceId); - - UNIT_ASSERT(result); - UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 0, "Table must not contains SourceId='" << sourceId << "'"); -} - -void AssertTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo) { - auto result = SelectTable(server, sourceId); - - UNIT_ASSERT(result); - UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 1, "Table must contains SourceId='" << sourceId << "'"); - - NYdb::TResultSetParser parser(*result); - UNIT_ASSERT(parser.TryNextRow()); - NYdb::TValueParser p(parser.GetValue(0)); - NYdb::TValueParser s(parser.GetValue(1)); - UNIT_ASSERT_VALUES_EQUAL(*p.GetOptionalUint32().Get(), partitionId); - UNIT_ASSERT_VALUES_EQUAL(*s.GetOptionalUint64().Get(), seqNo); -} - -class TPQTabletMock: public TActor<TPQTabletMock> { -public: - TPQTabletMock(ETopicPartitionStatus status, std::optional<ui64> seqNo) - : TActor(&TThis::StateMockWork) - , Status(status) - , SeqNo(seqNo) { + { + // Define partition for sourceId that is not in partition boundary + WriteToTable(server, "X_Source_w_0", 0); + auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0"); + UNIT_ASSERT(r->Error); } - -private: - void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx) { - auto response = MakeHolder<TEvPersQueue::TEvResponse>(); - - response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); - response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); - - if (ev->Get()->Record.GetPartitionRequest().HasCmdGetOwnership()) { - auto& o = ev->Get()->Record.GetPartitionRequest().GetCmdGetOwnership(); - if (o.GetRegisterIfNotExists() || SeqNo) { - auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult(); - cmd->SetOwnerCookie("ower_cookie"); - cmd->SetStatus(Status); - cmd->SetSeqNo(SeqNo.value_or(0)); - } else { - response->Record.SetErrorCode(NPersQueue::NErrorCode::SOURCEID_DELETED); - } - } - - auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo(); - sn->SetSeqNo(SeqNo.value_or(0)); - sn->SetState(SeqNo ? NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED : NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION); - - ctx.Send(ev->Sender, response.Release()); + { + // Redefine partition for sourceId. Check that partition changed; + WriteToTable(server, "X_Source_w_0", 2); + auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0"); + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); } - - void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { - auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>(); - - ctx.Send(ev->Sender, response.Release()); + { + // Redefine partition for sourceId to inactive partition. Select new partition use partition boundary. + WriteToTable(server, "A_Source_w_0", 3); + auto r = ChoosePartition(server, SMEnabled, "A_Source_w_0"); + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); } - - STFUNC(StateMockWork) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvRequest, Handle); - HFunc(TEvPQ::TEvCheckPartitionStatusRequest, Handle); - } + { + // Use prefered partition, but sourceId not in partition boundary + auto r = ChoosePartition(server, SMEnabled, "A_Source_1", 1); + UNIT_ASSERT(r->Error); } - -private: - ETopicPartitionStatus Status; - std::optional<ui64> SeqNo; -}; - - -TPQTabletMock* CreatePQTabletMock(NPersQueue::TTestServer& server, ui32 partitionId, ETopicPartitionStatus status, std::optional<ui64> seqNo = std::nullopt) { - TPQTabletMock* mock = new TPQTabletMock(status, seqNo); - auto actorId = server.GetRuntime()->Register(mock); - NKikimr::NTabletPipe::NTest::TPipeMock::Register(partitionId + 1000, actorId); - return mock; } -NPersQueue::TTestServer CreateServer() { +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { NPersQueue::TTestServer server{}; server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true); server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true); - server.EnableLogs({NKikimrServices::PQ_PARTITION_CHOOSER}, NActors::NLog::PRI_TRACE); - - return server; -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_NewSourceId_Test) { - // We check the scenario when writing is performed with a new SourceID - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - - auto r = ChoosePartition(server, config, "A_Source_0"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - AssertTable(server, "A_Source_0", 0, 0); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryTrue_Test) { - // We check the partition selection scenario when we have already written with the - // specified SourceID, the partition to which we wrote is active, and the partition - // boundaries coincide with the distribution. - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_1", 0, 11); - auto r = ChoosePartition(server, config, "A_Source_1"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - AssertTable(server, "A_Source_1", 0, 11); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryFalse_Test) { - // We check the partition selection scenario when we have already written with the - // specified SourceID, the partition to which we wrote is active, and the partition - // boundaries is not coincide with the distribution. - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_2", 1, 13); - auto r = ChoosePartition(server, config, "A_Source_2"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - AssertTable(server, "A_Source_2", 1, 13); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_0_Test) { - // Boundary partition is inactive. It is configuration error - required reload of configuration. - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_3", 0, 13); - auto r = ChoosePartition(server, config, "A_Source_3"); - - UNIT_ASSERT(r->Error); - AssertTable(server, "A_Source_3", 0, 13); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_1_Test) { - // Boundary partition is inactive. It is configuration error - required reload of configuration. - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); // Active but not written - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Inactive); - - WriteToTable(server, "A_Source_4", 1, 13); - auto r = ChoosePartition(server, config, "A_Source_4"); - - UNIT_ASSERT(r->Error); - AssertTable(server, "A_Source_4", 1, 13); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionNotExists_Test) { - // Old partition alredy deleted. Choose new partition by boundary and save SeqNo - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 1, {}, "F"); - AddPartition(config, 2, "F", {}); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_5", 0, 13); - auto r = ChoosePartition(server, config, "A_Source_5"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13); - AssertTable(server, "A_Source_5", 1, 13); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_Test) { - // Old partition exists. Receive SeqNo from the partition. Choose new partition by boundary and save SeqNo - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, {}, {1, 2}); - AddPartition(config, 1, {}, "F"); - AddPartition(config, 2, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_6", 0, 13); - auto r = ChoosePartition(server, config, "A_Source_6"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157); - AssertTable(server, "A_Source_6", 1, 157); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotWritten_Test) { - // Old partition exists but not written. Choose new partition by boundary and save SeqNo - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, {}, {1, 2}); - AddPartition(config, 1, {}, "F"); - AddPartition(config, 2, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); - - WriteToTable(server, "A_Source_7", 0, 13); - auto r = ChoosePartition(server, config, "A_Source_7"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13); - AssertTable(server, "A_Source_7", 1, 13); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotBoundary_Test) { - // Old partition exists. Receive SeqNo from the partition. Choose new partition from children and save SeqNo - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F", { 2}); - AddPartition(config, 1, "F", {}); - AddPartition(config, 2, {}, "F"); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); - - WriteToTable(server, "Y_Source_7", 0, 13); - auto r = ChoosePartition(server, config, "Y_Source_7"); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157); - AssertTable(server, "Y_Source_7", 2, 157); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_Active_Test) { - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - auto r = ChoosePartition(server, config, "", 0); - - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveConfig_Test) { - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, {}, {1}); - AddPartition(config, 1, {}, {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - auto r = ChoosePartition(server, config, "", 0); - - UNIT_ASSERT(r->Error); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveActor_Test) { - NPersQueue::TTestServer server = CreateServer(); - - auto config = CreateConfig0(true); - AddPartition(config, 0, {}, "F"); - AddPartition(config, 1, "F", {}); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - - auto r = ChoosePartition(server, config, "", 0); - - UNIT_ASSERT(r->Error); -} - -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { - NPersQueue::TTestServer server = CreateServer(); - - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); { auto r = ChoosePartition(server, SMDisabled, "A_Source"); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index eba55773cd..2ea5da2d6a 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -514,7 +514,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const void TPartitionFixture::SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force) { - auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force, true); + auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force); Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } diff --git a/ydb/core/persqueue/ut/partitiongraph_ut.cpp b/ydb/core/persqueue/ut/partitiongraph_ut.cpp index 9067d76fec..0003f18cec 100644 --- a/ydb/core/persqueue/ut/partitiongraph_ut.cpp +++ b/ydb/core/persqueue/ut/partitiongraph_ut.cpp @@ -44,37 +44,45 @@ Y_UNIT_TEST_SUITE(TPartitionGraphTest) { p5->AddParentPartitionIds(4); TPartitionGraph graph; - graph = std::move(MakePartitionGraph(config)); - - const auto n0 = graph.GetPartition(0); - const auto n1 = graph.GetPartition(1); - const auto n2 = graph.GetPartition(2); - const auto n3 = graph.GetPartition(3); - const auto n4 = graph.GetPartition(4); - const auto n5 = graph.GetPartition(5); - - UNIT_ASSERT(n0); - UNIT_ASSERT(n1); - UNIT_ASSERT(n2); - UNIT_ASSERT(n3); - UNIT_ASSERT(n4); - UNIT_ASSERT(n5); - - UNIT_ASSERT_VALUES_EQUAL(n0->Parents.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(n0->Children.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(n0->HierarhicalParents.size(), 0); - - UNIT_ASSERT_VALUES_EQUAL(n1->Parents.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(n1->Children.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(n1->HierarhicalParents.size(), 0); - - UNIT_ASSERT_VALUES_EQUAL(n5->Parents.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(n5->Children.size(), 0u); - UNIT_ASSERT_VALUES_EQUAL(n5->HierarhicalParents.size(), 4); - UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n0) == n5->HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n1) != n5->HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n2) != n5->HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n3) != n5->HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n4) != n5->HierarhicalParents.end()); + graph.Rebuild(config); + + const auto n0o = graph.GetPartition(0); + const auto n1o = graph.GetPartition(1); + const auto n2o = graph.GetPartition(2); + const auto n3o = graph.GetPartition(3); + const auto n4o = graph.GetPartition(4); + const auto n5o = graph.GetPartition(5); + + UNIT_ASSERT(n0o); + UNIT_ASSERT(n1o); + UNIT_ASSERT(n2o); + UNIT_ASSERT(n3o); + UNIT_ASSERT(n4o); + UNIT_ASSERT(n5o); + + auto& n0 = *n0o.value(); + auto& n1 = *n1o.value(); + auto& n2 = *n2o.value(); + auto& n3 = *n3o.value(); + auto& n4 = *n4o.value(); + auto& n5 = *n5o.value(); + + + UNIT_ASSERT_EQUAL(n0.Parents.size(), 0); + UNIT_ASSERT_EQUAL(n0.Children.size(), 0); + UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0); + + UNIT_ASSERT_EQUAL(n1.Parents.size(), 0); + UNIT_ASSERT_EQUAL(n1.Children.size(), 1); + UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0); + + UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2"); + UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0"); + UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4"); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end()); } } diff --git a/ydb/core/persqueue/ut/pqtablet_mock.h b/ydb/core/persqueue/ut/pqtablet_mock.h index a64526da77..0fd2cdc734 100644 --- a/ydb/core/persqueue/ut/pqtablet_mock.h +++ b/ydb/core/persqueue/ut/pqtablet_mock.h @@ -4,7 +4,6 @@ #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/testlib/actors/test_runtime.h> #include <ydb/core/tx/tx_processing.h> -#include <ydb/core/persqueue/events/global.h> #include <ydb/library/actors/core/actor.h> @@ -60,7 +59,6 @@ private: switch (ev->GetTypeRewrite()) { HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - // TX HFunc(TEvTxProcessing::TEvReadSet, Handle); HFunc(TEvTxProcessing::TEvReadSetAck, Handle); HFunc(TEvPQTablet::TEvSendReadSet, Handle); diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 0c133e998b..c4a934454a 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -54,69 +54,29 @@ const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ return nullptr; } -TPartitionGraph::TPartitionGraph() { -} - -TPartitionGraph::TPartitionGraph(std::unordered_map<ui32, Node>&& partitions) { - Partitions = std::move(partitions); -} - -const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const { - auto it = Partitions.find(id); - if (it == Partitions.end()) { - return nullptr; - } - return &it->second; -} - -std::set<ui32> TPartitionGraph::GetActiveChildren(ui32 id) const { - const auto* p = GetPartition(id); - if (!p) { - return {}; - } - - std::deque<const Node*> queue; - queue.push_back(p); - - std::set<ui32> result; - while(!queue.empty()) { - const auto* n = queue.front(); - queue.pop_front(); +void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) { + Partitions.clear(); - if (n->Children.empty()) { - result.emplace(n->Id); - } else { - queue.insert(queue.end(), n->Children.begin(), n->Children.end()); - } - } - - return result; -} - -template<typename TPartition> -std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const ::google::protobuf::RepeatedPtrField<TPartition>& partitions) { - std::unordered_map<ui32, TPartitionGraph::Node> result; - - if (0 == partitions.size()) { - return result; + if (0 == config.AllPartitionsSize()) { + return; } - for (const auto& p : partitions) { - result.emplace(p.GetPartitionId(), TPartitionGraph::Node(p.GetPartitionId(), p.GetTabletId())); + for (const auto& p : config.GetAllPartitions()) { + Partitions.emplace(p.GetPartitionId(), p); } - std::deque<TPartitionGraph::Node*> queue; - for(const auto& p : partitions) { - auto& node = result[p.GetPartitionId()]; + std::deque<Node*> queue; + for(const auto& p : config.GetAllPartitions()) { + auto& node = Partitions[p.GetPartitionId()]; node.Children.reserve(p.ChildPartitionIdsSize()); for (auto id : p.GetChildPartitionIds()) { - node.Children.push_back(&result[id]); + node.Children.push_back(&Partitions[id]); } node.Parents.reserve(p.ParentPartitionIdsSize()); for (auto id : p.GetParentPartitionIds()) { - node.Parents.push_back(&result[id]); + node.Parents.push_back(&Partitions[id]); } if (p.GetParentPartitionIds().empty()) { @@ -144,22 +104,19 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const ::google::proto queue.insert(queue.end(), n->Children.begin(), n->Children.end()); } } - - return result; -} - - -TPartitionGraph::Node::Node(ui32 id, ui64 tabletId) - : Id(id) - , TabletId(tabletId) { } -TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config) { - return TPartitionGraph(BuildGraph<NKikimrPQ::TPQTabletConfig::TPartition>(config.GetAllPartitions())); +std::optional<const TPartitionGraph::Node*> TPartitionGraph::GetPartition(ui32 id) const { + auto it = Partitions.find(id); + if (it == Partitions.end()) { + return std::nullopt; + } + return std::optional(&it->second); } -TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config) { - return TPartitionGraph(BuildGraph<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>(config.GetPartitions())); +TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) { + Id = config.GetPartitionId(); + TabletId = config.GetTabletId(); } } // NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 65d76ada40..daa7f20d0a 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -1,6 +1,5 @@ #pragma once -#include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/pqconfig.pb.h> namespace NKikimr::NPQ { @@ -23,7 +22,7 @@ public: Node() = default; Node(Node&&) = default; - Node(ui32 id, ui64 tabletId); + Node(const NKikimrPQ::TPQTabletConfig::TPartition& config); ui32 Id; ui64 TabletId; @@ -36,17 +35,11 @@ public: std::set<Node*> HierarhicalParents; }; - TPartitionGraph(); - TPartitionGraph(std::unordered_map<ui32, Node>&& partitions); - - const Node* GetPartition(ui32 id) const; - std::set<ui32> GetActiveChildren(ui32 id) const; + void Rebuild(const NKikimrPQ::TPQTabletConfig& config); + std::optional<const Node*> GetPartition(ui32 id) const; private: std::unordered_map<ui32, Node> Partitions; }; -TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config); -TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config); - } // NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/common.h b/ydb/core/persqueue/writer/common.h deleted file mode 100644 index 2d5a67c71f..0000000000 --- a/ydb/core/persqueue/writer/common.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include <ydb/core/base/defs.h> -#include <ydb/core/protos/msgbus.pb.h> -#include <ydb/core/protos/msgbus_pq.pb.h> - -#include <ydb/public/lib/base/msgbus_status.h> - -namespace NKikimr::NPQ { - -inline bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) { - if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { - error = TStringBuilder() << "Status is not ok" - << ": status# " << static_cast<ui32>(response.GetStatus()); - return false; - } - - if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) { - error = TStringBuilder() << "Error code is not ok" - << ": code# " << static_cast<ui32>(response.GetErrorCode()); - return false; - } - - if (mustHaveResponse && !response.HasPartitionResponse()) { - error = "Absent partition response"; - return false; - } - - return true; -} - - -} // namespace NKikimr::NPQ - diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp index dff6ade3ad..178a669d52 100644 --- a/ydb/core/persqueue/writer/metadata_initializers.cpp +++ b/ydb/core/persqueue/writer/metadata_initializers.cpp @@ -60,20 +60,6 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont } result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create")); - - { - Ydb::Table::AlterTableRequest request; - request.set_session_id(""); - request.set_path(tablePath); - - { - auto& column = *request.add_add_columns(); - column.set_name("SeqNo"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); - } - - result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogAlterTable>(request, "add_column_SeqNo")); - } } result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl")); controller->OnPreparationFinished(result); diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h index a4d19c64ad..9ab9eee1cb 100644 --- a/ydb/core/persqueue/writer/partition_chooser.h +++ b/ydb/core/persqueue/writer/partition_chooser.h @@ -24,17 +24,13 @@ struct TEvPartitionChooser { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)"); struct TEvChooseResult: public TEventLocal<TEvChooseResult, EvChooseResult> { - TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional<ui64> seqNo) + TEvChooseResult(ui32 partitionId, ui64 tabletId) : PartitionId(partitionId) - , TabletId(tabletId) - , OwnerCookie(ownerCookie) - , SeqNo(seqNo) { + , TabletId(tabletId) { } ui32 PartitionId; ui64 TabletId; - TString OwnerCookie; - std::optional<ui64> SeqNo; }; struct TEvChooseError: public TEventLocal<TEvChooseError, EvChooseError> { @@ -67,7 +63,6 @@ public: virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0; virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0; - virtual const TPartitionInfo* GetRandomPartition() const = 0; }; std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false); diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.cpp b/ydb/core/persqueue/writer/partition_chooser_impl.cpp index 3ac107bd35..d8102d2f32 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.cpp +++ b/ydb/core/persqueue/writer/partition_chooser_impl.cpp @@ -1,5 +1,5 @@ #include "partition_chooser_impl.h" -#include "ydb/library/actors/interconnect/types.h" +#include "ydb/public/sdk/cpp/client/ydb_proto/accessor.h" #include <library/cpp/digest/md5/md5.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> @@ -32,6 +32,424 @@ TString TMd5Converter::operator()(const TString& sourceId) const { return AsKeyBound(Hash(sourceId)); } + +// +// TPartitionChooserActor +// + +TPartitionChooserActor::TPartitionChooserActor(TActorId parent, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr<IPartitionChooser>& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional<ui32> preferedPartition) + : Parent(parent) + , FullConverter(fullConverter) + , SourceId(sourceId) + , PreferedPartition(preferedPartition) + , Chooser(chooser) + , SplitMergeEnabled_(SplitMergeEnabled(config.GetPQTabletConfig())) + , Partition(nullptr) + , BalancerTabletId(config.GetBalancerTabletID()) { +} + +void TPartitionChooserActor::Bootstrap(const TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + + NeedUpdateTable = (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SplitMergeEnabled_ && SourceId; + + if (!SourceId) { + return ChoosePartition(ctx); + } + + TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping + : ESourceIdTableGeneration::SrcIdMeta2; + try { + EncodedSourceId = NSourceIdEncoding::EncodeSrcId( + FullConverter->GetTopicForSrcIdHash(), SourceId, TableGeneration + ); + } catch (yexception& e) { + return ReplyError(ErrorCode::BAD_REQUEST, TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), ctx); + } + + SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + + DEBUG("SelectQuery: " << SelectQuery); + + if (pqConfig.GetTopicsAreFirstClassCitizen()) { + if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { + TThisActor::Become(&TThis::StateInit); + InitTable(ctx); + } else { + ChoosePartition(ctx); + } + } else { + TThisActor::Become(&TThis::StateInit); + StartKqpSession(ctx); + } +} + +void TPartitionChooserActor::Stop(const TActorContext& ctx) { + CloseKqpSession(ctx); + if (PipeToBalancer) { + NTabletPipe::CloseClient(ctx, PipeToBalancer); + } + IActor::Die(ctx); +} + +void TPartitionChooserActor::ScheduleStop() { + TThisActor::Become(&TThis::StateDestroy); +} + +TString TPartitionChooserActor::GetDatabaseName(const NActors::TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + switch (TableGeneration) { + case ESourceIdTableGeneration::SrcIdMeta2: + return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); + case ESourceIdTableGeneration::PartitionMapping: + return AppData(ctx)->TenantName; + } +} + +void TPartitionChooserActor::InitTable(const NActors::TActorContext& ctx) { + ctx.Send( + NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), + new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) + ); +} + +void TPartitionChooserActor::StartKqpSession(const NActors::TActorContext& ctx) { + auto ev = MakeCreateSessionRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); +} + +void TPartitionChooserActor::CloseKqpSession(const TActorContext& ctx) { + if (KqpSessionId) { + auto ev = MakeCloseSessionRequest(); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + KqpSessionId = ""; + } +} + +void TPartitionChooserActor::SendUpdateRequests(const TActorContext& ctx) { + TThisActor::Become(&TThis::StateUpdate); + + auto ev = MakeUpdateQueryRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + UpdatesInflight++; +} + +void TPartitionChooserActor::SendSelectRequest(const NActors::TActorContext& ctx) { + TThisActor::Become(&TThis::StateSelect); + + auto ev = MakeSelectQueryRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + SelectInflight++; +} + +THolder<NKqp::TEvKqp::TEvCreateSessionRequest> TPartitionChooserActor::MakeCreateSessionRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + return ev; +} + +THolder<NKqp::TEvKqp::TEvCloseSessionRequest> TPartitionChooserActor::MakeCloseSessionRequest() { + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + return ev; +} + +THolder<NKqp::TEvKqp::TEvQueryRequest> TPartitionChooserActor::MakeSelectQueryRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(SelectQuery); + + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(FullConverter->GetClientsideName()) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; +} + +THolder<NKqp::TEvKqp::TEvQueryRequest> TPartitionChooserActor::MakeUpdateQueryRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(UpdateQuery); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + if (KqpSessionId) { + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + } + if (TxId) { + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); + TxId = ""; + } else { + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + } + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(FullConverter->GetClientsideName()) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build() + .AddParam("$CreateTime") + .Uint64(CreateTime) + .Build() + .AddParam("$AccessTime") + .Uint64(TInstant::Now().MilliSeconds()) + .Build() + .AddParam("$Partition") + .Uint32(Partition->PartitionId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; +} + +void TPartitionChooserActor::RequestPQRB(const NActors::TActorContext& ctx) { + Y_ABORT_UNLESS(BalancerTabletId); + + if (!PipeToBalancer) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + PipeToBalancer = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); + } + + TThisActor::Become(&TThis::StateSelect); + NTabletPipe::SendData(ctx, PipeToBalancer, new TEvPersQueue::TEvGetPartitionIdForWrite()); +} + +void TPartitionChooserActor::ReplyResult(const NActors::TActorContext& ctx) { + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId)); +} + +void TPartitionChooserActor::ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); + + Stop(ctx); +} + +void TPartitionChooserActor::HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { + StartKqpSession(ctx); +} + +void TPartitionChooserActor::HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + const auto& record = ev->Get()->Record; + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << record, ctx); + return; + } + + KqpSessionId = record.GetResponse().GetSessionId(); + Y_ABORT_UNLESS(!KqpSessionId.empty()); + + SendSelectRequest(ctx); +} + +void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + const auto& record = ev->Get()->Record; + KqpSessionId = record.GetResponse().GetSessionId(); + + Stop(ctx); +} + +void TPartitionChooserActor::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << record, ctx); + } + + auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + + TxId = record.GetResponse().GetTxMeta().id(); + Y_ABORT_UNLESS(!TxId.empty()); + + if (t.ListSize() != 0) { + auto& tt = t.GetList(0).GetStruct(0); + if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition + auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64(); + if (accessTime > AccessTime) { // AccessTime + PartitionId = tt.GetOptional().GetUint32(); + DEBUG("Received partition " << PartitionId << " from table for SourceId=" << SourceId); + Partition = Chooser->GetPartition(PartitionId.value()); + CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); + AccessTime = accessTime; + } + } + } + + if (CreateTime == 0) { + CreateTime = TInstant::Now().MilliSeconds(); + } + + if (!Partition) { + ChoosePartition(ctx); + } else { + OnPartitionChosen(ctx); + } +} + +void TPartitionChooserActor::HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { + PartitionId = ev->Get()->Record.GetPartitionId(); + DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << SourceId); + Partition = Chooser->GetPartition(PartitionId.value()); + + OnPartitionChosen(ctx); +} + +void TPartitionChooserActor::HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ev); + + ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); +} + +void TPartitionChooserActor::HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { + if (!PartitionPersisted) { + CloseKqpSession(ctx); + StartKqpSession(ctx); + } + return; + } + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + if (!PartitionPersisted) { + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); + } + return; + } + + if (!PartitionPersisted) { + ReplyResult(ctx); + PartitionPersisted = true; + // Use tx only for query after select. Updating AccessTime without transaction. + CloseKqpSession(ctx); + } + + TThisActor::Become(&TThis::StateIdle); +} + +void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr&, const TActorContext& ctx) { + Stop(ctx); +} + +void TPartitionChooserActor::HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { + if (PartitionPersisted) { + // we do not update AccessTime for Split/Merge partitions because don't use table. + SendUpdateRequests(ctx); + } +} + +void TPartitionChooserActor::ChoosePartition(const TActorContext& ctx) { + auto [roundRobin, p] = ChoosePartitionSync(ctx); + if (roundRobin) { + RequestPQRB(ctx); + } else { + Partition = p; + OnPartitionChosen(ctx); + } +} + +void TPartitionChooserActor::OnPartitionChosen(const TActorContext& ctx) { + if (!Partition && PreferedPartition) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Prefered partition " << (PreferedPartition.value() + 1) << " is not exists or inactive.", + ctx); + } + + if (!Partition) { + return ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); + } + + if (PreferedPartition && Partition->PartitionId != PreferedPartition.value()) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " + << (Partition->PartitionId + 1) << ", but client provided " << (PreferedPartition.value() + 1) + << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " + "another MessageGroupId, specify PartitionGroupId " << (Partition->PartitionId + 1) + << ", or do not specify PartitionGroupId at all.", + ctx); + } + + if (SplitMergeEnabled_ && SourceId && PartitionId) { + if (Partition != Chooser->GetPartition(SourceId)) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Message group " << SourceId << " not in a partition boundary", ctx); + } + } + + if (NeedUpdateTable) { + SendUpdateRequests(ctx); + } else { + TThisActor::Become(&TThis::StateIdle); + + ReplyResult(ctx); + } +} + +std::pair<bool, const typename TPartitionChooserActor::TPartitionInfo*> TPartitionChooserActor::ChoosePartitionSync(const TActorContext& ctx) const { + const auto& pqConfig = AppData(ctx)->PQConfig; + if (SourceId && SplitMergeEnabled_) { + return {false, Chooser->GetPartition(SourceId)}; + } else if (PreferedPartition) { + return {false, Chooser->GetPartition(PreferedPartition.value())}; + } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) { + return {false, Chooser->GetPartition(SourceId)}; + } else { + return {true, nullptr}; + } +} + } // namespace NPartitionChooser @@ -51,6 +469,7 @@ std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp: } } + IActor* CreatePartitionChooserActor(TActorId parentId, const NKikimrSchemeOp::TPersQueueGroupDescription& config, NPersQueue::TTopicConverterPtr& fullConverter, @@ -58,13 +477,8 @@ IActor* CreatePartitionChooserActor(TActorId parentId, std::optional<ui32> preferedPartition, bool withoutHash) { auto chooser = CreatePartitionChooser(config, withoutHash); - if (SplitMergeEnabled(config.GetPQTabletConfig())) { - return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition); - } else { - return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition); - } + return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); } -} // namespace NKikimr::NPQ -std::unordered_map<ui64, TActorId> NKikimr::NTabletPipe::NTest::TPipeMock::Tablets; +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h index 9fdacf20df..2fede5ae12 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl.h @@ -1,15 +1,29 @@ #pragma once +#include <ydb/library/actors/core/actor_bootstrapped.h> #include <util/random/random.h> +#include <ydb/core/base/appdata_fwd.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/simple/services.h> +#include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/pq_database.h> #include <ydb/core/persqueue/utils.h> +#include <ydb/core/persqueue/writer/metadata_initializers.h> +#include <ydb/library/yql/public/decimal/yql_decimal.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> -#include "partition_chooser_impl__old_chooser_actor.h" -#include "partition_chooser_impl__sm_chooser_actor.h" - +#include "partition_chooser.h" namespace NKikimr::NPQ { namespace NPartitionChooser { +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, message); + +using namespace NActors; +using namespace NSourceIdEncoding; +using namespace Ydb::PersQueue::ErrorCode; + // For testing purposes struct TAsIsSharder { ui32 operator()(const TString& sourceId, ui32 totalShards) const; @@ -46,7 +60,6 @@ public: const TPartitionInfo* GetPartition(const TString& sourceId) const override; const TPartitionInfo* GetPartition(ui32 partitionId) const override; - const TPartitionInfo* GetRandomPartition() const override; private: const TString TopicName; @@ -55,14 +68,13 @@ private: }; // It is old alghoritm of choosing partition by SourceId -template<typename THasher = TMd5Sharder> +template<class THasher = TMd5Sharder> class THashChooser: public IPartitionChooser { public: THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config); const TPartitionInfo* GetPartition(const TString& sourceId) const override; const TPartitionInfo* GetPartition(ui32 partitionId) const override; - const TPartitionInfo* GetRandomPartition() const override; private: std::vector<TPartitionInfo> Partitions; @@ -70,6 +82,139 @@ private: }; +class TPartitionChooserActor: public TActorBootstrapped<TPartitionChooserActor> { + using TThis = TPartitionChooserActor; + using TThisActor = TActor<TThis>; + + friend class TActorBootstrapped<TThis>; +public: + using TPartitionInfo = typename IPartitionChooser::TPartitionInfo; + + TPartitionChooserActor(TActorId parentId, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr<IPartitionChooser>& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional<ui32> preferedPartition); + + void Bootstrap(const TActorContext& ctx); + +private: + void HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx); + void HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx); + + STATEFN(StateInit) { + switch (ev->GetTypeRewrite()) { + HFunc(NMetadata::NProvider::TEvManagerPrepared, HandleInit); + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleInit); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +private: + void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); + void HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx); + void HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx); + + STATEFN(StateSelect) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect); + HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, HandleSelect); + HFunc(TEvTabletPipe::TEvClientDestroyed, HandleSelect); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +private: + void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr& ev, const TActorContext& ctx); + + STATEFN(StateIdle) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle); + SFunc(TEvents::TEvPoison, Stop); + } + } + +private: + void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); + + STATEFN(StateUpdate) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +private: + void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); + + STATEFN(StateDestroy) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy); + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDestroy); + } + } + +private: + void ScheduleStop(); + void Stop(const TActorContext& ctx); + + void ChoosePartition(const TActorContext& ctx); + void OnPartitionChosen(const TActorContext& ctx); + std::pair<bool, const TPartitionInfo*> ChoosePartitionSync(const TActorContext& ctx) const; + + TString GetDatabaseName(const NActors::TActorContext& ctx); + + void InitTable(const NActors::TActorContext& ctx); + + void StartKqpSession(const NActors::TActorContext& ctx); + void CloseKqpSession(const TActorContext& ctx); + void SendUpdateRequests(const TActorContext& ctx); + void SendSelectRequest(const NActors::TActorContext& ctx); + + void RequestPQRB(const NActors::TActorContext& ctx); + + THolder<NKqp::TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest(const NActors::TActorContext& ctx); + THolder<NKqp::TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest(); + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSelectQueryRequest(const NActors::TActorContext& ctx); + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateQueryRequest(const NActors::TActorContext& ctx); + + void ReplyResult(const NActors::TActorContext& ctx); + void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx); + +private: + const TActorId Parent; + const NPersQueue::TTopicConverterPtr FullConverter; + const TString SourceId; + const std::optional<ui32> PreferedPartition; + const std::shared_ptr<IPartitionChooser> Chooser; + const bool SplitMergeEnabled_; + + std::optional<ui32> PartitionId; + const TPartitionInfo* Partition; + bool PartitionPersisted = false; + ui64 CreateTime = 0; + ui64 AccessTime = 0; + + bool NeedUpdateTable = false; + + NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; + TString KqpSessionId; + TString TxId; + + NPQ::ESourceIdTableGeneration TableGeneration; + TString SelectQuery; + TString UpdateQuery; + + size_t UpdatesInflight = 0; + size_t SelectInflight = 0; + + ui64 BalancerTabletId; + TActorId PipeToBalancer; +}; + + // // TBoundaryChooser // @@ -88,7 +233,7 @@ TBoundaryChooser<THasher>::TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGro } std::sort(Partitions.begin(), Partitions.end(), - [](const TPartitionInfo& a, const TPartitionInfo& b) { return !b.ToBound || (a.ToBound && a.ToBound < b.ToBound); }); + [](const TPartitionInfo& a, const TPartitionInfo& b) { return a.ToBound && a.ToBound < b.ToBound; }); } template<class THasher> @@ -107,15 +252,6 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash return it == Partitions.end() ? nullptr : it; } -template<class THasher> -const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THasher>::GetRandomPartition() const { - if (Partitions.empty()) { - return nullptr; - } - size_t p = RandomNumber<size_t>(Partitions.size()); - return &Partitions[p]; -} - // @@ -136,9 +272,6 @@ THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescri template<class THasher> const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetPartition(const TString& sourceId) const { - if (Partitions.empty()) { - return nullptr; - } return &Partitions[Hasher(sourceId, Partitions.size())]; } @@ -152,32 +285,5 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get return it->PartitionId == partitionId ? it : nullptr; } -template<class THasher> -const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetRandomPartition() const { - if (Partitions.empty()) { - return nullptr; - } - size_t p = RandomNumber<size_t>(Partitions.size()); - return &Partitions[p]; -} - - } // namespace NPartitionChooser - - -inline IActor* CreatePartitionChooserActorM(TActorId parentId, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional<ui32> preferedPartition, - bool withoutHash) { - auto chooser = CreatePartitionChooser(config, withoutHash); - if (SplitMergeEnabled(config.GetPQTabletConfig())) { - return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition); - } else { - return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition); - } -} - - } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h deleted file mode 100644 index 8d718d8d45..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h +++ /dev/null @@ -1,368 +0,0 @@ -#pragma once - -#include "partition_chooser.h" -#include "partition_chooser_impl__partition_helper.h" -#include "partition_chooser_impl__table_helper.h" - -#include <ydb/core/persqueue/pq_database.h> -#include <ydb/core/persqueue/writer/metadata_initializers.h> -#include <ydb/library/actors/core/actor_bootstrapped.h> - -namespace NKikimr::NPQ::NPartitionChooser { - -#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) -#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" -#endif - - -#define LOG_PREFIX "TPartitionChooser " << SelfId() \ - << " (SourceId=" << SourceId \ - << ", PreferedPartition=" << PreferedPartition \ - << ") " -#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); - -using TPartitionInfo = typename IPartitionChooser::TPartitionInfo; - -using namespace NActors; -using namespace NSourceIdEncoding; -using namespace Ydb::PersQueue::ErrorCode; - -template<typename TDerived, typename TPipeCreator> -class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> { -public: - using TThis = TAbstractPartitionChooserActor<TDerived, TPipeCreator>; - using TThisActor = TActor<TThis>; - - - TAbstractPartitionChooserActor(TActorId parentId, - std::shared_ptr<IPartitionChooser>& chooser, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional<ui32> preferedPartition) - : Parent(parentId) - , SourceId(sourceId) - , PreferedPartition(preferedPartition) - , Chooser(chooser) - , TableHelper(fullConverter->GetClientsideName(), fullConverter->GetTopicForSrcIdHash()) - , PartitionHelper() { - } - - TActorIdentity SelfId() const { - return TActor<TDerived>::SelfId(); - } - - void Initialize(const NActors::TActorContext& ctx) { - TableHelper.Initialize(ctx, SourceId); - } - - void PassAway() { - auto ctx = TActivationContext::ActorContextFor(SelfId()); - TableHelper.CloseKqpSession(ctx); - PartitionHelper.Close(ctx); - } - - bool NeedTable(const NActors::TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - return SourceId && (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()); - } - -protected: - void InitTable(const NActors::TActorContext& ctx) { - TThis::Become(&TThis::StateInitTable); - const auto& pqConfig = AppData(ctx)->PQConfig; - if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { - DEBUG("InitTable"); - TableHelper.SendInitTableRequest(ctx); - } else { - StartKqpSession(ctx); - } - } - - void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { - StartKqpSession(ctx); - } - - STATEFN(StateInitTable) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); - SFunc(TEvents::TEvPoison, TThis::Die); - } - } - -protected: - void StartKqpSession(const NActors::TActorContext& ctx) { - if (NeedTable(ctx)) { - DEBUG("StartKqpSession") - TThis::Become(&TThis::StateCreateKqpSession); - TableHelper.SendCreateSessionRequest(ctx); - } else { - OnSelected(ctx); - } - } - - void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - if (!TableHelper.Handle(ev, ctx)) { - return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << ev->Get()->Record.DebugString(), ctx); - } - - SendSelectRequest(ctx); - } - - void ScheduleStop() { - TThis::Become(&TThis::StateDestroying); - } - - STATEFN(StateCreateKqpSession) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -protected: - void SendSelectRequest(const NActors::TActorContext& ctx) { - TThis::Become(&TThis::StateSelect); - DEBUG("Select from the table"); - TableHelper.SendSelectRequest(ctx); - } - - void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - if (!TableHelper.HandleSelect(ev, ctx)) { - return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << ev->Get()->Record.DebugString(), ctx); - } - - TRACE("Selected from table PartitionId=" << TableHelper.PartitionId() << " SeqNo=" << TableHelper.SeqNo()); - if (TableHelper.PartitionId()) { - Partition = Chooser->GetPartition(TableHelper.PartitionId().value()); - } - SeqNo = TableHelper.SeqNo(); - - OnSelected(ctx); - } - - virtual void OnSelected(const NActors::TActorContext& ctx) = 0; - - STATEFN(StateSelect) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect); - SFunc(TEvents::TEvPoison, TThis::Die); - } - } - -protected: - void SendUpdateRequests(const TActorContext& ctx) { - if (NeedTable(ctx)) { - TThis::Become(&TThis::StateUpdate); - DEBUG("Update the table"); - TableHelper.SendUpdateRequest(Partition->PartitionId, SeqNo, ctx); - } else { - StartGetOwnership(ctx); - } - } - - void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); - DEBUG("HandleUpdate PartitionPersisted=" << PartitionPersisted << " Status=" << record.GetYdbStatus()); - - if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { - if (!PartitionPersisted) { - TableHelper.CloseKqpSession(ctx); - StartKqpSession(ctx); - } - return; - } - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - if (!PartitionPersisted) { - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); - } - return; - } - - if (!PartitionPersisted) { - PartitionPersisted = true; - // Use tx only for query after select. Updating AccessTime without transaction. - TableHelper.CloseKqpSession(ctx); - - return StartGetOwnership(ctx); - } - - StartIdle(); - } - - STATEFN(StateUpdate) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -protected: - void StartCheckPartitionRequest(const TActorContext &ctx) { - TThis::Become(&TThis::StateCheckPartition); - PartitionHelper.Open(Partition->TabletId, ctx); - PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, ctx); - } - - void Handle(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) { - if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()) { - PartitionHelper.Close(ctx); - return SendUpdateRequests(ctx); - } - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "Partition isn`t active", ctx); - } - - STATEFN(StateCheckPartition) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse, Handle); - HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership); - HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -protected: - void StartGetOwnership(const TActorContext &ctx) { - TThis::Become(&TThis::StateOwnership); - if (!Partition) { - return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx); - } - - DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId); - - PartitionHelper.Open(Partition->TabletId, ctx); - PartitionHelper.SendGetOwnershipRequest(Partition->PartitionId, SourceId, true, ctx); - } - - void HandleOwnership(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { - DEBUG("HandleOwnership"); - auto& record = ev->Get()->Record; - - TString error; - if (!BasicCheck(record, error)) { - return ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx); - } - - const auto& response = record.GetPartitionResponse(); - if (!response.HasCmdGetOwnershipResult()) { - return ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); - } - - if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { - return ReplyError(ErrorCode::INITIALIZING, "Partition is not active", ctx); - } - - OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); - - PartitionHelper.Close(ctx); - - OnOwnership(ctx); - } - - void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) { - auto msg = ev->Get(); - if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) { - TableHelper.CloseKqpSession(ctx); - ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx); - } - } - - void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) { - if (PartitionHelper.IsPipe(ev->Sender)) { - TableHelper.CloseKqpSession(ctx); - ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx); - } - } - - virtual void OnOwnership(const TActorContext &ctx) = 0; - - STATEFN(StateOwnership) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvResponse, HandleOwnership); - HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership); - HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - - -protected: - void StartIdle() { - TThis::Become(&TThis::StateIdle); - DEBUG("Start idle"); - } - - void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { - if (PartitionPersisted) { - SendUpdateRequests(ctx); - } - } - - STATEFN(StateIdle) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle); - SFunc(TEvents::TEvPoison, TThis::Die); - } - } - - -protected: - void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - TableHelper.Handle(ev, ctx); - TThis::Die(ctx); - } - - STATEFN(StateDestroying) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy); - } - } - -protected: - void ReplyResult(const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo)); - } - - void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { - INFO("ReplyError: " << errorMessage); - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); - - TThis::Die(ctx); - } - - -protected: - const TActorId Parent; - const TString SourceId; - const std::optional<ui32> PreferedPartition; - const std::shared_ptr<IPartitionChooser> Chooser; - - const TPartitionInfo* Partition = nullptr; - - TTableHelper TableHelper; - TPartitionHelper<TPipeCreator> PartitionHelper; - - bool PartitionPersisted = false; - - TString OwnerCookie; - std::optional<ui64> SeqNo = 0; -}; - -#undef LOG_PREFIX -#undef TRACE -#undef DEBUG -#undef INFO -#undef ERROR - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h deleted file mode 100644 index 97a9750952..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h +++ /dev/null @@ -1,163 +0,0 @@ -#pragma once - -#include "partition_chooser_impl__abstract_chooser_actor.h" -#include "partition_chooser_impl__pqrb_helper.h" - -namespace NKikimr::NPQ::NPartitionChooser { - -#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) -#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" -#endif - - -#define LOG_PREFIX "TPartitionChooser " << SelfId() \ - << " (SourceId=" << TThis::SourceId \ - << ", PreferedPartition=" << TThis::PreferedPartition \ - << ") " -#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); - -template<typename TPipeCreator> -class TPartitionChooserActor: public TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator> { -public: - using TThis = TPartitionChooserActor<TPipeCreator>; - using TThisActor = TActor<TThis>; - using TParentActor = TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator>; - - TPartitionChooserActor(TActorId parentId, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - std::shared_ptr<IPartitionChooser>& chooser, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional<ui32> preferedPartition) - : TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) - , PQRBHelper(config.GetBalancerTabletID()) { - } - - void Bootstrap(const TActorContext& ctx) { - TThis::Initialize(ctx); - TThis::InitTable(ctx); - } - - TActorIdentity SelfId() const { - return TActor<TPartitionChooserActor<TPipeCreator>>::SelfId(); - } - - void OnSelected(const TActorContext &ctx) override { - if (TThis::Partition) { - return OnPartitionChosen(ctx); - } - - auto [roundRobin, p] = ChoosePartitionSync(ctx); - if (roundRobin) { - RequestPQRB(ctx); - } else { - TThis::Partition = p; - OnPartitionChosen(ctx); - } - } - - void OnOwnership(const TActorContext &ctx) override { - DEBUG("OnOwnership"); - TThis::ReplyResult(ctx); - } - -private: - void RequestPQRB(const NActors::TActorContext& ctx) { - DEBUG("RequestPQRB") - TThis::Become(&TThis::StatePQRB); - - if (PQRBHelper.PartitionId()) { - PartitionId = PQRBHelper.PartitionId(); - OnPartitionChosen(ctx); - } else { - PQRBHelper.SendRequest(ctx); - } - } - - void Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { - PartitionId = PQRBHelper.Handle(ev, ctx); - DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << TThis::SourceId); - TThis::Partition = TThis::Chooser->GetPartition(PQRBHelper.PartitionId().value()); - - PQRBHelper.Close(ctx); - - OnPartitionChosen(ctx); - } - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) { - if (PQRBHelper.IsPipe(ev->Sender) && ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe connection fail", ctx); - } - } - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) { - if(PQRBHelper.IsPipe(ev->Sender)) { - TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); - } - } - - STATEFN(StatePQRB) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, Handle); - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - SFunc(TEvents::TEvPoison, TThis::Die); - } - } - - -private: - void OnPartitionChosen(const TActorContext& ctx) { - TRACE("OnPartitionChosen"); - - if (!TThis::Partition && TThis::PreferedPartition) { - return TThis::ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.", - ctx); - } - - if (!TThis::Partition) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); - } - - if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) { - return TThis::ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId " - << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1) - << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " - "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1) - << ", or do not specify PartitionGroupId at all.", - ctx); - } - - TThis::SendUpdateRequests(ctx); - } - - std::pair<bool, const TPartitionInfo*> ChoosePartitionSync(const TActorContext& ctx) const { - const auto& pqConfig = AppData(ctx)->PQConfig; - if (TThis::PreferedPartition) { - return {false, TThis::Chooser->GetPartition(TThis::PreferedPartition.value())}; - } else if (pqConfig.GetTopicsAreFirstClassCitizen() && TThis::SourceId) { - return {false, TThis::Chooser->GetPartition(TThis::SourceId)}; - } else { - return {true, nullptr}; - } - }; - - -private: - std::optional<ui32> PartitionId; - - TPQRBHelper<TPipeCreator> PQRBHelper; -}; - -#undef LOG_PREFIX -#undef TRACE -#undef DEBUG -#undef INFO -#undef ERROR - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h deleted file mode 100644 index 463217557c..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h +++ /dev/null @@ -1,86 +0,0 @@ -#pragma once - -#include "common.h" -#include "pipe_utils.h" -#include "source_id_encoding.h" - -#include <ydb/core/persqueue/events/global.h> -#include <ydb/core/persqueue/events/internal.h> -#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> - -namespace NKikimr::NPQ::NPartitionChooser { - -template<typename TPipeCreator> -class TPartitionHelper { -public: - void Open(ui64 tabletId, const TActorContext& ctx) { - Close(ctx); - - NTabletPipe::TClientConfig clientConfig; - clientConfig.RetryPolicy = { - .RetryLimitCount = 6, - .MinRetryTime = TDuration::MilliSeconds(10), - .MaxRetryTime = TDuration::MilliSeconds(100), - .BackoffMultiplier = 2, - .DoFirstRetryInstantly = true - }; - Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, tabletId, clientConfig)); - } - - void SendGetOwnershipRequest(ui32 partitionId, const TString& sourceId, bool registerIfNotExists, const TActorContext& ctx) { - auto ev = MakeRequest(partitionId, Pipe); - - auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership(); - cmd.SetOwner(sourceId ? sourceId : CreateGuidAsString()); - cmd.SetForce(true); - cmd.SetRegisterIfNotExists(registerIfNotExists); - - NTabletPipe::SendData(ctx, Pipe, ev.Release()); - } - - void SendMaxSeqNoRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) { - auto ev = MakeRequest(partitionId, Pipe); - - auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetMaxSeqNo(); - cmd.AddSourceId(NSourceIdEncoding::EncodeSimple(sourceId)); - - NTabletPipe::SendData(ctx, Pipe, ev.Release()); - } - - void SendCheckPartitionStatusRequest(ui32 partitionId, const TActorContext& ctx) { - auto ev = MakeHolder<NKikimr::TEvPQ::TEvCheckPartitionStatusRequest>(partitionId); - - NTabletPipe::SendData(ctx, Pipe, ev.Release()); - } - - void Close(const TActorContext& ctx) { - if (Pipe) { - NTabletPipe::CloseClient(ctx, Pipe); - Pipe = TActorId(); - } - } - - const TString& OwnerCookie() const { - return OwnerCookie_; - } - - bool IsPipe(const TActorId& actorId) const { - return actorId == Pipe; - } - -private: - THolder<TEvPersQueue::TEvRequest> MakeRequest(ui32 partitionId, TActorId pipe) { - auto ev = MakeHolder<TEvPersQueue::TEvRequest>(); - - ev->Record.MutablePartitionRequest()->SetPartition(partitionId); - ActorIdToProto(pipe, ev->Record.MutablePartitionRequest()->MutablePipeClient()); - - return ev; - } - -private: - TActorId Pipe; - TString OwnerCookie_; -}; - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h deleted file mode 100644 index c1a235901b..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include "pipe_utils.h" - -#include <ydb/core/persqueue/events/global.h> - - -namespace NKikimr::NPQ::NPartitionChooser { - -template<typename TPipeCreator> -class TPQRBHelper { -public: - TPQRBHelper(ui64 balancerTabletId) - : BalancerTabletId(balancerTabletId) { - } - - std::optional<ui32> PartitionId() const { - return PartitionId_; - } - - void SendRequest(const NActors::TActorContext& ctx) { - Y_ABORT_UNLESS(BalancerTabletId); - - if (!Pipe) { - NTabletPipe::TClientConfig clientConfig; - clientConfig.RetryPolicy = { - .RetryLimitCount = 6, - .MinRetryTime = TDuration::MilliSeconds(10), - .MaxRetryTime = TDuration::MilliSeconds(100), - .BackoffMultiplier = 2, - .DoFirstRetryInstantly = true - }; - Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); - } - - NTabletPipe::SendData(ctx, Pipe, new TEvPersQueue::TEvGetPartitionIdForWrite()); - } - - ui32 Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { - Close(ctx); - - PartitionId_ = ev->Get()->Record.GetPartitionId(); - return PartitionId_.value(); - } - - void Close(const TActorContext& ctx) { - if (Pipe) { - NTabletPipe::CloseClient(ctx, Pipe); - Pipe = TActorId(); - } - } - - bool IsPipe(const TActorId& actorId) const { - return actorId == Pipe; - } - -private: - const ui64 BalancerTabletId; - - TActorId Pipe; - std::optional<ui32> PartitionId_; -}; - - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h deleted file mode 100644 index 695af84d59..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h +++ /dev/null @@ -1,272 +0,0 @@ -#pragma once - -#include "partition_chooser_impl__abstract_chooser_actor.h" - -#include <ydb/core/persqueue/utils.h> - -namespace NKikimr::NPQ::NPartitionChooser { - -#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) -#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" -#endif - - -#define LOG_PREFIX "TPartitionChooser " << SelfId() \ - << " (SourceId=" << TThis::SourceId \ - << ", PreferedPartition=" << TThis::PreferedPartition \ - << ") " -#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define INFO(message) LOG_INFO_S (*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); - -template<typename TPipeCreator> -class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator> { -public: - using TThis = TSMPartitionChooserActor<TPipeCreator>; - using TThisActor = TActor<TThis>; - using TParentActor = TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>; - - TSMPartitionChooserActor(TActorId parentId, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - std::shared_ptr<IPartitionChooser>& chooser, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional<ui32> preferedPartition) - : TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) - , Graph(MakePartitionGraph(config)) { - - } - - void Bootstrap(const TActorContext& ctx) { - BoundaryPartition = ChoosePartitionSync(); - - if (TThis::SourceId) { - TThis::Initialize(ctx); - GetOwnershipFast(ctx); - } else { - TThis::Partition = BoundaryPartition; - TThis::StartGetOwnership(ctx); - } - } - - TActorIdentity SelfId() const { - return TActor<TSMPartitionChooserActor<TPipeCreator>>::SelfId(); - } - - void OnSelected(const TActorContext &ctx) override { - if (TThis::Partition) { - // If we have found a partition, it means that the partition is active, which means - // that we need to continue writing to it, and it does not matter whether it falls - // within the boundaries of the distribution. - return OnPartitionChosen(ctx); - } - - if (!TThis::TableHelper.PartitionId()) { - // They didn't write with this SourceId earlier, or the SourceID has already been deleted. - TThis::Partition = BoundaryPartition; - return OnPartitionChosen(ctx); - } - - const auto* node = Graph.GetPartition(TThis::TableHelper.PartitionId().value()); - if (!node) { - // The partition where the writting was performed earlier has already been deleted. - // We can write without taking into account the hierarchy of the partition. - TThis::Partition = BoundaryPartition; - return OnPartitionChosen(ctx); - } - - // Choosing a partition based on the split and merge hierarchy. - auto activeChildren = Graph.GetActiveChildren(TThis::TableHelper.PartitionId().value()); - if (activeChildren.empty()) { - return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "has't active partition Marker# PC01", ctx); - } - - if (activeChildren.contains(BoundaryPartition->PartitionId)) { - // First of all, we are trying to write to the partition taking into account the - // distribution boundaries. - TThis::Partition = BoundaryPartition; - } else { - // It is important to save the write into account the hierarchy of partitions, because - // this will preserve the guarantees of the order of reading. - auto n = RandomNumber<size_t>(activeChildren.size()); - std::vector<ui32> ac; - ac.reserve(activeChildren.size()); - ac.insert(ac.end(), activeChildren.begin(), activeChildren.end()); - auto id = ac[n]; - TThis::Partition = TThis::Chooser->GetPartition(id); - } - - if (!TThis::Partition) { - return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "can't choose partition Marker# PC02", ctx); - } - - GetOldSeqNo(ctx); - } - - void OnOwnership(const TActorContext &ctx) override { - DEBUG("OnOwnership"); - TThis::ReplyResult(ctx); - } - -private: - void GetOwnershipFast(const TActorContext &ctx) { - TThis::Become(&TThis::StateOwnershipFast); - if (!BoundaryPartition) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "A partition not choosed", ctx); - } - - DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId); - - TThis::PartitionHelper.Open(BoundaryPartition->TabletId, ctx); - TThis::PartitionHelper.SendGetOwnershipRequest(BoundaryPartition->PartitionId, TThis::SourceId, false, ctx); - } - - void HandleOwnershipFast(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { - DEBUG("HandleOwnershipFast"); - auto& record = ev->Get()->Record; - - TString error; - if (!BasicCheck(record, error)) { - return TThis::InitTable(ctx); - } - - const auto& response = record.GetPartitionResponse(); - if (!response.HasCmdGetOwnershipResult()) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); - } - - if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Configuration changed", ctx); - } - - TThis::OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); - - if (response.GetCmdGetOwnershipResult().GetSeqNo() > 0) { - // Fast path: the partition ative and already written - TThis::SendUpdateRequests(ctx); - return TThis::ReplyResult(ctx); - } - - TThis::InitTable(ctx); - } - - STATEFN(StateOwnershipFast) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvResponse, HandleOwnershipFast); - HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership); - HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership); - SFunc(TEvents::TEvPoison, TThis::Die); - } - } - - -private: - void GetOldSeqNo(const TActorContext &ctx) { - DEBUG("GetOldSeqNo"); - TThis::Become(&TThis::StateGetMaxSeqNo); - - const auto* oldNode = Graph.GetPartition(TThis::TableHelper.PartitionId().value()); - - if (!oldNode) { - return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "Inconsistent status Marker# PC03", ctx); - } - - TThis::PartitionHelper.Open(oldNode->TabletId, ctx); - TThis::PartitionHelper.SendMaxSeqNoRequest(oldNode->Id, TThis::SourceId, ctx); - } - - void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - - TString error; - if (!BasicCheck(record, error)) { - return TThis::ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx); - } - - const auto& response = record.GetPartitionResponse(); - if (!response.HasCmdGetMaxSeqNoResult()) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent MaxSeqNo result", ctx); - } - - const auto& result = response.GetCmdGetMaxSeqNoResult(); - if (result.SourceIdInfoSize() < 1) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Empty source id info", ctx); - } - - const auto& sourceIdInfo = result.GetSourceIdInfo(0); - switch (sourceIdInfo.GetState()) { - case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED: - TThis::SeqNo = sourceIdInfo.GetSeqNo(); - break; - case NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION: - case NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN: - TThis::SeqNo = TThis::TableHelper.SeqNo(); - break; - } - - TThis::PartitionHelper.Close(ctx); - TThis::StartCheckPartitionRequest(ctx); - } - - STATEFN(StateGetMaxSeqNo) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo); - SFunc(TEvents::TEvPoison, TThis::Die); - HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership); - HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership); - } - } - - -private: - void OnPartitionChosen(const TActorContext& ctx) { - TRACE("OnPartitionChosen"); - - if (!TThis::Partition && TThis::PreferedPartition) { - return TThis::ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.", - ctx); - } - - if (!TThis::Partition) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); - } - - if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) { - return TThis::ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId " - << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1) - << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " - "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1) - << ", or do not specify PartitionGroupId at all.", - ctx); - } - - TThis::StartCheckPartitionRequest(ctx); - } - - const TPartitionInfo* ChoosePartitionSync() const { - if (TThis::PreferedPartition) { - return TThis::Chooser->GetPartition(TThis::PreferedPartition.value()); - } else if (TThis::SourceId) { - return TThis::Chooser->GetPartition(TThis::SourceId); - } else { - return TThis::Chooser->GetRandomPartition(); - } - }; - - -private: - const TPartitionInfo* BoundaryPartition = nullptr; - const TPartitionGraph Graph; -}; - -#undef LOG_PREFIX -#undef TRACE -#undef DEBUG -#undef INFO -#undef ERROR - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h deleted file mode 100644 index 210ab86ff4..0000000000 --- a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h +++ /dev/null @@ -1,280 +0,0 @@ -#pragma once - -#include "metadata_initializers.h" -#include "source_id_encoding.h" - -#include <ydb/core/base/appdata_fwd.h> -#include <ydb/core/kqp/common/events/events.h> -#include <ydb/core/kqp/common/simple/services.h> -#include <ydb/core/persqueue/pq_database.h> -#include <ydb/library/yql/public/decimal/yql_decimal.h> -#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> -#include <ydb/services/metadata/service.h> - - -namespace NKikimr::NPQ::NPartitionChooser { - -#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) -#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" -#endif - - -#define LOG_PREFIX "TTableHelper " -#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); -#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); - - -class TTableHelper { -public: - TTableHelper(const TString& topicName, const TString& topicHashName) - : TopicName(topicName) - , TopicHashName(topicHashName) { - }; - - std::optional<ui32> PartitionId() const { - return PartitionId_; - } - - std::optional<ui64> SeqNo() const { - return SeqNo_; - } - - bool Initialize(const TActorContext& ctx, const TString& sourceId) { - const auto& pqConfig = AppData(ctx)->PQConfig; - - TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping - : ESourceIdTableGeneration::SrcIdMeta2; - try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId( - TopicHashName, sourceId, TableGeneration - ); - } catch (yexception& e) { - return false; - } - - SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - UpdateAccessTimeQuery = GetUpdateAccessTimeQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - - DEBUG("SelectQuery: " << SelectQuery); - DEBUG("UpdateQuery: " << UpdateQuery); - DEBUG("UpdateAccessTimeQuery: " << UpdateAccessTimeQuery); - - return true; - } - - TString GetDatabaseName(const TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - switch (TableGeneration) { - case ESourceIdTableGeneration::SrcIdMeta2: - return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); - case ESourceIdTableGeneration::PartitionMapping: - return AppData(ctx)->TenantName; - } - } - - void SendInitTableRequest(const TActorContext& ctx) { - ctx.Send( - NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), - new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) - ); - } - - void SendCreateSessionRequest(const TActorContext& ctx) { - auto ev = MakeCreateSessionRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } - - THolder<NKqp::TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest(const TActorContext& ctx) { - auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - return ev; - } - - bool Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& /*ctx*/) { - const auto& record = ev->Get()->Record; - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return false; - } - - KqpSessionId = record.GetResponse().GetSessionId(); - Y_ABORT_UNLESS(!KqpSessionId.empty()); - - return true; - } - - void CloseKqpSession(const TActorContext& ctx) { - if (KqpSessionId) { - auto ev = MakeCloseSessionRequest(); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - KqpSessionId = ""; - } - } - - THolder<NKqp::TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest() { - auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - return ev; - } - - void SendSelectRequest(const TActorContext& ctx) { - auto ev = MakeSelectQueryRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } - - THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSelectQueryRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(SelectQuery); - - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(TopicName) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; - } - - bool HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& /*ctx*/) { - auto& record = ev->Get()->Record.GetRef(); - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return false; - } - - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - - TxId = record.GetResponse().GetTxMeta().id(); - Y_ABORT_UNLESS(!TxId.empty()); - - if (t.ListSize() != 0) { - auto& list = t.GetList(0); - auto& tt = list.GetStruct(0); - if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition - auto accessTime = list.GetStruct(2).GetOptional().GetUint64(); - if (accessTime > AccessTime) { // AccessTime - PartitionId_ = tt.GetOptional().GetUint32(); - CreateTime = list.GetStruct(1).GetOptional().GetUint64(); - AccessTime = accessTime; - SeqNo_ = list.GetStruct(3).GetOptional().GetUint64(); - } - } - } - - if (CreateTime == 0) { - CreateTime = TInstant::Now().MilliSeconds(); - } - - return true; - } - - void SendUpdateRequest(ui32 partitionId, std::optional<ui64> seqNo, const TActorContext& ctx) { - auto ev = MakeUpdateQueryRequest(partitionId, seqNo, ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } - - THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateQueryRequest(ui32 partitionId, std::optional<ui64> seqNo, const NActors::TActorContext& ctx) { - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(TxId ? UpdateQuery : UpdateAccessTimeQuery); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - if (KqpSessionId) { - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - } - if (TxId) { - ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); - TxId = ""; - } else { - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - } - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(TopicName) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build() - .AddParam("$CreateTime") - .Uint64(CreateTime) - .Build() - .AddParam("$AccessTime") - .Uint64(TInstant::Now().MilliSeconds()) - .Build() - .AddParam("$SeqNo") - .Uint64(seqNo.value_or(0)) - .Build() - .AddParam("$Partition") - .Uint32(partitionId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; - } - -private: - const TString TopicName; - const TString TopicHashName; - - NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; - - NPQ::ESourceIdTableGeneration TableGeneration; - TString SelectQuery; - TString UpdateQuery; - TString UpdateAccessTimeQuery; - - TString KqpSessionId; - TString TxId; - - ui64 CreateTime = 0; - ui64 AccessTime = 0; - - std::optional<ui32> PartitionId_; - std::optional<ui64> SeqNo_; -}; - -#undef LOG_PREFIX -#undef TRACE -#undef DEBUG -#undef INFO -#undef ERROR - -} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/pipe_utils.h b/ydb/core/persqueue/writer/pipe_utils.h deleted file mode 100644 index db78a3ce12..0000000000 --- a/ydb/core/persqueue/writer/pipe_utils.h +++ /dev/null @@ -1,76 +0,0 @@ -#pragma once - -#include <ydb/core/base/tablet_pipe.h> -#include <ydb/library/actors/core/actor_bootstrapped.h> - -#include <unordered_map> - -namespace NKikimr::NTabletPipe { - -class TPipeHelper { -public: - static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) { - return NKikimr::NTabletPipe::CreateClient(owner, tabletId, config); - } - - static void SendData(const TActorContext& ctx, const TActorId& clientId, IEventBase* payload, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { - return NKikimr::NTabletPipe::SendData(ctx, clientId, payload, cookie, std::move(traceId)); - } -}; - -namespace NTest { - - class TPipeMock { - private: - class TPipeActorMock: public TActorBootstrapped<TPipeActorMock> { - public: - TPipeActorMock(const TActorId& clientId, ui64 tabletId, const TActorId& forwardTo) - : ClientId(clientId) - , TabletId(tabletId) - , ForwardTo(forwardTo) {} - - void Bootstrap(const TActorContext& ctx) { - if (ForwardTo) { - ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::OK, ClientId, ClientId, true, false, 1)); - Become(&TPipeActorMock::StateForward); - } else { - ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::ERROR, ClientId, ClientId, true, false, 0)); - Die(ctx); - } - } - - private: - STFUNC(StateForward) { - auto ctx = TActivationContext::ActorContextFor(TActor<TPipeActorMock>::SelfId()); - ctx.Forward(ev, ForwardTo); - } - - private: - TActorId ClientId; - ui64 TabletId; - TActorId ForwardTo; - }; - - public: - static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) { - Y_UNUSED(config); - - auto it = Tablets.find(tabletId); - auto actorId = it == Tablets.end() ? TActorId() : it->second; - return new TPipeActorMock(owner, tabletId, actorId); - } - - static void Clear() { - Tablets.clear(); - } - - static void Register(ui64 tabletId, const TActorId& actorId) { - Tablets[tabletId] = actorId; - } - private: - static std::unordered_map<ui64, TActorId> Tablets; - }; - -} // namespace NTest - -} // namespace NKikimr::NTabletPipe diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp index 6773cb5893..b4f4af39d5 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.cpp +++ b/ydb/core/persqueue/writer/source_id_encoding.cpp @@ -20,15 +20,15 @@ TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera return TStringBuilder() << "--!syntax_v1\n" "DECLARE $Hash AS Uint32; " "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8;\n" - "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << path << "` " + "DECLARE $SourceId AS Utf8; " + "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` " "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;"; case ESourceIdTableGeneration::PartitionMapping: return TStringBuilder() << "--!syntax_v1\n" "DECLARE $Hash AS Uint64; " "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8;\n" - "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" + "DECLARE $SourceId AS Utf8; " + "SELECT Partition, CreateTime, AccessTime FROM `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;"; default: @@ -59,40 +59,9 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera "DECLARE $Hash AS Uint32; " "DECLARE $Partition AS Uint32; " "DECLARE $CreateTime AS Uint64; " - "DECLARE $AccessTime AS Uint64;" - "DECLARE $SeqNo AS Uint64;\n" - "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);"; - case ESourceIdTableGeneration::PartitionMapping: - return TStringBuilder() << "--!syntax_v1\n" - "DECLARE $SourceId AS Utf8; " - "DECLARE $Topic AS Utf8; " - "DECLARE $Hash AS Uint64; " - "DECLARE $Partition AS Uint32; " - "DECLARE $CreateTime AS Uint64; " - "DECLARE $AccessTime AS Uint64; " - "DECLARE $SeqNo AS Uint64;\n" - "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() - << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);"; - default: - Y_ABORT(); - } -} - -TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration generation) { - switch (generation) { - case ESourceIdTableGeneration::SrcIdMeta2: - return TStringBuilder() << "--!syntax_v1\n" - "DECLARE $SourceId AS Utf8; " - "DECLARE $Topic AS Utf8; " - "DECLARE $Hash AS Uint32; " - "DECLARE $Partition AS Uint32; " - "DECLARE $CreateTime AS Uint64; " "DECLARE $AccessTime AS Uint64;\n" - "UPDATE `" << path << "` " - "SET AccessTime = $AccessTime " - "WHERE Hash = $Hash AND Topic = $Topic AND SourceId = $SourceId AND Partition = $Partition;"; + "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; case ESourceIdTableGeneration::PartitionMapping: return TStringBuilder() << "--!syntax_v1\n" "DECLARE $SourceId AS Utf8; " @@ -101,9 +70,9 @@ TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGene "DECLARE $Partition AS Uint32; " "DECLARE $CreateTime AS Uint64; " "DECLARE $AccessTime AS Uint64;\n" - "UPDATE `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` " - "SET AccessTime = $AccessTime " - "WHERE Hash = $Hash AND Topic = $Topic AND ProducerId = $SourceId AND Partition = $Partition;"; + "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; default: Y_ABORT(); } @@ -123,20 +92,6 @@ TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration gen } } -TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration generation) { - switch (generation) { - case ESourceIdTableGeneration::SrcIdMeta2: - return GetUpdateAccessTimeQueryFromPath(root + "/SourceIdMeta2", generation); - case ESourceIdTableGeneration::PartitionMapping: - return GetUpdateAccessTimeQueryFromPath( - NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(), - generation - ); - default: - Y_ABORT(); - } -} - namespace NSourceIdEncoding { static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h index 39b6487864..99cdef19f6 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.h +++ b/ydb/core/persqueue/writer/source_id_encoding.h @@ -15,11 +15,9 @@ enum class ESourceIdTableGeneration { TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); -TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); -TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); namespace NSourceIdEncoding { diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index dc5bddf72e..1cc4ab2e9b 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -1,4 +1,3 @@ -#include "common.h" #include "source_id_encoding.h" #include "util/generic/fwd.h" #include "writer.h" @@ -130,6 +129,27 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return ev; } + static bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) { + if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { + error = TStringBuilder() << "Status is not ok" + << ": status# " << static_cast<ui32>(response.GetStatus()); + return false; + } + + if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) { + error = TStringBuilder() << "Error code is not ok" + << ": code# " << static_cast<ui32>(response.GetErrorCode()); + return false; + } + + if (mustHaveResponse && !response.HasPartitionResponse()) { + error = "Absent partition response"; + return false; + } + + return true; + } + static NKikimrClient::TResponse MakeResponse(ui64 cookie) { NKikimrClient::TResponse response; response.MutablePartitionResponse()->SetCookie(cookie); @@ -306,10 +326,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return InitResult("Absent Ownership result", std::move(record)); } - if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { - return InitResult("Partition is inactive", std::move(record)); - } - OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); GetMaxSeqNo(); } diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index e92ea1d7b8..8ab13eff77 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -110,7 +110,6 @@ message TPersQueuePartitionRequest { message TCmdGetOwnership { // get write ownership for partition optional string Owner = 1 [default = "default"]; optional bool Force = 2 [ default = true]; - optional bool RegisterIfNotExists = 3 [default = true]; } message TCmdReserveBytes { @@ -469,8 +468,6 @@ message TPersQueuePartitionResponse { message TCmdGetOwnershipResult { optional string OwnerCookie = 1; - optional NKikimrPQ.ETopicPartitionStatus Status = 2; - optional int64 SeqNo = 3; } message TCmdPrepareDirectReadResult { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index b47d3ff20a..6abc3855e4 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -962,13 +962,6 @@ message TEvSourceIdResponse { repeated TSource Source = 3; }; -message TEvCheckPartitionStatusRequest { - optional uint32 Partition = 1; -}; - -message TEvCheckPartitionStatusResponse { - optional ETopicPartitionStatus Status = 1; -}; message TTransaction { enum EKind { diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 798795f331..717d653260 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -599,7 +599,6 @@ public: "Columns { Name: \"Partition\" Type: \"Uint32\"}" "Columns { Name: \"CreateTime\" Type: \"Uint64\"}" "Columns { Name: \"AccessTime\" Type: \"Uint64\"}" - "Columns { Name: \"SeqNo\" Type: \"Uint64\"}" "KeyColumnNames: [\"Hash\", \"SourceId\", \"Topic\"]" ); } |