diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2024-01-30 17:44:14 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 17:44:14 +0500 |
commit | 88134144840b61b5c3d4eb9c701dd95168f3f7f4 (patch) | |
tree | e6a1c3500788467d6d3813d970f6d7a9593d3d7c | |
parent | 7c628c7b86cecdd277cb87453e3c733e4bc15943 (diff) | |
download | ydb-88134144840b61b5c3d4eb9c701dd95168f3f7f4.tar.gz |
Topic split/merge for write (#1386)
30 files changed, 277 insertions, 714 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 7e584528fd..69a8858da8 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -216,13 +216,14 @@ struct TEvPQ { std::optional<TRowVersion> HeartbeatVersion; }; - TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite) + TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite, std::optional<ui64> initialSeqNo) : Cookie(cookie) , MessageNo(messageNo) , OwnerCookie(ownerCookie) , Offset(offset) , Msgs(std::move(msgs)) , IsDirectWrite(isDirectWrite) + , InitialSeqNo(initialSeqNo) {} ui64 Cookie; @@ -231,6 +232,7 @@ struct TEvPQ { TMaybe<ui64> Offset; TVector<TMsg> Msgs; bool IsDirectWrite; + std::optional<ui64> InitialSeqNo; }; @@ -939,12 +941,6 @@ struct TEvPQ { NKikimrClient::TPersQueueFetchResponse Response; }; - struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> { - }; - - struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> { - }; - struct TEvRegisterDirectReadSession : public TEventLocal<TEvRegisterDirectReadSession, EvRegisterDirectReadSession> { TEvRegisterDirectReadSession(const NPQ::TReadSessionKey& sessionKey, ui32 tabletGeneration) : Session(sessionKey) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 74afe14f88..f29e9511f7 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -10,6 +10,7 @@ #include <ydb/core/base/counters.h> #include <ydb/core/base/path.h> #include <ydb/core/quoter/public/quoter.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/protos/counters_pq.pb.h> #include <ydb/core/protos/msgbus.pb.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> @@ -482,8 +483,6 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill()); - SourceManager.PassAway(); - Die(ctx); } @@ -935,50 +934,33 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx } void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) { - SourceManager.EnsureSourceIds(ev->Get()->SourceIds); - MaxSeqNoRequests.emplace_back(ev); - ProcessMaxSeqNoRequest(ctx); -} - -void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) { - PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size()); - - while(!MaxSeqNoRequests.empty()) { - auto& ev = MaxSeqNoRequests.front(); - - auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie); - NKikimrClient::TResponse& resp = *response->Response; - - resp.SetStatus(NMsgBusProxy::MSTATUS_OK); - resp.SetErrorCode(NPersQueue::NErrorCode::OK); - - auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult(); - for (const auto& sourceId : ev->Get()->SourceIds) { - auto& protoInfo = *result.AddSourceIdInfo(); - protoInfo.SetSourceId(sourceId); + auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie); + NKikimrClient::TResponse& resp = *response->Response; - auto info = SourceManager.Get(sourceId); - if (!info) { - PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId); - return; - } - if (info.State == TSourceIdInfo::EState::Unknown) { - continue; - } + resp.SetStatus(NMsgBusProxy::MSTATUS_OK); + resp.SetErrorCode(NPersQueue::NErrorCode::OK); - Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset); - Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo); + auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult(); + for (const auto& sourceId : ev->Get()->SourceIds) { + auto& protoInfo = *result.AddSourceIdInfo(); + protoInfo.SetSourceId(sourceId); - protoInfo.SetSeqNo(info.SeqNo); - protoInfo.SetOffset(info.Offset); - protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds()); - protoInfo.SetExplicit(info.Explicit); - protoInfo.SetState(TSourceIdInfo::ConvertState(info.State)); + auto info = SourceManager.Get(sourceId); + if (info.State == TSourceIdInfo::EState::Unknown) { + continue; } - ctx.Send(Tablet, response.Release()); - MaxSeqNoRequests.pop_front(); + Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset); + Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo); + + protoInfo.SetSeqNo(info.SeqNo); + protoInfo.SetOffset(info.Offset); + protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds()); + protoInfo.SetExplicit(info.Explicit); + protoInfo.SetState(TSourceIdInfo::ConvertState(info.State)); } + + ctx.Send(Tablet, response.Release()); } void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) { @@ -2612,42 +2594,6 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext } } -void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - - if (Partition != record.GetPartition()) { - LOG_INFO_S( - ctx, NKikimrServices::PERSQUEUE, - "TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." << - " Topic: \"" << TopicName() << "\"." << - " Partition: " << Partition << "." - ); - return; - } - - auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds(); - - auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>(); - for(auto& sourceId : record.GetSourceId()) { - auto* s = response->Record.AddSource(); - s->SetId(sourceId); - - auto it = memoryStorage.find(sourceId); - if (it != memoryStorage.end()) { - auto& info = it->second; - s->SetState(Convert(info.State)); - s->SetSeqNo(info.SeqNo); - s->SetOffset(info.Offset); - s->SetExplicit(info.Explicit); - s->SetWriteTimestamp(info.WriteTimestamp.GetValue()); - } else { - s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown); - } - } - - Send(ev->Sender, response.Release()); -} - void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; @@ -2664,6 +2610,13 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>(); response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); + if (record.HasSourceId()) { + auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(record.GetSourceId())); + if (sit != SourceIdStorage.GetInMemorySourceIds().end()) { + response->Record.SetSeqNo(sit->second.SeqNo); + } + } + Send(ev->Sender, response.Release()); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 9fee3b4802..31cb40fd48 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -344,7 +344,6 @@ private: void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe); // void DestroyReadSession(const TReadSessionKey& key); - void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); TString LogPrefix() const; @@ -482,9 +481,7 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); - HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); - HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); @@ -540,9 +537,7 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); - HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); - HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); @@ -613,7 +608,6 @@ private: std::deque<TMessage> Requests; std::deque<TMessage> Responses; - std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests; THead Head; THead NewHead; diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index ac5eff21be..b0770f1f90 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -1,15 +1,9 @@ #include "partition.h" -#include "partition_log.h" - -#include <unordered_map> #include <ydb/core/base/tablet_pipe.h> namespace NKikimr::NPQ { -IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId); -bool IsResearchRequires(const TPartitionGraph::Node* node); - // // TPartitionSourceManager // @@ -18,59 +12,6 @@ TPartitionSourceManager::TPartitionSourceManager(TPartition& partition) : Partition(partition) { } -void TPartitionSourceManager::ScheduleBatch() { - if (WaitSources()) { - return; - } - - PendingCookies.clear(); - Responses.clear(); - - if (UnknownSourceIds.empty()) { - return; - } - - auto node = GetPartitionNode(); - if (!IsResearchRequires(node)) { - return; - } - - PendingSourceIds = std::move(UnknownSourceIds); - - for(const auto* parent : node->HierarhicalParents) { - PendingCookies.insert(++Cookie); - - TActorId actorId = PartitionRequester(parent->Id, parent->TabletId); - Partition.Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie); - } -} - -void TPartitionSourceManager::EnsureSourceId(const TString& sourceId) { - if (!IsResearchRequires(GetPartitionNode())) { - return; - } - - if (RequireEnqueue(sourceId)) { - UnknownSourceIds.insert(sourceId); - } - - ScheduleBatch(); -} - -void TPartitionSourceManager::EnsureSourceIds(const TVector<TString>& sourceIds) { - if (!IsResearchRequires(GetPartitionNode())) { - return; - } - - for(const auto& sourceId : sourceIds) { - if (RequireEnqueue(sourceId)) { - UnknownSourceIds.insert(sourceId); - } - } - - ScheduleBatch(); -} - const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TString& sourceId) const { auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds(); @@ -88,19 +29,9 @@ const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TS return result; } - auto its = Sources.find(sourceId); - if (its != Sources.end()) { - return its->second; - } - - TSourceInfo result; - result.Pending = IsResearchRequires(GetPartitionNode()); - return result; + return TSourceInfo{}; } -bool TPartitionSourceManager::WaitSources() const { - return !PendingCookies.empty(); -} TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModificationBatch(const TActorContext& ctx) { const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() @@ -109,76 +40,10 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif return TModificationBatch(*this, format); } -void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) { - auto it = PendingCookies.find(ev->Cookie); - if (it == PendingCookies.end()) { - PQ_LOG_D("Received TEvSourceIdResponse with unknown cookie: " << ev->Cookie); - return; - } - PendingCookies.erase(it); - - if (ev->Get()->Record.HasError()) { - PQ_LOG_D("Error on request SourceId: " << ev->Get()->Record.GetError()); - Responses.clear(); - RequesterActors.erase(ev->Get()->Record.GetPartition()); - - if (UnknownSourceIds.empty()) { - UnknownSourceIds = std::move(PendingSourceIds); - } else { - UnknownSourceIds.insert(PendingSourceIds.begin(), PendingSourceIds.end()); - PendingSourceIds.clear(); - } - - ScheduleBatch(); - return; - } - - Responses.push_back(ev); - - if (PendingCookies.empty()) { - FinishBatch(ctx); - ScheduleBatch(); - } -} - const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const { return Partition.PartitionGraph.GetPartition(Partition.Partition); } -void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) { - for(const auto& ev : Responses) { - const auto& record = ev->Get()->Record; - for(const auto& s : record.GetSource()) { - auto& value = Sources[s.GetId()]; - if (s.GetState() == NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown) { - continue; - } - PQ_LOG_T("Received SourceId " << s.GetId() << " SeqNo=" << s.GetSeqNo()); - if (value.State == TSourceIdInfo::EState::Unknown || value.SeqNo < s.GetSeqNo()) { - value.State = Convert(s.GetState()); - value.SeqNo = s.GetSeqNo(); - value.Offset = s.GetOffset(); - value.Explicit = s.GetExplicit(); - value.WriteTimestamp.FromValue(s.GetWriteTimestamp()); - } - } - } - - Responses.clear(); - PendingSourceIds.clear(); - - if (Partition.CurrentStateFunc() == &TPartition::StateIdle) { - Partition.HandleWrites(ctx); - } - Partition.ProcessMaxSeqNoRequest(ctx); -} - -bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) { - auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds(); - return !Sources.contains(sourceId) && !knownSourceIds.contains(sourceId) - && !PendingSourceIds.contains(sourceId); -} - TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { return Partition.SourceIdStorage; } @@ -188,36 +53,6 @@ bool TPartitionSourceManager::HasParents() const { return node && !node->Parents.empty(); } -TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) { - auto it = RequesterActors.find(id); - if (it != RequesterActors.end()) { - return it->second; - } - - TActorId actorId = Partition.RegisterWithSameMailbox(CreateRequester(Partition.SelfId(), - id, - tabletId)); - RequesterActors[id] = actorId; - return actorId; -} - -std::unique_ptr<TEvPQ::TEvSourceIdRequest> TPartitionSourceManager::CreateRequest(TPartitionSourceManager::TPartitionId id) const { - auto request = std::make_unique<TEvPQ::TEvSourceIdRequest>(); - auto& record = request->Record; - record.SetPartition(id); - for(const auto& sourceId : PendingSourceIds) { - record.AddSourceId(sourceId); - } - return request; -} - -void TPartitionSourceManager::PassAway() { - for(const auto [_, actorId] : RequesterActors) { - Partition.Send(actorId, new TEvents::TEvPoison()); - } - RequesterActors.clear(); -} - // // TPartitionSourceManager::TModificationBatch @@ -231,9 +66,6 @@ TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSource } TPartitionSourceManager::TModificationBatch::~TModificationBatch() { - for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) { - Manager.Sources.erase(k); - } } TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const { @@ -283,11 +115,9 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batc , SourceId(id) { auto& memory = MemoryStorage(); auto& writer = WriteStorage(); - auto& sources = Batch.GetManager().Sources; InMemory = memory.find(id); InWriter = writer.find(id); - InSources = sources.end(); if (InWriter != writer.end()) { Info = Convert(InWriter->second); @@ -297,14 +127,6 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batc Info = Convert(InMemory->second); return; } - - InSources = sources.find(id); - if (InSources != sources.end()) { - Info = InSources->second; - return; - } - - Info.Pending = IsResearchRequires(Batch.Node); } std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const { @@ -335,9 +157,6 @@ void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) { Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat)); } -TPartitionSourceManager::TSourceManager::operator bool() const { - return Info; -} const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const { return Batch.GetManager().GetSourceIdStorage().GetInMemorySourceIds(); @@ -356,160 +175,4 @@ TPartitionSourceManager::TSourceInfo::TSourceInfo(TSourceIdInfo::EState state) : State(state) { } -TPartitionSourceManager::TSourceInfo::operator bool() const { - return !Pending; -} - - -// -// TSourceIdRequester -// - -class TSourceIdRequester : public TActorBootstrapped<TSourceIdRequester> { - static constexpr TDuration RetryDelay = TDuration::MilliSeconds(100); -public: - TSourceIdRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId) - : Parent(parent) - , Partition(partition) - , TabletId(tabletId) { - } - - void Bootstrap(const TActorContext&) { - Become(&TThis::StateWork); - MakePipe(); - } - - void PassAway() override { - if (PipeClient) { - NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient); - } - TActorBootstrapped::PassAway(); - } - -private: - void MakePipe() { - NTabletPipe::TClientConfig config; - config.RetryPolicy = { - .RetryLimitCount = 6, - .MinRetryTime = TDuration::MilliSeconds(10), - .MaxRetryTime = TDuration::MilliSeconds(200), - .BackoffMultiplier = 2, - .DoFirstRetryInstantly = true - }; - PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config)); - } - - void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& /*ctx*/) { - Cookie = ev->Cookie; - PendingRequest = ev; - - DoRequest(); - } - - void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& /*ctx*/) { - auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>(); - msg->Record = std::move(ev->Get()->Record); - bool hasError = msg->Record.HasError(); - - Reply(msg); - - if (hasError) { - PassAway(); - } - } - - void DoRequest() { - if (PendingRequest && PipeConnected) { - auto msg = std::make_unique<TEvPQ::TEvSourceIdRequest>(); - msg->Record = std::move(PendingRequest->Get()->Record); - PendingRequest.Reset(); - - NTabletPipe::SendData(SelfId(), PipeClient, msg.release()); - } - } - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { - if (ev->Get()->Status == NKikimrProto::EReplyStatus::OK) { - PipeConnected = true; - DoRequest(); - } else { - ReplyWithError("Error connecting to PQ"); - } - } - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { - if (ev->Get()->TabletId == TabletId) { - PipeConnected = false; - ReplyWithError("Pipe destroyed"); - } - } - - void ReplyWithError(const TString& error) { - auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>(); - msg->Record.SetPartition(Partition); - msg->Record.SetError(error); - - Reply(msg); - - PassAway(); - } - - void Reply(std::unique_ptr<TEvPQ::TEvSourceIdResponse>& msg) { - Send(Parent, msg.release(), 0, Cookie); - } - - STFUNC(StateWork) - { - switch (ev->GetTypeRewrite()) { - HFunc(TEvPQ::TEvSourceIdRequest, Handle); - HFunc(TEvPQ::TEvSourceIdResponse, Handle); - hFunc(TEvTabletPipe::TEvClientConnected, Handle); - hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - sFunc(TEvents::TEvPoison, PassAway); - } - } - -private: - TActorId Parent; - TPartitionSourceManager::TPartitionId Partition; - ui64 TabletId; - TEvPQ::TEvSourceIdRequest::TPtr PendingRequest; - ui64 Cookie; - - TActorId PipeClient; - bool PipeConnected = false; -}; - -IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId) { - return new TSourceIdRequester(parent, partition, tabletId); -} - -bool IsResearchRequires(const TPartitionGraph::Node* node) { - return node && !node->Parents.empty(); -} - -NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) { - switch(value) { - case TSourceIdInfo::EState::Unknown: - return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown; - case TSourceIdInfo::EState::Registered: - return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered; - case TSourceIdInfo::EState::PendingRegistration: - return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration; - } -} - -TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value) { - switch(value) { - case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown: - return TSourceIdInfo::EState::Unknown; - case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered: - return TSourceIdInfo::EState::Registered; - case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration: - return TSourceIdInfo::EState::PendingRegistration; - } -} - - - } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index 6a304e04bf..fcf53a25cd 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -28,10 +28,6 @@ public: ui64 Offset = 0; bool Explicit = false; TInstant WriteTimestamp; - - bool Pending = false; - - operator bool() const; }; class TSourceManager { @@ -52,8 +48,6 @@ public: void Update(ui64 seqNo, ui64 offset, TInstant timestamp); void Update(THeartbeat&& heartbeat); - operator bool() const; - private: const TSourceIdMap& MemoryStorage() const; const TSourceIdMap& WriteStorage() const; @@ -103,51 +97,20 @@ public: explicit TPartitionSourceManager(TPartition& partition); - // For a partition obtained as a result of a merge or split, it requests - // information about the consumer's parameters from the parent partitions. - void EnsureSourceId(const TString& sourceId); - void EnsureSourceIds(const TVector<TString>& sourceIds); - - // Returns true if we expect a response from the parent partitions - bool WaitSources() const; - const TSourceInfo Get(const TString& sourceId) const; TModificationBatch CreateModificationBatch(const TActorContext& ctx); - void PassAway(); - -public: - void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx); - private: - void ScheduleBatch(); - void FinishBatch(const TActorContext& ctx); - bool RequireEnqueue(const TString& sourceId); - const TPartitionNode* GetPartitionNode() const; TSourceIdStorage& GetSourceIdStorage() const; bool HasParents() const; TActorId PartitionRequester(TPartitionId id, ui64 tabletId); - std::unique_ptr<TEvPQ::TEvSourceIdRequest> CreateRequest(TPartitionId id) const; private: TPartition& Partition; - - TSourceIds UnknownSourceIds; - TSourceIds PendingSourceIds; - - std::unordered_map<TString, TSourceInfo> Sources; - - ui64 Cookie = 0; - std::set<ui64> PendingCookies; - std::vector<TEvPQ::TEvSourceIdResponse::TPtr> Responses; - std::unordered_map<TPartitionId, TActorId> RequesterActors; }; -NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value); -TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value); - } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h index a2d2a79053..214c70cbb1 100644 --- a/ydb/core/persqueue/partition_types.h +++ b/ydb/core/persqueue/partition_types.h @@ -19,6 +19,7 @@ struct TWriteMsg { ui64 Cookie; TMaybe<ui64> Offset; TEvPQ::TEvWrite::TMsg Msg; + std::optional<ui64> InitialSeqNo; }; struct TOwnershipMsg { diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 4c3a78ea08..871956a019 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -304,9 +304,14 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ui64 maxOffset = 0; if (it != SourceIdStorage.GetInMemorySourceIds().end()) { - maxSeqNo = it->second.SeqNo; + maxSeqNo = std::max(it->second.SeqNo, writeResponse.InitialSeqNo.value_or(0)); maxOffset = it->second.Offset; - if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { + if (maxSeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { + already = true; + } + } else if (writeResponse.InitialSeqNo) { + maxSeqNo = writeResponse.InitialSeqNo.value(); + if (maxSeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { already = true; } } @@ -658,10 +663,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c for (auto& msg: ev->Get()->Msgs) { size += msg.Data.size(); bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts; - if (!msg.DisableDeduplication) { - SourceManager.EnsureSourceId(msg.SourceId); - } - EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx); + EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg), ev->Get()->InitialSeqNo}, ctx); if (offset && needToChangeOffset) ++*offset; } @@ -868,10 +870,6 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame auto& sourceIdBatch = parameters.SourceIdBatch; auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId); - if (!p.Msg.DisableDeduplication && !sourceId) { - return ProcessResult::Break; - } - WriteInflightSize -= p.Msg.Data.size(); TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp); @@ -879,7 +877,19 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame ui64 poffset = p.Offset ? *p.Offset : curOffset; - if (!p.Msg.DisableDeduplication && sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) { + LOG_TRACE_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName() << "' partition " << Partition + << " process write for '" << EscapeC(p.Msg.SourceId) << "'" + << " DisableDeduplication=" << p.Msg.DisableDeduplication + << " SeqNo=" << p.Msg.SeqNo + << " LocalSeqNo=" << sourceId.SeqNo() + << " InitialSeqNo=" << p.InitialSeqNo + ); + + if (!p.Msg.DisableDeduplication + && ((sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) + || (p.InitialSeqNo && p.InitialSeqNo.value() >= p.Msg.SeqNo))) { if (poffset >= curOffset) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, @@ -1218,7 +1228,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const .External = false, .IgnoreQuotaDeadline = true, .HeartbeatVersion = std::nullopt, - }}; + }, std::nullopt}; WriteInflightSize += heartbeat->Data.size(); auto result = ProcessRequest(hbMsg, parameters, request, ctx); @@ -1534,8 +1544,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) { Y_ABORT_UNLESS(Requests.empty() || !WriteQuota->CanExaust(now) || WaitingForPreviousBlobQuota() - || WaitingForSubDomainQuota(ctx) - || SourceManager.WaitSources()); //in this case all writes must be processed or no quota left + || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left AnswerCurrentWrites(ctx); //in case if all writes are already done - no answer will be called on kv write, no kv write at all BecomeIdle(ctx); return; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index aa4653b99f..f52155f943 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1229,11 +1229,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c ApplyNewConfigAndReply(ctx); } - ProcessSourceIdRequests(partitionId); ProcessCheckPartitionStatusRequests(partitionId); - if (allInitialized) { - SourceIdRequests.clear(); - } } void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) @@ -1980,11 +1976,15 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p " offset: " << (req.HasCmdWriteOffset() ? (req.GetCmdWriteOffset() + i) : -1)); } InitResponseBuilder(responseCookie, msgs.size(), COUNTER_LATENCY_PQ_WRITE); + std::optional<ui64> initialSeqNo; + if (req.HasInitialSeqNo()) { + initialSeqNo = req.GetInitialSeqNo(); + } THolder<TEvPQ::TEvWrite> event = MakeHolder<TEvPQ::TEvWrite>(responseCookie, req.GetMessageNo(), req.HasOwnerCookie() ? req.GetOwnerCookie() : "", req.HasCmdWriteOffset() ? req.GetCmdWriteOffset() : TMaybe<ui64>(), - std::move(msgs), req.GetIsDirectWrite()); + std::move(msgs), req.GetIsDirectWrite(), initialSeqNo); ctx.Send(partActor, event.Release()); } @@ -3886,37 +3886,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie); } -void TPersQueue::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - auto it = Partitions.find(record.GetPartition()); - if (it == Partitions.end()) { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition()); - - auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>(); - response->Record.SetError("Partition was not found"); - Send(ev->Sender, response.Release()); - - return; - } - - if (it->second.InitDone) { - Forward(ev, it->second.Actor); - } else { - SourceIdRequests[record.GetPartition()].push_back(ev); - } -} - -void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) { - auto sit = SourceIdRequests.find(partitionId); - if (sit != SourceIdRequests.end()) { - auto it = Partitions.find(partitionId); - for (auto& r : sit->second) { - Forward(r, it->second.Actor); - } - SourceIdRequests.erase(partitionId); - } -} - void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; auto it = Partitions.find(record.GetPartition()); @@ -4000,7 +3969,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle); HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); - HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); + HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 61eabfaa0b..1cd94ed6d7 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -165,9 +165,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { ui64 GetAllowedStep() const; - void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); - void ProcessSourceIdRequests(ui32 partitionId); - void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); void ProcessCheckPartitionStatusRequests(ui32 partitionId); @@ -407,7 +404,6 @@ private: void DestroySession(TPipeInfo& pipeInfo); bool UseMediatorTimeCast = true; - THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests; THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests; TMaybe<ui64> TabletGeneration; }; diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp index a66e345dd6..17eefd2808 100644 --- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp +++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp @@ -18,8 +18,8 @@ using namespace NKikimrPQ; void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf, ui32 id, - const std::optional<TString>&& boundaryFrom, - const std::optional<TString>&& boundaryTo, + const std::optional<TString>&& boundaryFrom = std::nullopt, + const std::optional<TString>&& boundaryTo = std::nullopt, std::vector<ui32> children = {}) { auto* p = conf.AddPartitions(); p->SetPartitionId(id); @@ -44,7 +44,7 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled auto* partitionStrategy = config->MutablePartitionStrategy(); partitionStrategy->SetMinPartitionCount(3); - partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 3); + partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 0); config->SetTopicName("/Root/topic-1"); config->SetTopicPath("/Root"); @@ -344,18 +344,6 @@ private: response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); - if (ev->Get()->Record.GetPartitionRequest().HasCmdGetOwnership()) { - auto& o = ev->Get()->Record.GetPartitionRequest().GetCmdGetOwnership(); - if (o.GetRegisterIfNotExists() || SeqNo) { - auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult(); - cmd->SetOwnerCookie("ower_cookie"); - cmd->SetStatus(Status); - cmd->SetSeqNo(SeqNo.value_or(0)); - } else { - response->Record.SetErrorCode(NPersQueue::NErrorCode::SOURCEID_DELETED); - } - } - auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo(); sn->SetSeqNo(SeqNo.value_or(0)); sn->SetState(SeqNo ? NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED : NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION); @@ -365,6 +353,10 @@ private: void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>(); + response->Record.SetStatus(Status); + if (SeqNo) { + response->Record.SetSeqNo(SeqNo.value()); + } ctx.Send(ev->Sender, response.Release()); } @@ -613,49 +605,91 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveA UNIT_ASSERT(r->Error); } -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_OtherPartition_Test) { NPersQueue::TTestServer server = CreateServer(); + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); - { - auto r = ChoosePartition(server, SMDisabled, "A_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - auto r = ChoosePartition(server, SMDisabled, "C_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); - } - { - WriteToTable(server, "A_Source_w_0", 0); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - // Redefine partition for sourceId. Check that partition changed; - WriteToTable(server, "A_Source_w_0", 1); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - } - { - // Redefine partition for sourceId to inactive partition. Select new partition. - WriteToTable(server, "A_Source_w_0", 3); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - // Use prefered partition, and save it in table - auto r = ChoosePartition(server, SMDisabled, "A_Source_1", 1); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - } + WriteToTable(server, "A_Source_10", 0, 13); + auto r = ChoosePartition(server, config, "A_Source_10", 1); + + UNIT_ASSERT(r->Error); + AssertTable(server, "A_Source_10", 0, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_NewSourceId_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0); + + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_RegisteredSourceId_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0); + AddPartition(config, 1); + + WriteToTable(server, "A_Source", 0); + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + + WriteToTable(server, "A_Source", 1); + r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Inactive_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0, {}, {}, {1}); + AddPartition(config, 1); + + WriteToTable(server, "A_Source", 0); + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0); + AddPartition(config, 1); + + auto r = ChoosePartition(server, config, "A_Source", 0); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Inactive_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0, {}, {}, {1}); + AddPartition(config, 1); + + auto r = ChoosePartition(server, config, "A_Source", 0); + + UNIT_ASSERT(r->Error); } } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 7df22fe66a..d6803f176b 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -508,7 +508,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TVector<TEvPQ::TEvWrite::TMsg> msgs; msgs.push_back(msg); - auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false); + auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt); Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } diff --git a/ydb/core/persqueue/ut/slow/pq_ut.cpp b/ydb/core/persqueue/ut/slow/pq_ut.cpp index 6227c8bc6a..c68c71456e 100644 --- a/ydb/core/persqueue/ut/slow/pq_ut.cpp +++ b/ydb/core/persqueue/ut/slow/pq_ut.cpp @@ -140,9 +140,8 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { auto sourceIds = CmdSourceIdRead(tc); UNIT_ASSERT(sourceIds.size() > 0); for (auto& s: sourceIds) { - Cout << "try to find sourceId " << s << Endl; auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s); - UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end()); + UNIT_ASSERT_C(findIt != writtenSourceIds.end(), "try to find sourceId " << s); } Cout << TInstant::Now() << "All Ok" << Endl; }); diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp index f5fc57fb87..2221298e89 100644 --- a/ydb/core/persqueue/ut/splitmerge_ut.cpp +++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp @@ -98,6 +98,10 @@ TTopicSdkTestSetup CreateSetup() { setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); + setup.GetRuntime().SetLogPriority(NKikimrServices::PQ_PARTITION_CHOOSER, NActors::NLog::PRI_TRACE); + + setup.GetRuntime().GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true); + setup.GetRuntime().GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true); return setup; } @@ -209,7 +213,41 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { Y_UNIT_TEST(PartitionSplit) { TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(); + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100); + + TTopicClient client = setup.MakeClient(); + + auto writeSession = CreateWriteSession(client, "producer-1"); + + TTestReadSession ReadSession(client, 2); + + UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2))); + + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + + UNIT_ASSERT(writeSession->Write(Msg("message_1.2", 3))); + + ReadSession.WaitAllMessages(); + + for(const auto& info : ReadSession.ReceivedMessages) { + if (info.Data == "message_1.1") { + UNIT_ASSERT_EQUAL(0, info.PartitionId); + UNIT_ASSERT_EQUAL(2, info.SeqNo); + } else if (info.Data == "message_1.2") { + UNIT_ASSERT(1 == info.PartitionId || 2 == info.PartitionId); + UNIT_ASSERT_EQUAL(3, info.SeqNo); + } else { + UNIT_ASSERT_C(false, "Unexpected message: " << info.Data); + } + } + + writeSession->Close(TDuration::Seconds(1)); + } + + Y_UNIT_TEST(PartitionSplit_PreferedPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100); TTopicClient client = setup.MakeClient(); @@ -269,9 +307,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { writeSession3->Close(TDuration::Seconds(1)); } - Y_UNIT_TEST(PartitionMerge) { + Y_UNIT_TEST(PartitionMerge_PreferedPartition) { TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2, 100); TTopicClient client = setup.MakeClient(); diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 0c133e998b..f707e2d574 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -3,6 +3,8 @@ #include <deque> #include <util/string/builder.h> +//#include <ydb/core/base/appdata_fwd.h> +//#include <ydb/core/base/feature_flags.h> #include <ydb/library/yverify_stream/yverify_stream.h> namespace NKikimr::NPQ { @@ -33,7 +35,7 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) { } bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) { - return config.GetPartitionStrategy().GetMinPartitionCount() < config.GetPartitionStrategy().GetMaxPartitionCount(); // TODO + return 0 < config.GetPartitionStrategy().GetMaxPartitionCount(); } static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h index a4d19c64ad..3ef8499d3b 100644 --- a/ydb/core/persqueue/writer/partition_chooser.h +++ b/ydb/core/persqueue/writer/partition_chooser.h @@ -24,16 +24,14 @@ struct TEvPartitionChooser { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)"); struct TEvChooseResult: public TEventLocal<TEvChooseResult, EvChooseResult> { - TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional<ui64> seqNo) + TEvChooseResult(ui32 partitionId, ui64 tabletId, std::optional<ui64> seqNo) : PartitionId(partitionId) , TabletId(tabletId) - , OwnerCookie(ownerCookie) , SeqNo(seqNo) { } ui32 PartitionId; ui64 TabletId; - TString OwnerCookie; std::optional<ui64> SeqNo; }; diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h index 8d718d8d45..9f8f4be0df 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h @@ -73,8 +73,10 @@ protected: void InitTable(const NActors::TActorContext& ctx) { TThis::Become(&TThis::StateInitTable); const auto& pqConfig = AppData(ctx)->PQConfig; + TRACE("InitTable: SourceId="<< SourceId + << " TopicsAreFirstClassCitizen=" << pqConfig.GetTopicsAreFirstClassCitizen() + << " UseSrcIdMetaMappingInFirstClass=" <<pqConfig.GetUseSrcIdMetaMappingInFirstClass()); if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { - DEBUG("InitTable"); TableHelper.SendInitTableRequest(ctx); } else { StartKqpSession(ctx); @@ -162,7 +164,7 @@ protected: DEBUG("Update the table"); TableHelper.SendUpdateRequest(Partition->PartitionId, SeqNo, ctx); } else { - StartGetOwnership(ctx); + ReplyResult(ctx); } } @@ -190,7 +192,7 @@ protected: // Use tx only for query after select. Updating AccessTime without transaction. TableHelper.CloseKqpSession(ctx); - return StartGetOwnership(ctx); + ReplyResult(ctx); } StartIdle(); @@ -207,13 +209,18 @@ protected: protected: void StartCheckPartitionRequest(const TActorContext &ctx) { TThis::Become(&TThis::StateCheckPartition); + + if (!Partition) { + return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx); + } + PartitionHelper.Open(Partition->TabletId, ctx); - PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, ctx); + PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, "", ctx); } void Handle(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) { + PartitionHelper.Close(ctx); if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()) { - PartitionHelper.Close(ctx); return SendUpdateRequests(ctx); } ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "Partition isn`t active", ctx); @@ -230,43 +237,6 @@ protected: } protected: - void StartGetOwnership(const TActorContext &ctx) { - TThis::Become(&TThis::StateOwnership); - if (!Partition) { - return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx); - } - - DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId); - - PartitionHelper.Open(Partition->TabletId, ctx); - PartitionHelper.SendGetOwnershipRequest(Partition->PartitionId, SourceId, true, ctx); - } - - void HandleOwnership(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { - DEBUG("HandleOwnership"); - auto& record = ev->Get()->Record; - - TString error; - if (!BasicCheck(record, error)) { - return ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx); - } - - const auto& response = record.GetPartitionResponse(); - if (!response.HasCmdGetOwnershipResult()) { - return ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); - } - - if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { - return ReplyError(ErrorCode::INITIALIZING, "Partition is not active", ctx); - } - - OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); - - PartitionHelper.Close(ctx); - - OnOwnership(ctx); - } - void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) { auto msg = ev->Get(); if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) { @@ -282,19 +252,6 @@ protected: } } - virtual void OnOwnership(const TActorContext &ctx) = 0; - - STATEFN(StateOwnership) { - TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); - switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvResponse, HandleOwnership); - HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership); - HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - - protected: void StartIdle() { TThis::Become(&TThis::StateIdle); @@ -331,7 +288,8 @@ protected: protected: void ReplyResult(const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo)); + DEBUG("ReplyResult: Partition=" << Partition->PartitionId << ", SeqNo=" << SeqNo); + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, SeqNo)); } void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { @@ -355,7 +313,6 @@ protected: bool PartitionPersisted = false; - TString OwnerCookie; std::optional<ui64> SeqNo = 0; }; diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h index 97a9750952..f8d9d35e6e 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h @@ -37,8 +37,8 @@ public: } void Bootstrap(const TActorContext& ctx) { - TThis::Initialize(ctx); - TThis::InitTable(ctx); + TThis::Initialize(ctx); + TThis::InitTable(ctx); } TActorIdentity SelfId() const { @@ -59,11 +59,6 @@ public: } } - void OnOwnership(const TActorContext &ctx) override { - DEBUG("OnOwnership"); - TThis::ReplyResult(ctx); - } - private: void RequestPQRB(const NActors::TActorContext& ctx) { DEBUG("RequestPQRB") diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h index 463217557c..d20c4da916 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h @@ -47,8 +47,11 @@ public: NTabletPipe::SendData(ctx, Pipe, ev.Release()); } - void SendCheckPartitionStatusRequest(ui32 partitionId, const TActorContext& ctx) { + void SendCheckPartitionStatusRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) { auto ev = MakeHolder<NKikimr::TEvPQ::TEvCheckPartitionStatusRequest>(partitionId); + if (sourceId) { + ev->Record.SetSourceId(sourceId); + } NTabletPipe::SendData(ctx, Pipe, ev.Release()); } diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h index 695af84d59..1bc599d872 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h @@ -35,18 +35,17 @@ public: std::optional<ui32> preferedPartition) : TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) , Graph(MakePartitionGraph(config)) { - } void Bootstrap(const TActorContext& ctx) { + TThis::Initialize(ctx); BoundaryPartition = ChoosePartitionSync(); if (TThis::SourceId) { - TThis::Initialize(ctx); GetOwnershipFast(ctx); } else { TThis::Partition = BoundaryPartition; - TThis::StartGetOwnership(ctx); + TThis::StartCheckPartitionRequest(ctx); } } @@ -104,11 +103,6 @@ public: GetOldSeqNo(ctx); } - void OnOwnership(const TActorContext &ctx) override { - DEBUG("OnOwnership"); - TThis::ReplyResult(ctx); - } - private: void GetOwnershipFast(const TActorContext &ctx) { TThis::Become(&TThis::StateOwnershipFast); @@ -119,31 +113,18 @@ private: DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId); TThis::PartitionHelper.Open(BoundaryPartition->TabletId, ctx); - TThis::PartitionHelper.SendGetOwnershipRequest(BoundaryPartition->PartitionId, TThis::SourceId, false, ctx); + TThis::PartitionHelper.SendCheckPartitionStatusRequest(BoundaryPartition->PartitionId, TThis::SourceId, ctx); } - void HandleOwnershipFast(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { - DEBUG("HandleOwnershipFast"); - auto& record = ev->Get()->Record; - - TString error; - if (!BasicCheck(record, error)) { - return TThis::InitTable(ctx); - } - - const auto& response = record.GetPartitionResponse(); - if (!response.HasCmdGetOwnershipResult()) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); - } - - if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { - return TThis::ReplyError(ErrorCode::INITIALIZING, "Configuration changed", ctx); - } - - TThis::OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); - - if (response.GetCmdGetOwnershipResult().GetSeqNo() > 0) { + void HandleFast(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) { + TThis::PartitionHelper.Close(ctx); + if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus() + && ev->Get()->Record.HasSeqNo() + && ev->Get()->Record.GetSeqNo() > 0) { // Fast path: the partition ative and already written + TThis::Partition = BoundaryPartition; + TThis::SeqNo = ev->Get()->Record.GetSeqNo(); + TThis::SendUpdateRequests(ctx); return TThis::ReplyResult(ctx); } @@ -154,7 +135,7 @@ private: STATEFN(StateOwnershipFast) { TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); switch (ev->GetTypeRewrite()) { - HFunc(TEvPersQueue::TEvResponse, HandleOwnershipFast); + HFunc(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse, HandleFast); HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership); HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership); SFunc(TEvents::TEvPoison, TThis::Die); @@ -207,7 +188,7 @@ private: } TThis::PartitionHelper.Close(ctx); - TThis::StartCheckPartitionRequest(ctx); + OnPartitionChosen(ctx); } STATEFN(StateGetMaxSeqNo) { diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index dc5bddf72e..5fac083b6c 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -343,17 +343,17 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl return InitResult(error, std::move(record)); } - const auto& response = record.GetPartitionResponse(); + auto& response = *record.MutablePartitionResponse(); if (!response.HasCmdGetMaxSeqNoResult()) { return InitResult("Absent MaxSeqNo result", std::move(record)); } - const auto& result = response.GetCmdGetMaxSeqNoResult(); + auto& result = *response.MutableCmdGetMaxSeqNoResult(); if (result.SourceIdInfoSize() < 1) { return InitResult("Empty source id info", std::move(record)); } - const auto& sourceIdInfo = result.GetSourceIdInfo(0); + auto& sourceIdInfo = *result.MutableSourceIdInfo(0); if (Opts.CheckState) { switch (sourceIdInfo.GetState()) { case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED: @@ -370,6 +370,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl } } + Y_VERIFY(sourceIdInfo.GetSeqNo() >= 0); + if (Opts.InitialSeqNo && (ui64)sourceIdInfo.GetSeqNo() < Opts.InitialSeqNo.value()) { + sourceIdInfo.SetSeqNo(Opts.InitialSeqNo.value()); + } + InitResult(OwnerCookie, sourceIdInfo, WriteId); Become(&TThis::StateWork); @@ -609,12 +614,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl auto& request = *ev->Record.MutablePartitionRequest(); request.SetMessageNo(MessageNo++); + if (Opts.InitialSeqNo) { + request.SetInitialSeqNo(Opts.InitialSeqNo.value()); + } SetWriteId(request); if (!Opts.UseDeduplication) { request.SetPartition(PartitionId); } + NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); PendingWrite.emplace_back(cookie); diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index bd3ba41038..9cd28cd5c4 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -176,6 +176,7 @@ struct TPartitionWriterOpts { TString SourceId; std::optional<ui32> ExpectedGeneration; + std::optional<ui64> InitialSeqNo; TString Database; TString TopicPath; @@ -204,6 +205,7 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithTxId(const TString& value) { TxId = value; return *this; } TPartitionWriterOpts& WithTraceId(const TString& value) { TraceId = value; return *this; } TPartitionWriterOpts& WithRequestType(const TString& value) { RequestType = value; return *this; } + TPartitionWriterOpts& WithInitialSeqNo(const std::optional<ui64> value) { InitialSeqNo = value; return *this; } }; IActor* CreatePartitionWriter(const TActorId& client, diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index e92ea1d7b8..87b3fa3cb2 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -169,6 +169,7 @@ message TPersQueuePartitionRequest { optional bool IsDirectWrite = 18 [default = false]; optional uint64 PutUnitsSize = 19; + optional int64 InitialSeqNo = 26; } message TPersQueueMetaRequest { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index b47d3ff20a..bb70645ab6 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -936,38 +936,14 @@ message TEvSubDomainStatus { required bool SubDomainOutOfSpace = 1; }; -message TEvSourceIdRequest { - optional uint32 Partition = 1; - repeated string SourceId = 2; -}; - -message TEvSourceIdResponse { - enum EState { - Unknown = 0; - Registered = 1; - PendingRegistration = 2; - }; - - message TSource { - optional string Id = 1; - optional EState State = 2; - optional uint64 SeqNo = 3; - optional uint64 Offset = 4; - optional bool Explicit = 5; - optional uint64 WriteTimestamp = 6; - - }; - optional string Error = 1; - optional uint32 Partition = 2; - repeated TSource Source = 3; -}; - message TEvCheckPartitionStatusRequest { optional uint32 Partition = 1; + optional string SourceId = 2; }; message TEvCheckPartitionStatusResponse { optional ETopicPartitionStatus Status = 1; + optional uint64 SeqNo = 2; }; message TTransaction { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp index 128e74ad97..fedf75f9a7 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -22,12 +22,12 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim } } -void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount) +void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount) { TTopicClient client(MakeDriver()); TCreateTopicSettings topics; - TPartitioningSettings partitions(partitionCount, partitionCount); + TPartitioningSettings partitions(partitionCount, maxPartitionCount.value_or(partitionCount)); topics.PartitioningSettings(partitions); TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h index c1642f6b46..7ab9f650dc 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h @@ -16,7 +16,8 @@ class TTopicSdkTestSetup { public: TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true); - void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1); + void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, + std::optional<size_t> maxPartitionCount = std::nullopt); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TEST_TOPIC) const; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index 0d09a43189..0d5e472ca4 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -470,6 +470,7 @@ private: ui64 PartitionTabletId; ui32 PreferedPartition; TString SourceId; + std::optional<ui64> InitialSeqNo; TString OwnerCookie; TString UserAgent; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index 20dad1f114..0b7b645c8b 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -455,6 +455,7 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { void TWriteSessionActor::Handle(NPQ::TEvPartitionChooser::TEvChooseResult::TPtr& ev, const NActors::TActorContext& ctx) { auto* r = ev->Get(); PartitionTabletId = r->TabletId; + InitialSeqNo = r->SeqNo; LastSourceIdUpdate = ctx.Now(); ProceedPartition(r->PartitionId, ctx); @@ -484,6 +485,7 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont TPartitionWriterOpts opts; opts.WithSourceId(SourceId); + opts.WithInitialSeqNo(InitialSeqNo); Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(ctx.SelfID, PartitionTabletId, Partition, opts)); State = ES_WAIT_WRITER_INIT; diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index f50e1a8653..c6564b2479 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -940,13 +940,25 @@ namespace NKikimr::NGRpcProxy::V1 { pqDescr->SetName(name); ui32 parts = 1; + ui32 maxParts = 0; if (request.has_partitioning_settings()) { - if (request.partitioning_settings().min_active_partitions() < 0) { - error = TStringBuilder() << "Partitions count must be positive, provided " << request.partitioning_settings().min_active_partitions(); + const auto& settings = request.partitioning_settings(); + if (settings.min_active_partitions() < 0) { + error = TStringBuilder() << "Partitions count must be positive, provided " << settings.min_active_partitions(); return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } - parts = request.partitioning_settings().min_active_partitions(); + parts = settings.min_active_partitions(); if (parts == 0) parts = 1; + + if (settings.partition_count_limit() > 0) { + maxParts = settings.partition_count_limit(); + + if (maxParts < parts) { + error = TStringBuilder() << "Partitions count limit must be greater than or equal to partitions count, provided " + << settings.partition_count_limit() << " and " << settings.min_active_partitions(); + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); + } + } } pqDescr->SetTotalGroupCount(parts); @@ -954,6 +966,11 @@ namespace NKikimr::NGRpcProxy::V1 { auto config = pqDescr->MutablePQTabletConfig(); auto partConfig = config->MutablePartitionConfig(); + if (maxParts > 0 && AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge()) { + config->MutablePartitionStrategy()->SetMinPartitionCount(parts); + config->MutablePartitionStrategy()->SetMaxPartitionCount(maxParts); + } + config->SetRequireAuthWrite(true); config->SetRequireAuthRead(true); pqDescr->SetPartitionPerTablet(1); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 6113865f72..2d69db971d 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -193,6 +193,7 @@ private: ui64 PartitionTabletId; ui32 PreferedPartition; std::optional<ui32> ExpectedGeneration; + std::optional<ui64> InitialSeqNo; bool PartitionFound = false; // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index bee353798f..972e76a179 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -631,6 +631,7 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionChooser::TEvChooseResult::TPtr& ev, const NActors::TActorContext& ctx) { auto* r = ev->Get(); PartitionTabletId = r->TabletId; + InitialSeqNo = r->SeqNo; LastSourceIdUpdate = ctx.Now(); ProceedPartition(r->PartitionId, ctx); @@ -680,6 +681,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CreatePartitionWriterCache(const opts.WithDeduplication(UseDeduplication); opts.WithSourceId(SourceId); + opts.WithInitialSeqNo(InitialSeqNo); opts.WithExpectedGeneration(ExpectedGeneration); if constexpr (UseMigrationProtocol) { |