aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-01-30 17:44:14 +0500
committerGitHub <noreply@github.com>2024-01-30 17:44:14 +0500
commit88134144840b61b5c3d4eb9c701dd95168f3f7f4 (patch)
treee6a1c3500788467d6d3813d970f6d7a9593d3d7c
parent7c628c7b86cecdd277cb87453e3c733e4bc15943 (diff)
downloadydb-88134144840b61b5c3d4eb9c701dd95168f3f7f4.tar.gz
Topic split/merge for write (#1386)
-rw-r--r--ydb/core/persqueue/events/internal.h10
-rw-r--r--ydb/core/persqueue/partition.cpp105
-rw-r--r--ydb/core/persqueue/partition.h6
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp339
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h37
-rw-r--r--ydb/core/persqueue/partition_types.h1
-rw-r--r--ydb/core/persqueue/partition_write.cpp37
-rw-r--r--ydb/core/persqueue/pq_impl.cpp43
-rw-r--r--ydb/core/persqueue/pq_impl.h4
-rw-r--r--ydb/core/persqueue/ut/partition_chooser_ut.cpp140
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/slow/pq_ut.cpp3
-rw-r--r--ydb/core/persqueue/ut/splitmerge_ut.cpp44
-rw-r--r--ydb/core/persqueue/utils.cpp4
-rw-r--r--ydb/core/persqueue/writer/partition_chooser.h4
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h71
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h9
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h5
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h45
-rw-r--r--ydb/core/persqueue/writer/writer.cpp15
-rw-r--r--ydb/core/persqueue/writer/writer.h2
-rw-r--r--ydb/core/protos/msgbus_pq.proto1
-rw-r--r--ydb/core/protos/pqconfig.proto28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h3
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h1
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp2
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp23
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h1
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp2
30 files changed, 277 insertions, 714 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 7e584528fd..69a8858da8 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -216,13 +216,14 @@ struct TEvPQ {
std::optional<TRowVersion> HeartbeatVersion;
};
- TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
+ TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite, std::optional<ui64> initialSeqNo)
: Cookie(cookie)
, MessageNo(messageNo)
, OwnerCookie(ownerCookie)
, Offset(offset)
, Msgs(std::move(msgs))
, IsDirectWrite(isDirectWrite)
+ , InitialSeqNo(initialSeqNo)
{}
ui64 Cookie;
@@ -231,6 +232,7 @@ struct TEvPQ {
TMaybe<ui64> Offset;
TVector<TMsg> Msgs;
bool IsDirectWrite;
+ std::optional<ui64> InitialSeqNo;
};
@@ -939,12 +941,6 @@ struct TEvPQ {
NKikimrClient::TPersQueueFetchResponse Response;
};
- struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> {
- };
-
- struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> {
- };
-
struct TEvRegisterDirectReadSession : public TEventLocal<TEvRegisterDirectReadSession, EvRegisterDirectReadSession> {
TEvRegisterDirectReadSession(const NPQ::TReadSessionKey& sessionKey, ui32 tabletGeneration)
: Session(sessionKey)
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 74afe14f88..f29e9511f7 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/base/counters.h>
#include <ydb/core/base/path.h>
#include <ydb/core/quoter/public/quoter.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
@@ -482,8 +483,6 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
- SourceManager.PassAway();
-
Die(ctx);
}
@@ -935,50 +934,33 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
}
void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) {
- SourceManager.EnsureSourceIds(ev->Get()->SourceIds);
- MaxSeqNoRequests.emplace_back(ev);
- ProcessMaxSeqNoRequest(ctx);
-}
-
-void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) {
- PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size());
-
- while(!MaxSeqNoRequests.empty()) {
- auto& ev = MaxSeqNoRequests.front();
-
- auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
- NKikimrClient::TResponse& resp = *response->Response;
-
- resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
- resp.SetErrorCode(NPersQueue::NErrorCode::OK);
-
- auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
- for (const auto& sourceId : ev->Get()->SourceIds) {
- auto& protoInfo = *result.AddSourceIdInfo();
- protoInfo.SetSourceId(sourceId);
+ auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
+ NKikimrClient::TResponse& resp = *response->Response;
- auto info = SourceManager.Get(sourceId);
- if (!info) {
- PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId);
- return;
- }
- if (info.State == TSourceIdInfo::EState::Unknown) {
- continue;
- }
+ resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ resp.SetErrorCode(NPersQueue::NErrorCode::OK);
- Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
- Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
+ auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
+ for (const auto& sourceId : ev->Get()->SourceIds) {
+ auto& protoInfo = *result.AddSourceIdInfo();
+ protoInfo.SetSourceId(sourceId);
- protoInfo.SetSeqNo(info.SeqNo);
- protoInfo.SetOffset(info.Offset);
- protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
- protoInfo.SetExplicit(info.Explicit);
- protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
+ auto info = SourceManager.Get(sourceId);
+ if (info.State == TSourceIdInfo::EState::Unknown) {
+ continue;
}
- ctx.Send(Tablet, response.Release());
- MaxSeqNoRequests.pop_front();
+ Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
+ Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
+
+ protoInfo.SetSeqNo(info.SeqNo);
+ protoInfo.SetOffset(info.Offset);
+ protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
+ protoInfo.SetExplicit(info.Explicit);
+ protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
}
+
+ ctx.Send(Tablet, response.Release());
}
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
@@ -2612,42 +2594,6 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
}
}
-void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
-
- if (Partition != record.GetPartition()) {
- LOG_INFO_S(
- ctx, NKikimrServices::PERSQUEUE,
- "TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." <<
- " Topic: \"" << TopicName() << "\"." <<
- " Partition: " << Partition << "."
- );
- return;
- }
-
- auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds();
-
- auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
- for(auto& sourceId : record.GetSourceId()) {
- auto* s = response->Record.AddSource();
- s->SetId(sourceId);
-
- auto it = memoryStorage.find(sourceId);
- if (it != memoryStorage.end()) {
- auto& info = it->second;
- s->SetState(Convert(info.State));
- s->SetSeqNo(info.SeqNo);
- s->SetOffset(info.Offset);
- s->SetExplicit(info.Explicit);
- s->SetWriteTimestamp(info.WriteTimestamp.GetValue());
- } else {
- s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown);
- }
- }
-
- Send(ev->Sender, response.Release());
-}
-
void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
@@ -2664,6 +2610,13 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
+ if (record.HasSourceId()) {
+ auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(record.GetSourceId()));
+ if (sit != SourceIdStorage.GetInMemorySourceIds().end()) {
+ response->Record.SetSeqNo(sit->second.SeqNo);
+ }
+ }
+
Send(ev->Sender, response.Release());
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 9fee3b4802..31cb40fd48 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -344,7 +344,6 @@ private:
void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe);
// void DestroyReadSession(const TReadSessionKey& key);
- void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
TString LogPrefix() const;
@@ -482,9 +481,7 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
- HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
- HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -540,9 +537,7 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
- HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
- HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -613,7 +608,6 @@ private:
std::deque<TMessage> Requests;
std::deque<TMessage> Responses;
- std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;
THead Head;
THead NewHead;
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
index ac5eff21be..b0770f1f90 100644
--- a/ydb/core/persqueue/partition_sourcemanager.cpp
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -1,15 +1,9 @@
#include "partition.h"
-#include "partition_log.h"
-
-#include <unordered_map>
#include <ydb/core/base/tablet_pipe.h>
namespace NKikimr::NPQ {
-IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
-bool IsResearchRequires(const TPartitionGraph::Node* node);
-
//
// TPartitionSourceManager
//
@@ -18,59 +12,6 @@ TPartitionSourceManager::TPartitionSourceManager(TPartition& partition)
: Partition(partition) {
}
-void TPartitionSourceManager::ScheduleBatch() {
- if (WaitSources()) {
- return;
- }
-
- PendingCookies.clear();
- Responses.clear();
-
- if (UnknownSourceIds.empty()) {
- return;
- }
-
- auto node = GetPartitionNode();
- if (!IsResearchRequires(node)) {
- return;
- }
-
- PendingSourceIds = std::move(UnknownSourceIds);
-
- for(const auto* parent : node->HierarhicalParents) {
- PendingCookies.insert(++Cookie);
-
- TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
- Partition.Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie);
- }
-}
-
-void TPartitionSourceManager::EnsureSourceId(const TString& sourceId) {
- if (!IsResearchRequires(GetPartitionNode())) {
- return;
- }
-
- if (RequireEnqueue(sourceId)) {
- UnknownSourceIds.insert(sourceId);
- }
-
- ScheduleBatch();
-}
-
-void TPartitionSourceManager::EnsureSourceIds(const TVector<TString>& sourceIds) {
- if (!IsResearchRequires(GetPartitionNode())) {
- return;
- }
-
- for(const auto& sourceId : sourceIds) {
- if (RequireEnqueue(sourceId)) {
- UnknownSourceIds.insert(sourceId);
- }
- }
-
- ScheduleBatch();
-}
-
const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TString& sourceId) const {
auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds();
@@ -88,19 +29,9 @@ const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TS
return result;
}
- auto its = Sources.find(sourceId);
- if (its != Sources.end()) {
- return its->second;
- }
-
- TSourceInfo result;
- result.Pending = IsResearchRequires(GetPartitionNode());
- return result;
+ return TSourceInfo{};
}
-bool TPartitionSourceManager::WaitSources() const {
- return !PendingCookies.empty();
-}
TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModificationBatch(const TActorContext& ctx) {
const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
@@ -109,76 +40,10 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif
return TModificationBatch(*this, format);
}
-void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) {
- auto it = PendingCookies.find(ev->Cookie);
- if (it == PendingCookies.end()) {
- PQ_LOG_D("Received TEvSourceIdResponse with unknown cookie: " << ev->Cookie);
- return;
- }
- PendingCookies.erase(it);
-
- if (ev->Get()->Record.HasError()) {
- PQ_LOG_D("Error on request SourceId: " << ev->Get()->Record.GetError());
- Responses.clear();
- RequesterActors.erase(ev->Get()->Record.GetPartition());
-
- if (UnknownSourceIds.empty()) {
- UnknownSourceIds = std::move(PendingSourceIds);
- } else {
- UnknownSourceIds.insert(PendingSourceIds.begin(), PendingSourceIds.end());
- PendingSourceIds.clear();
- }
-
- ScheduleBatch();
- return;
- }
-
- Responses.push_back(ev);
-
- if (PendingCookies.empty()) {
- FinishBatch(ctx);
- ScheduleBatch();
- }
-}
-
const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}
-void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) {
- for(const auto& ev : Responses) {
- const auto& record = ev->Get()->Record;
- for(const auto& s : record.GetSource()) {
- auto& value = Sources[s.GetId()];
- if (s.GetState() == NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown) {
- continue;
- }
- PQ_LOG_T("Received SourceId " << s.GetId() << " SeqNo=" << s.GetSeqNo());
- if (value.State == TSourceIdInfo::EState::Unknown || value.SeqNo < s.GetSeqNo()) {
- value.State = Convert(s.GetState());
- value.SeqNo = s.GetSeqNo();
- value.Offset = s.GetOffset();
- value.Explicit = s.GetExplicit();
- value.WriteTimestamp.FromValue(s.GetWriteTimestamp());
- }
- }
- }
-
- Responses.clear();
- PendingSourceIds.clear();
-
- if (Partition.CurrentStateFunc() == &TPartition::StateIdle) {
- Partition.HandleWrites(ctx);
- }
- Partition.ProcessMaxSeqNoRequest(ctx);
-}
-
-bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) {
- auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds();
- return !Sources.contains(sourceId) && !knownSourceIds.contains(sourceId)
- && !PendingSourceIds.contains(sourceId);
-}
-
TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
return Partition.SourceIdStorage;
}
@@ -188,36 +53,6 @@ bool TPartitionSourceManager::HasParents() const {
return node && !node->Parents.empty();
}
-TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
- auto it = RequesterActors.find(id);
- if (it != RequesterActors.end()) {
- return it->second;
- }
-
- TActorId actorId = Partition.RegisterWithSameMailbox(CreateRequester(Partition.SelfId(),
- id,
- tabletId));
- RequesterActors[id] = actorId;
- return actorId;
-}
-
-std::unique_ptr<TEvPQ::TEvSourceIdRequest> TPartitionSourceManager::CreateRequest(TPartitionSourceManager::TPartitionId id) const {
- auto request = std::make_unique<TEvPQ::TEvSourceIdRequest>();
- auto& record = request->Record;
- record.SetPartition(id);
- for(const auto& sourceId : PendingSourceIds) {
- record.AddSourceId(sourceId);
- }
- return request;
-}
-
-void TPartitionSourceManager::PassAway() {
- for(const auto [_, actorId] : RequesterActors) {
- Partition.Send(actorId, new TEvents::TEvPoison());
- }
- RequesterActors.clear();
-}
-
//
// TPartitionSourceManager::TModificationBatch
@@ -231,9 +66,6 @@ TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSource
}
TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
- for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) {
- Manager.Sources.erase(k);
- }
}
TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const {
@@ -283,11 +115,9 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batc
, SourceId(id) {
auto& memory = MemoryStorage();
auto& writer = WriteStorage();
- auto& sources = Batch.GetManager().Sources;
InMemory = memory.find(id);
InWriter = writer.find(id);
- InSources = sources.end();
if (InWriter != writer.end()) {
Info = Convert(InWriter->second);
@@ -297,14 +127,6 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batc
Info = Convert(InMemory->second);
return;
}
-
- InSources = sources.find(id);
- if (InSources != sources.end()) {
- Info = InSources->second;
- return;
- }
-
- Info.Pending = IsResearchRequires(Batch.Node);
}
std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const {
@@ -335,9 +157,6 @@ void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) {
Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat));
}
-TPartitionSourceManager::TSourceManager::operator bool() const {
- return Info;
-}
const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const {
return Batch.GetManager().GetSourceIdStorage().GetInMemorySourceIds();
@@ -356,160 +175,4 @@ TPartitionSourceManager::TSourceInfo::TSourceInfo(TSourceIdInfo::EState state)
: State(state) {
}
-TPartitionSourceManager::TSourceInfo::operator bool() const {
- return !Pending;
-}
-
-
-//
-// TSourceIdRequester
-//
-
-class TSourceIdRequester : public TActorBootstrapped<TSourceIdRequester> {
- static constexpr TDuration RetryDelay = TDuration::MilliSeconds(100);
-public:
- TSourceIdRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId)
- : Parent(parent)
- , Partition(partition)
- , TabletId(tabletId) {
- }
-
- void Bootstrap(const TActorContext&) {
- Become(&TThis::StateWork);
- MakePipe();
- }
-
- void PassAway() override {
- if (PipeClient) {
- NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient);
- }
- TActorBootstrapped::PassAway();
- }
-
-private:
- void MakePipe() {
- NTabletPipe::TClientConfig config;
- config.RetryPolicy = {
- .RetryLimitCount = 6,
- .MinRetryTime = TDuration::MilliSeconds(10),
- .MaxRetryTime = TDuration::MilliSeconds(200),
- .BackoffMultiplier = 2,
- .DoFirstRetryInstantly = true
- };
- PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config));
- }
-
- void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& /*ctx*/) {
- Cookie = ev->Cookie;
- PendingRequest = ev;
-
- DoRequest();
- }
-
- void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& /*ctx*/) {
- auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>();
- msg->Record = std::move(ev->Get()->Record);
- bool hasError = msg->Record.HasError();
-
- Reply(msg);
-
- if (hasError) {
- PassAway();
- }
- }
-
- void DoRequest() {
- if (PendingRequest && PipeConnected) {
- auto msg = std::make_unique<TEvPQ::TEvSourceIdRequest>();
- msg->Record = std::move(PendingRequest->Get()->Record);
- PendingRequest.Reset();
-
- NTabletPipe::SendData(SelfId(), PipeClient, msg.release());
- }
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
- if (ev->Get()->Status == NKikimrProto::EReplyStatus::OK) {
- PipeConnected = true;
- DoRequest();
- } else {
- ReplyWithError("Error connecting to PQ");
- }
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
- if (ev->Get()->TabletId == TabletId) {
- PipeConnected = false;
- ReplyWithError("Pipe destroyed");
- }
- }
-
- void ReplyWithError(const TString& error) {
- auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>();
- msg->Record.SetPartition(Partition);
- msg->Record.SetError(error);
-
- Reply(msg);
-
- PassAway();
- }
-
- void Reply(std::unique_ptr<TEvPQ::TEvSourceIdResponse>& msg) {
- Send(Parent, msg.release(), 0, Cookie);
- }
-
- STFUNC(StateWork)
- {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPQ::TEvSourceIdRequest, Handle);
- HFunc(TEvPQ::TEvSourceIdResponse, Handle);
- hFunc(TEvTabletPipe::TEvClientConnected, Handle);
- hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- sFunc(TEvents::TEvPoison, PassAway);
- }
- }
-
-private:
- TActorId Parent;
- TPartitionSourceManager::TPartitionId Partition;
- ui64 TabletId;
- TEvPQ::TEvSourceIdRequest::TPtr PendingRequest;
- ui64 Cookie;
-
- TActorId PipeClient;
- bool PipeConnected = false;
-};
-
-IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId) {
- return new TSourceIdRequester(parent, partition, tabletId);
-}
-
-bool IsResearchRequires(const TPartitionGraph::Node* node) {
- return node && !node->Parents.empty();
-}
-
-NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
- switch(value) {
- case TSourceIdInfo::EState::Unknown:
- return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown;
- case TSourceIdInfo::EState::Registered:
- return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered;
- case TSourceIdInfo::EState::PendingRegistration:
- return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration;
- }
-}
-
-TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value) {
- switch(value) {
- case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown:
- return TSourceIdInfo::EState::Unknown;
- case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered:
- return TSourceIdInfo::EState::Registered;
- case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration:
- return TSourceIdInfo::EState::PendingRegistration;
- }
-}
-
-
-
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
index 6a304e04bf..fcf53a25cd 100644
--- a/ydb/core/persqueue/partition_sourcemanager.h
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -28,10 +28,6 @@ public:
ui64 Offset = 0;
bool Explicit = false;
TInstant WriteTimestamp;
-
- bool Pending = false;
-
- operator bool() const;
};
class TSourceManager {
@@ -52,8 +48,6 @@ public:
void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
void Update(THeartbeat&& heartbeat);
- operator bool() const;
-
private:
const TSourceIdMap& MemoryStorage() const;
const TSourceIdMap& WriteStorage() const;
@@ -103,51 +97,20 @@ public:
explicit TPartitionSourceManager(TPartition& partition);
- // For a partition obtained as a result of a merge or split, it requests
- // information about the consumer's parameters from the parent partitions.
- void EnsureSourceId(const TString& sourceId);
- void EnsureSourceIds(const TVector<TString>& sourceIds);
-
- // Returns true if we expect a response from the parent partitions
- bool WaitSources() const;
-
const TSourceInfo Get(const TString& sourceId) const;
TModificationBatch CreateModificationBatch(const TActorContext& ctx);
- void PassAway();
-
-public:
- void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx);
-
private:
- void ScheduleBatch();
- void FinishBatch(const TActorContext& ctx);
- bool RequireEnqueue(const TString& sourceId);
-
const TPartitionNode* GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;
TActorId PartitionRequester(TPartitionId id, ui64 tabletId);
- std::unique_ptr<TEvPQ::TEvSourceIdRequest> CreateRequest(TPartitionId id) const;
private:
TPartition& Partition;
-
- TSourceIds UnknownSourceIds;
- TSourceIds PendingSourceIds;
-
- std::unordered_map<TString, TSourceInfo> Sources;
-
- ui64 Cookie = 0;
- std::set<ui64> PendingCookies;
- std::vector<TEvPQ::TEvSourceIdResponse::TPtr> Responses;
- std::unordered_map<TPartitionId, TActorId> RequesterActors;
};
-NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value);
-TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value);
-
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h
index a2d2a79053..214c70cbb1 100644
--- a/ydb/core/persqueue/partition_types.h
+++ b/ydb/core/persqueue/partition_types.h
@@ -19,6 +19,7 @@ struct TWriteMsg {
ui64 Cookie;
TMaybe<ui64> Offset;
TEvPQ::TEvWrite::TMsg Msg;
+ std::optional<ui64> InitialSeqNo;
};
struct TOwnershipMsg {
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 4c3a78ea08..871956a019 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -304,9 +304,14 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
ui64 maxOffset = 0;
if (it != SourceIdStorage.GetInMemorySourceIds().end()) {
- maxSeqNo = it->second.SeqNo;
+ maxSeqNo = std::max(it->second.SeqNo, writeResponse.InitialSeqNo.value_or(0));
maxOffset = it->second.Offset;
- if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
+ if (maxSeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
+ already = true;
+ }
+ } else if (writeResponse.InitialSeqNo) {
+ maxSeqNo = writeResponse.InitialSeqNo.value();
+ if (maxSeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
already = true;
}
}
@@ -658,10 +663,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
for (auto& msg: ev->Get()->Msgs) {
size += msg.Data.size();
bool needToChangeOffset = msg.PartNo + 1 == msg.TotalParts;
- if (!msg.DisableDeduplication) {
- SourceManager.EnsureSourceId(msg.SourceId);
- }
- EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg)}, ctx);
+ EmplaceRequest(TWriteMsg{ev->Get()->Cookie, offset, std::move(msg), ev->Get()->InitialSeqNo}, ctx);
if (offset && needToChangeOffset)
++*offset;
}
@@ -868,10 +870,6 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
auto& sourceIdBatch = parameters.SourceIdBatch;
auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId);
- if (!p.Msg.DisableDeduplication && !sourceId) {
- return ProcessResult::Break;
- }
-
WriteInflightSize -= p.Msg.Data.size();
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp);
@@ -879,7 +877,19 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
ui64 poffset = p.Offset ? *p.Offset : curOffset;
- if (!p.Msg.DisableDeduplication && sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) {
+ LOG_TRACE_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName() << "' partition " << Partition
+ << " process write for '" << EscapeC(p.Msg.SourceId) << "'"
+ << " DisableDeduplication=" << p.Msg.DisableDeduplication
+ << " SeqNo=" << p.Msg.SeqNo
+ << " LocalSeqNo=" << sourceId.SeqNo()
+ << " InitialSeqNo=" << p.InitialSeqNo
+ );
+
+ if (!p.Msg.DisableDeduplication
+ && ((sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo)
+ || (p.InitialSeqNo && p.InitialSeqNo.value() >= p.Msg.SeqNo))) {
if (poffset >= curOffset) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
@@ -1218,7 +1228,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
.External = false,
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
- }};
+ }, std::nullopt};
WriteInflightSize += heartbeat->Data.size();
auto result = ProcessRequest(hbMsg, parameters, request, ctx);
@@ -1534,8 +1544,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
Y_ABORT_UNLESS(Requests.empty()
|| !WriteQuota->CanExaust(now)
|| WaitingForPreviousBlobQuota()
- || WaitingForSubDomainQuota(ctx)
- || SourceManager.WaitSources()); //in this case all writes must be processed or no quota left
+ || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left
AnswerCurrentWrites(ctx); //in case if all writes are already done - no answer will be called on kv write, no kv write at all
BecomeIdle(ctx);
return;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index aa4653b99f..f52155f943 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1229,11 +1229,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
ApplyNewConfigAndReply(ctx);
}
- ProcessSourceIdRequests(partitionId);
ProcessCheckPartitionStatusRequests(partitionId);
- if (allInitialized) {
- SourceIdRequests.clear();
- }
}
void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx)
@@ -1980,11 +1976,15 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
" offset: " << (req.HasCmdWriteOffset() ? (req.GetCmdWriteOffset() + i) : -1));
}
InitResponseBuilder(responseCookie, msgs.size(), COUNTER_LATENCY_PQ_WRITE);
+ std::optional<ui64> initialSeqNo;
+ if (req.HasInitialSeqNo()) {
+ initialSeqNo = req.GetInitialSeqNo();
+ }
THolder<TEvPQ::TEvWrite> event =
MakeHolder<TEvPQ::TEvWrite>(responseCookie, req.GetMessageNo(),
req.HasOwnerCookie() ? req.GetOwnerCookie() : "",
req.HasCmdWriteOffset() ? req.GetCmdWriteOffset() : TMaybe<ui64>(),
- std::move(msgs), req.GetIsDirectWrite());
+ std::move(msgs), req.GetIsDirectWrite(), initialSeqNo);
ctx.Send(partActor, event.Release());
}
@@ -3886,37 +3886,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
}
-void TPersQueue::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
- auto it = Partitions.find(record.GetPartition());
- if (it == Partitions.end()) {
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());
-
- auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
- response->Record.SetError("Partition was not found");
- Send(ev->Sender, response.Release());
-
- return;
- }
-
- if (it->second.InitDone) {
- Forward(ev, it->second.Actor);
- } else {
- SourceIdRequests[record.GetPartition()].push_back(ev);
- }
-}
-
-void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
- auto sit = SourceIdRequests.find(partitionId);
- if (sit != SourceIdRequests.end()) {
- auto it = Partitions.find(partitionId);
- for (auto& r : sit->second) {
- Forward(r, it->second.Actor);
- }
- SourceIdRequests.erase(partitionId);
- }
-}
-
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
auto it = Partitions.find(record.GetPartition());
@@ -4000,7 +3969,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle);
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
- HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
+ HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 61eabfaa0b..1cd94ed6d7 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -165,9 +165,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
ui64 GetAllowedStep() const;
- void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
- void ProcessSourceIdRequests(ui32 partitionId);
-
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
void ProcessCheckPartitionStatusRequests(ui32 partitionId);
@@ -407,7 +404,6 @@ private:
void DestroySession(TPipeInfo& pipeInfo);
bool UseMediatorTimeCast = true;
- THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
};
diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
index a66e345dd6..17eefd2808 100644
--- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
@@ -18,8 +18,8 @@ using namespace NKikimrPQ;
void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
ui32 id,
- const std::optional<TString>&& boundaryFrom,
- const std::optional<TString>&& boundaryTo,
+ const std::optional<TString>&& boundaryFrom = std::nullopt,
+ const std::optional<TString>&& boundaryTo = std::nullopt,
std::vector<ui32> children = {}) {
auto* p = conf.AddPartitions();
p->SetPartitionId(id);
@@ -44,7 +44,7 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled
auto* partitionStrategy = config->MutablePartitionStrategy();
partitionStrategy->SetMinPartitionCount(3);
- partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 3);
+ partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 0);
config->SetTopicName("/Root/topic-1");
config->SetTopicPath("/Root");
@@ -344,18 +344,6 @@ private:
response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
- if (ev->Get()->Record.GetPartitionRequest().HasCmdGetOwnership()) {
- auto& o = ev->Get()->Record.GetPartitionRequest().GetCmdGetOwnership();
- if (o.GetRegisterIfNotExists() || SeqNo) {
- auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
- cmd->SetOwnerCookie("ower_cookie");
- cmd->SetStatus(Status);
- cmd->SetSeqNo(SeqNo.value_or(0));
- } else {
- response->Record.SetErrorCode(NPersQueue::NErrorCode::SOURCEID_DELETED);
- }
- }
-
auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo();
sn->SetSeqNo(SeqNo.value_or(0));
sn->SetState(SeqNo ? NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED : NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION);
@@ -365,6 +353,10 @@ private:
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
+ response->Record.SetStatus(Status);
+ if (SeqNo) {
+ response->Record.SetSeqNo(SeqNo.value());
+ }
ctx.Send(ev->Sender, response.Release());
}
@@ -613,49 +605,91 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveA
UNIT_ASSERT(r->Error);
}
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) {
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_OtherPartition_Test) {
NPersQueue::TTestServer server = CreateServer();
+ auto config = CreateConfig0(true);
+ AddPartition(config, 0, {}, "F");
+ AddPartition(config, 1, "F", {});
CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
- {
- auto r = ChoosePartition(server, SMDisabled, "A_Source");
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
- }
- {
- auto r = ChoosePartition(server, SMDisabled, "C_Source");
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2);
- }
- {
- WriteToTable(server, "A_Source_w_0", 0);
- auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
- }
- {
- // Redefine partition for sourceId. Check that partition changed;
- WriteToTable(server, "A_Source_w_0", 1);
- auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- }
- {
- // Redefine partition for sourceId to inactive partition. Select new partition.
- WriteToTable(server, "A_Source_w_0", 3);
- auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
- }
- {
- // Use prefered partition, and save it in table
- auto r = ChoosePartition(server, SMDisabled, "A_Source_1", 1);
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- }
+ WriteToTable(server, "A_Source_10", 0, 13);
+ auto r = ChoosePartition(server, config, "A_Source_10", 1);
+
+ UNIT_ASSERT(r->Error);
+ AssertTable(server, "A_Source_10", 0, 13);
+}
+
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_NewSourceId_Test) {
+ NPersQueue::TTestServer server = CreateServer();
+
+ auto config = CreateConfig0(false);
+ AddPartition(config, 0);
+
+ auto r = ChoosePartition(server, config, "A_Source");
+
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
+}
+
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_RegisteredSourceId_Test) {
+ NPersQueue::TTestServer server = CreateServer();
+
+ auto config = CreateConfig0(false);
+ AddPartition(config, 0);
+ AddPartition(config, 1);
+
+ WriteToTable(server, "A_Source", 0);
+ auto r = ChoosePartition(server, config, "A_Source");
+
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
+
+ WriteToTable(server, "A_Source", 1);
+ r = ChoosePartition(server, config, "A_Source");
+
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
+}
+
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Inactive_Test) {
+ NPersQueue::TTestServer server = CreateServer();
+
+ auto config = CreateConfig0(false);
+ AddPartition(config, 0, {}, {}, {1});
+ AddPartition(config, 1);
+
+ WriteToTable(server, "A_Source", 0);
+ auto r = ChoosePartition(server, config, "A_Source");
+
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
+}
+
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) {
+ NPersQueue::TTestServer server = CreateServer();
+
+ auto config = CreateConfig0(false);
+ AddPartition(config, 0);
+ AddPartition(config, 1);
+
+ auto r = ChoosePartition(server, config, "A_Source", 0);
+
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
+}
+
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Inactive_Test) {
+ NPersQueue::TTestServer server = CreateServer();
+
+ auto config = CreateConfig0(false);
+ AddPartition(config, 0, {}, {}, {1});
+ AddPartition(config, 1);
+
+ auto r = ChoosePartition(server, config, "A_Source", 0);
+
+ UNIT_ASSERT(r->Error);
}
}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 7df22fe66a..d6803f176b 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -508,7 +508,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const
TVector<TEvPQ::TEvWrite::TMsg> msgs;
msgs.push_back(msg);
- auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false);
+ auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt);
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}
diff --git a/ydb/core/persqueue/ut/slow/pq_ut.cpp b/ydb/core/persqueue/ut/slow/pq_ut.cpp
index 6227c8bc6a..c68c71456e 100644
--- a/ydb/core/persqueue/ut/slow/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/slow/pq_ut.cpp
@@ -140,9 +140,8 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
auto sourceIds = CmdSourceIdRead(tc);
UNIT_ASSERT(sourceIds.size() > 0);
for (auto& s: sourceIds) {
- Cout << "try to find sourceId " << s << Endl;
auto findIt = std::find(writtenSourceIds.begin(), writtenSourceIds.end(), s);
- UNIT_ASSERT_VALUES_UNEQUAL(findIt, writtenSourceIds.end());
+ UNIT_ASSERT_C(findIt != writtenSourceIds.end(), "try to find sourceId " << s);
}
Cout << TInstant::Now() << "All Ok" << Endl;
});
diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp
index f5fc57fb87..2221298e89 100644
--- a/ydb/core/persqueue/ut/splitmerge_ut.cpp
+++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp
@@ -98,6 +98,10 @@ TTopicSdkTestSetup CreateSetup() {
setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
+ setup.GetRuntime().SetLogPriority(NKikimrServices::PQ_PARTITION_CHOOSER, NActors::NLog::PRI_TRACE);
+
+ setup.GetRuntime().GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
+ setup.GetRuntime().GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true);
return setup;
}
@@ -209,7 +213,41 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
Y_UNIT_TEST(PartitionSplit) {
TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopic();
+ setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
+
+ TTopicClient client = setup.MakeClient();
+
+ auto writeSession = CreateWriteSession(client, "producer-1");
+
+ TTestReadSession ReadSession(client, 2);
+
+ UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2)));
+
+ ui64 txId = 1006;
+ SplitPartition(setup, ++txId, 0, "a");
+
+ UNIT_ASSERT(writeSession->Write(Msg("message_1.2", 3)));
+
+ ReadSession.WaitAllMessages();
+
+ for(const auto& info : ReadSession.ReceivedMessages) {
+ if (info.Data == "message_1.1") {
+ UNIT_ASSERT_EQUAL(0, info.PartitionId);
+ UNIT_ASSERT_EQUAL(2, info.SeqNo);
+ } else if (info.Data == "message_1.2") {
+ UNIT_ASSERT(1 == info.PartitionId || 2 == info.PartitionId);
+ UNIT_ASSERT_EQUAL(3, info.SeqNo);
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected message: " << info.Data);
+ }
+ }
+
+ writeSession->Close(TDuration::Seconds(1));
+ }
+
+ Y_UNIT_TEST(PartitionSplit_PreferedPartition) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
TTopicClient client = setup.MakeClient();
@@ -269,9 +307,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
writeSession3->Close(TDuration::Seconds(1));
}
- Y_UNIT_TEST(PartitionMerge) {
+ Y_UNIT_TEST(PartitionMerge_PreferedPartition) {
TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2);
+ setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2, 100);
TTopicClient client = setup.MakeClient();
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index 0c133e998b..f707e2d574 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -3,6 +3,8 @@
#include <deque>
#include <util/string/builder.h>
+//#include <ydb/core/base/appdata_fwd.h>
+//#include <ydb/core/base/feature_flags.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
namespace NKikimr::NPQ {
@@ -33,7 +35,7 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) {
}
bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) {
- return config.GetPartitionStrategy().GetMinPartitionCount() < config.GetPartitionStrategy().GetMaxPartitionCount(); // TODO
+ return 0 < config.GetPartitionStrategy().GetMaxPartitionCount();
}
static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb
diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h
index a4d19c64ad..3ef8499d3b 100644
--- a/ydb/core/persqueue/writer/partition_chooser.h
+++ b/ydb/core/persqueue/writer/partition_chooser.h
@@ -24,16 +24,14 @@ struct TEvPartitionChooser {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)");
struct TEvChooseResult: public TEventLocal<TEvChooseResult, EvChooseResult> {
- TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional<ui64> seqNo)
+ TEvChooseResult(ui32 partitionId, ui64 tabletId, std::optional<ui64> seqNo)
: PartitionId(partitionId)
, TabletId(tabletId)
- , OwnerCookie(ownerCookie)
, SeqNo(seqNo) {
}
ui32 PartitionId;
ui64 TabletId;
- TString OwnerCookie;
std::optional<ui64> SeqNo;
};
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h
index 8d718d8d45..9f8f4be0df 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h
@@ -73,8 +73,10 @@ protected:
void InitTable(const NActors::TActorContext& ctx) {
TThis::Become(&TThis::StateInitTable);
const auto& pqConfig = AppData(ctx)->PQConfig;
+ TRACE("InitTable: SourceId="<< SourceId
+ << " TopicsAreFirstClassCitizen=" << pqConfig.GetTopicsAreFirstClassCitizen()
+ << " UseSrcIdMetaMappingInFirstClass=" <<pqConfig.GetUseSrcIdMetaMappingInFirstClass());
if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
- DEBUG("InitTable");
TableHelper.SendInitTableRequest(ctx);
} else {
StartKqpSession(ctx);
@@ -162,7 +164,7 @@ protected:
DEBUG("Update the table");
TableHelper.SendUpdateRequest(Partition->PartitionId, SeqNo, ctx);
} else {
- StartGetOwnership(ctx);
+ ReplyResult(ctx);
}
}
@@ -190,7 +192,7 @@ protected:
// Use tx only for query after select. Updating AccessTime without transaction.
TableHelper.CloseKqpSession(ctx);
- return StartGetOwnership(ctx);
+ ReplyResult(ctx);
}
StartIdle();
@@ -207,13 +209,18 @@ protected:
protected:
void StartCheckPartitionRequest(const TActorContext &ctx) {
TThis::Become(&TThis::StateCheckPartition);
+
+ if (!Partition) {
+ return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx);
+ }
+
PartitionHelper.Open(Partition->TabletId, ctx);
- PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, ctx);
+ PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, "", ctx);
}
void Handle(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ PartitionHelper.Close(ctx);
if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()) {
- PartitionHelper.Close(ctx);
return SendUpdateRequests(ctx);
}
ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "Partition isn`t active", ctx);
@@ -230,43 +237,6 @@ protected:
}
protected:
- void StartGetOwnership(const TActorContext &ctx) {
- TThis::Become(&TThis::StateOwnership);
- if (!Partition) {
- return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx);
- }
-
- DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId);
-
- PartitionHelper.Open(Partition->TabletId, ctx);
- PartitionHelper.SendGetOwnershipRequest(Partition->PartitionId, SourceId, true, ctx);
- }
-
- void HandleOwnership(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- DEBUG("HandleOwnership");
- auto& record = ev->Get()->Record;
-
- TString error;
- if (!BasicCheck(record, error)) {
- return ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx);
- }
-
- const auto& response = record.GetPartitionResponse();
- if (!response.HasCmdGetOwnershipResult()) {
- return ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx);
- }
-
- if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) {
- return ReplyError(ErrorCode::INITIALIZING, "Partition is not active", ctx);
- }
-
- OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();
-
- PartitionHelper.Close(ctx);
-
- OnOwnership(ctx);
- }
-
void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
auto msg = ev->Get();
if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) {
@@ -282,19 +252,6 @@ protected:
}
}
- virtual void OnOwnership(const TActorContext &ctx) = 0;
-
- STATEFN(StateOwnership) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvResponse, HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership);
- sFunc(TEvents::TEvPoison, ScheduleStop);
- }
- }
-
-
protected:
void StartIdle() {
TThis::Become(&TThis::StateIdle);
@@ -331,7 +288,8 @@ protected:
protected:
void ReplyResult(const NActors::TActorContext& ctx) {
- ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo));
+ DEBUG("ReplyResult: Partition=" << Partition->PartitionId << ", SeqNo=" << SeqNo);
+ ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, SeqNo));
}
void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) {
@@ -355,7 +313,6 @@ protected:
bool PartitionPersisted = false;
- TString OwnerCookie;
std::optional<ui64> SeqNo = 0;
};
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h
index 97a9750952..f8d9d35e6e 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h
@@ -37,8 +37,8 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- TThis::Initialize(ctx);
- TThis::InitTable(ctx);
+ TThis::Initialize(ctx);
+ TThis::InitTable(ctx);
}
TActorIdentity SelfId() const {
@@ -59,11 +59,6 @@ public:
}
}
- void OnOwnership(const TActorContext &ctx) override {
- DEBUG("OnOwnership");
- TThis::ReplyResult(ctx);
- }
-
private:
void RequestPQRB(const NActors::TActorContext& ctx) {
DEBUG("RequestPQRB")
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h
index 463217557c..d20c4da916 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h
@@ -47,8 +47,11 @@ public:
NTabletPipe::SendData(ctx, Pipe, ev.Release());
}
- void SendCheckPartitionStatusRequest(ui32 partitionId, const TActorContext& ctx) {
+ void SendCheckPartitionStatusRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) {
auto ev = MakeHolder<NKikimr::TEvPQ::TEvCheckPartitionStatusRequest>(partitionId);
+ if (sourceId) {
+ ev->Record.SetSourceId(sourceId);
+ }
NTabletPipe::SendData(ctx, Pipe, ev.Release());
}
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h
index 695af84d59..1bc599d872 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h
@@ -35,18 +35,17 @@ public:
std::optional<ui32> preferedPartition)
: TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition)
, Graph(MakePartitionGraph(config)) {
-
}
void Bootstrap(const TActorContext& ctx) {
+ TThis::Initialize(ctx);
BoundaryPartition = ChoosePartitionSync();
if (TThis::SourceId) {
- TThis::Initialize(ctx);
GetOwnershipFast(ctx);
} else {
TThis::Partition = BoundaryPartition;
- TThis::StartGetOwnership(ctx);
+ TThis::StartCheckPartitionRequest(ctx);
}
}
@@ -104,11 +103,6 @@ public:
GetOldSeqNo(ctx);
}
- void OnOwnership(const TActorContext &ctx) override {
- DEBUG("OnOwnership");
- TThis::ReplyResult(ctx);
- }
-
private:
void GetOwnershipFast(const TActorContext &ctx) {
TThis::Become(&TThis::StateOwnershipFast);
@@ -119,31 +113,18 @@ private:
DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId);
TThis::PartitionHelper.Open(BoundaryPartition->TabletId, ctx);
- TThis::PartitionHelper.SendGetOwnershipRequest(BoundaryPartition->PartitionId, TThis::SourceId, false, ctx);
+ TThis::PartitionHelper.SendCheckPartitionStatusRequest(BoundaryPartition->PartitionId, TThis::SourceId, ctx);
}
- void HandleOwnershipFast(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- DEBUG("HandleOwnershipFast");
- auto& record = ev->Get()->Record;
-
- TString error;
- if (!BasicCheck(record, error)) {
- return TThis::InitTable(ctx);
- }
-
- const auto& response = record.GetPartitionResponse();
- if (!response.HasCmdGetOwnershipResult()) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx);
- }
-
- if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Configuration changed", ctx);
- }
-
- TThis::OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();
-
- if (response.GetCmdGetOwnershipResult().GetSeqNo() > 0) {
+ void HandleFast(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ TThis::PartitionHelper.Close(ctx);
+ if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()
+ && ev->Get()->Record.HasSeqNo()
+ && ev->Get()->Record.GetSeqNo() > 0) {
// Fast path: the partition ative and already written
+ TThis::Partition = BoundaryPartition;
+ TThis::SeqNo = ev->Get()->Record.GetSeqNo();
+
TThis::SendUpdateRequests(ctx);
return TThis::ReplyResult(ctx);
}
@@ -154,7 +135,7 @@ private:
STATEFN(StateOwnershipFast) {
TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvResponse, HandleOwnershipFast);
+ HFunc(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse, HandleFast);
HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership);
HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership);
SFunc(TEvents::TEvPoison, TThis::Die);
@@ -207,7 +188,7 @@ private:
}
TThis::PartitionHelper.Close(ctx);
- TThis::StartCheckPartitionRequest(ctx);
+ OnPartitionChosen(ctx);
}
STATEFN(StateGetMaxSeqNo) {
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index dc5bddf72e..5fac083b6c 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -343,17 +343,17 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return InitResult(error, std::move(record));
}
- const auto& response = record.GetPartitionResponse();
+ auto& response = *record.MutablePartitionResponse();
if (!response.HasCmdGetMaxSeqNoResult()) {
return InitResult("Absent MaxSeqNo result", std::move(record));
}
- const auto& result = response.GetCmdGetMaxSeqNoResult();
+ auto& result = *response.MutableCmdGetMaxSeqNoResult();
if (result.SourceIdInfoSize() < 1) {
return InitResult("Empty source id info", std::move(record));
}
- const auto& sourceIdInfo = result.GetSourceIdInfo(0);
+ auto& sourceIdInfo = *result.MutableSourceIdInfo(0);
if (Opts.CheckState) {
switch (sourceIdInfo.GetState()) {
case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED:
@@ -370,6 +370,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}
}
+ Y_VERIFY(sourceIdInfo.GetSeqNo() >= 0);
+ if (Opts.InitialSeqNo && (ui64)sourceIdInfo.GetSeqNo() < Opts.InitialSeqNo.value()) {
+ sourceIdInfo.SetSeqNo(Opts.InitialSeqNo.value());
+ }
+
InitResult(OwnerCookie, sourceIdInfo, WriteId);
Become(&TThis::StateWork);
@@ -609,12 +614,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
auto& request = *ev->Record.MutablePartitionRequest();
request.SetMessageNo(MessageNo++);
+ if (Opts.InitialSeqNo) {
+ request.SetInitialSeqNo(Opts.InitialSeqNo.value());
+ }
SetWriteId(request);
if (!Opts.UseDeduplication) {
request.SetPartition(PartitionId);
}
+
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
PendingWrite.emplace_back(cookie);
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index bd3ba41038..9cd28cd5c4 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -176,6 +176,7 @@ struct TPartitionWriterOpts {
TString SourceId;
std::optional<ui32> ExpectedGeneration;
+ std::optional<ui64> InitialSeqNo;
TString Database;
TString TopicPath;
@@ -204,6 +205,7 @@ struct TPartitionWriterOpts {
TPartitionWriterOpts& WithTxId(const TString& value) { TxId = value; return *this; }
TPartitionWriterOpts& WithTraceId(const TString& value) { TraceId = value; return *this; }
TPartitionWriterOpts& WithRequestType(const TString& value) { RequestType = value; return *this; }
+ TPartitionWriterOpts& WithInitialSeqNo(const std::optional<ui64> value) { InitialSeqNo = value; return *this; }
};
IActor* CreatePartitionWriter(const TActorId& client,
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index e92ea1d7b8..87b3fa3cb2 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -169,6 +169,7 @@ message TPersQueuePartitionRequest {
optional bool IsDirectWrite = 18 [default = false];
optional uint64 PutUnitsSize = 19;
+ optional int64 InitialSeqNo = 26;
}
message TPersQueueMetaRequest {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index b47d3ff20a..bb70645ab6 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -936,38 +936,14 @@ message TEvSubDomainStatus {
required bool SubDomainOutOfSpace = 1;
};
-message TEvSourceIdRequest {
- optional uint32 Partition = 1;
- repeated string SourceId = 2;
-};
-
-message TEvSourceIdResponse {
- enum EState {
- Unknown = 0;
- Registered = 1;
- PendingRegistration = 2;
- };
-
- message TSource {
- optional string Id = 1;
- optional EState State = 2;
- optional uint64 SeqNo = 3;
- optional uint64 Offset = 4;
- optional bool Explicit = 5;
- optional uint64 WriteTimestamp = 6;
-
- };
- optional string Error = 1;
- optional uint32 Partition = 2;
- repeated TSource Source = 3;
-};
-
message TEvCheckPartitionStatusRequest {
optional uint32 Partition = 1;
+ optional string SourceId = 2;
};
message TEvCheckPartitionStatusResponse {
optional ETopicPartitionStatus Status = 1;
+ optional uint64 SeqNo = 2;
};
message TTransaction {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
index 128e74ad97..fedf75f9a7 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
@@ -22,12 +22,12 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim
}
}
-void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount)
+void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount)
{
TTopicClient client(MakeDriver());
TCreateTopicSettings topics;
- TPartitioningSettings partitions(partitionCount, partitionCount);
+ TPartitioningSettings partitions(partitionCount, maxPartitionCount.value_or(partitionCount));
topics.PartitioningSettings(partitions);
TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
index c1642f6b46..7ab9f650dc 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
@@ -16,7 +16,8 @@ class TTopicSdkTestSetup {
public:
TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true);
- void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1);
+ void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1,
+ std::optional<size_t> maxPartitionCount = std::nullopt);
TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TEST_TOPIC) const;
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
index 0d09a43189..0d5e472ca4 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
@@ -470,6 +470,7 @@ private:
ui64 PartitionTabletId;
ui32 PreferedPartition;
TString SourceId;
+ std::optional<ui64> InitialSeqNo;
TString OwnerCookie;
TString UserAgent;
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
index 20dad1f114..0b7b645c8b 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
@@ -455,6 +455,7 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
void TWriteSessionActor::Handle(NPQ::TEvPartitionChooser::TEvChooseResult::TPtr& ev, const NActors::TActorContext& ctx) {
auto* r = ev->Get();
PartitionTabletId = r->TabletId;
+ InitialSeqNo = r->SeqNo;
LastSourceIdUpdate = ctx.Now();
ProceedPartition(r->PartitionId, ctx);
@@ -484,6 +485,7 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont
TPartitionWriterOpts opts;
opts.WithSourceId(SourceId);
+ opts.WithInitialSeqNo(InitialSeqNo);
Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(ctx.SelfID, PartitionTabletId, Partition, opts));
State = ES_WAIT_WRITER_INIT;
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index f50e1a8653..c6564b2479 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -940,13 +940,25 @@ namespace NKikimr::NGRpcProxy::V1 {
pqDescr->SetName(name);
ui32 parts = 1;
+ ui32 maxParts = 0;
if (request.has_partitioning_settings()) {
- if (request.partitioning_settings().min_active_partitions() < 0) {
- error = TStringBuilder() << "Partitions count must be positive, provided " << request.partitioning_settings().min_active_partitions();
+ const auto& settings = request.partitioning_settings();
+ if (settings.min_active_partitions() < 0) {
+ error = TStringBuilder() << "Partitions count must be positive, provided " << settings.min_active_partitions();
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
- parts = request.partitioning_settings().min_active_partitions();
+ parts = settings.min_active_partitions();
if (parts == 0) parts = 1;
+
+ if (settings.partition_count_limit() > 0) {
+ maxParts = settings.partition_count_limit();
+
+ if (maxParts < parts) {
+ error = TStringBuilder() << "Partitions count limit must be greater than or equal to partitions count, provided "
+ << settings.partition_count_limit() << " and " << settings.min_active_partitions();
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
+ }
+ }
}
pqDescr->SetTotalGroupCount(parts);
@@ -954,6 +966,11 @@ namespace NKikimr::NGRpcProxy::V1 {
auto config = pqDescr->MutablePQTabletConfig();
auto partConfig = config->MutablePartitionConfig();
+ if (maxParts > 0 && AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge()) {
+ config->MutablePartitionStrategy()->SetMinPartitionCount(parts);
+ config->MutablePartitionStrategy()->SetMaxPartitionCount(maxParts);
+ }
+
config->SetRequireAuthWrite(true);
config->SetRequireAuthRead(true);
pqDescr->SetPartitionPerTablet(1);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 6113865f72..2d69db971d 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -193,6 +193,7 @@ private:
ui64 PartitionTabletId;
ui32 PreferedPartition;
std::optional<ui32> ExpectedGeneration;
+ std::optional<ui64> InitialSeqNo;
bool PartitionFound = false;
// 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index bee353798f..972e76a179 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -631,6 +631,7 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionChooser::TEvChooseResult::TPtr& ev, const NActors::TActorContext& ctx) {
auto* r = ev->Get();
PartitionTabletId = r->TabletId;
+ InitialSeqNo = r->SeqNo;
LastSourceIdUpdate = ctx.Now();
ProceedPartition(r->PartitionId, ctx);
@@ -680,6 +681,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CreatePartitionWriterCache(const
opts.WithDeduplication(UseDeduplication);
opts.WithSourceId(SourceId);
+ opts.WithInitialSeqNo(InitialSeqNo);
opts.WithExpectedGeneration(ExpectedGeneration);
if constexpr (UseMigrationProtocol) {