diff options
author | tesseract <tesseract@yandex-team.com> | 2023-11-14 12:00:44 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-11-14 19:07:43 +0300 |
commit | 0628d4698c38973d90cd1fbc81a6b67e1d6d24a9 (patch) | |
tree | 787768d9699d4575a5cc22747d34cbf6ae89a28c | |
parent | 60b19cde31eefa790f0956b3eb1f2e567dd2ca15 (diff) | |
download | ydb-0628d4698c38973d90cd1fbc81a6b67e1d6d24a9.tar.gz |
Split topic partitions. Take sourceid information from parent partitions
47 files changed, 1902 insertions, 248 deletions
diff --git a/.mapping.json b/.mapping.json index eea167a8b7..146d1f262d 100644 --- a/.mapping.json +++ b/.mapping.json @@ -9475,6 +9475,11 @@ "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt":"", "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.txt":"", "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt":"", + "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt":"", + "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt":"", + "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt":"", + "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt":"", + "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt":"", "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.darwin-x86_64.txt":"", "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-aarch64.txt":"", "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index e15e17c61c..9f6d0a3ab0 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 35c8a46399..ad4711be21 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index 35c8a46399..ad4711be21 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index e15e17c61c..9f6d0a3ab0 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 08cd4433a3..b6335edacf 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -138,6 +138,8 @@ struct TEvPQ { EvQuotaCountersUpdated, EvConsumerRemoved, EvFetchResponse, + EvSourceIdRequest, + EvSourceIdResponse, EvEnd }; @@ -854,6 +856,12 @@ struct TEvPQ { TString Message; NKikimrClient::TPersQueueFetchResponse Response; }; + + struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> { + }; + + struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> { + }; }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index fa25484de7..549167548a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1,5 +1,6 @@ #include "event_helpers.h" #include "mirrorer.h" +#include "partition_log.h" #include "partition_util.h" #include "partition.h" #include "read.h" @@ -53,6 +54,29 @@ const TString& TPartition::TopicName() const { return TopicConverter->GetClientsideName(); } +TString TPartition::LogPrefix() const { + TString state; + if (CurrentStateFunc() == &TThis::StateInit) { + state = "StateInit"; + } else if (CurrentStateFunc() == &TThis::StateIdle) { + state = "StateIdle"; + } else if (CurrentStateFunc() == &TThis::StateWrite) { + state = "StateWrite"; + } else { + state = "Unknown"; + } + return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " "; +} + +bool TPartition::CanWrite() const { + return (PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active) + && (!PendingPartitionConfig || PendingPartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active); +} + +bool TPartition::CanEnqueue() const { + return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active; +} + ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) { if (container.empty()) { return offset; @@ -114,6 +138,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , TopicConverter(topicConverter) , IsLocalDC(TabletConfig.GetLocalDC()) , DCId(std::move(dcId)) + , PartitionGraph() + , SourceManager(this) , StartOffset(0) , EndOffset(0) , WriteInflightSize(0) @@ -851,34 +877,50 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx } void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) { - auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie); - NKikimrClient::TResponse& resp = response->Response; + MaxSeqNoRequests.emplace_back(ev); + ProcessMaxSeqNoRequest(ctx); +} - resp.SetStatus(NMsgBusProxy::MSTATUS_OK); - resp.SetErrorCode(NPersQueue::NErrorCode::OK); +void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) { + PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size()); + SourceManager.EnsureSource(ctx); - auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult(); - for (const auto& sourceId : ev->Get()->SourceIds) { - auto& protoInfo = *result.AddSourceIdInfo(); - protoInfo.SetSourceId(sourceId); + while(!MaxSeqNoRequests.empty()) { + auto& ev = MaxSeqNoRequests.front(); - auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId); - if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - continue; - } + auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie); + NKikimrClient::TResponse& resp = response->Response; - const auto& memInfo = it->second; - Y_ABORT_UNLESS(memInfo.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, memInfo.Offset); - Y_ABORT_UNLESS(memInfo.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, memInfo.SeqNo); + resp.SetStatus(NMsgBusProxy::MSTATUS_OK); + resp.SetErrorCode(NPersQueue::NErrorCode::OK); - protoInfo.SetSeqNo(memInfo.SeqNo); - protoInfo.SetOffset(memInfo.Offset); - protoInfo.SetWriteTimestampMS(memInfo.WriteTimestamp.MilliSeconds()); - protoInfo.SetExplicit(memInfo.Explicit); - protoInfo.SetState(TSourceIdInfo::ConvertState(memInfo.State)); - } + auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult(); + for (const auto& sourceId : ev->Get()->SourceIds) { + auto& protoInfo = *result.AddSourceIdInfo(); + protoInfo.SetSourceId(sourceId); + + auto info = SourceManager.Get(sourceId); + if (!info) { + PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId); + return; + } + if (info.State == TSourceIdInfo::EState::Unknown) { + continue; + } - ctx.Send(Tablet, response.Release()); + Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset); + Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo); + + protoInfo.SetSeqNo(info.SeqNo); + protoInfo.SetOffset(info.Offset); + protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds()); + protoInfo.SetExplicit(info.Explicit); + protoInfo.SetState(TSourceIdInfo::ConvertState(info.State)); + } + + ctx.Send(Tablet, response.Release()); + MaxSeqNoRequests.pop_front(); + } } void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) { @@ -1297,12 +1339,14 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& DiskIsFull = !diskIsOk; if (response.HasCookie()) { - HandleSetOffsetResponse(response.GetCookie(), ctx); + OnProcessTxsAndUserActsWriteComplete(response.GetCookie(), ctx); } else { - if (ctx.Now() - WriteStartTime > TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs())) { + const auto writeDuration = ctx.Now() - WriteStartTime; + const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs()); + if (writeDuration > minWriteLatency) { HandleWriteResponse(ctx); } else { - ctx.Schedule(TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs()) - (ctx.Now() - WriteStartTime), new TEvPQ::TEvHandleWriteResponse()); + ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse()); } } } @@ -1414,6 +1458,7 @@ void TPartition::RemoveDistrTx() Y_ABORT_UNLESS(!DistrTxs.empty()); DistrTxs.pop_front(); + PendingPartitionConfig = nullptr; } void TPartition::ProcessDistrTxs(const TActorContext& ctx) @@ -1495,6 +1540,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter, event.Config); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); + SendChangeConfigReply = false; return true; } @@ -1623,11 +1670,85 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } } +void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) { + Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE); + + + if (ChangeConfig) { + EndChangePartitionConfig(ChangeConfig->Config, + ChangeConfig->TopicConverter, + ctx); + } + + for (auto& user : AffectedUsers) { + if (auto* actual = GetPendingUserIfExists(user)) { + TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); + bool offsetHasChanged = (userInfo.Offset != actual->Offset); + + userInfo.Session = actual->Session; + userInfo.Generation = actual->Generation; + userInfo.Step = actual->Step; + userInfo.Offset = actual->Offset; + userInfo.ReadRuleGeneration = actual->ReadRuleGeneration; + userInfo.ReadFromTimestamp = actual->ReadFromTimestamp; + userInfo.HasReadRule = true; + + if (userInfo.Important != actual->Important) { + if (userInfo.LabeledCounters) { + ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); + } + userInfo.SetImportant(actual->Important); + } + if (userInfo.Important && userInfo.Offset < (i64)StartOffset) { + userInfo.Offset = StartOffset; + } + + if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) { + userInfo.ActualTimestamps = false; + ReadTimestampForOffset(user, userInfo, ctx); + } else { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); + } + } else { + auto ui = UsersInfoStorage->GetIfExists(user); + if (ui && ui->LabeledCounters) { + ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); + } + + UsersInfoStorage->Remove(user, ctx); + Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); + } + } + + for (auto& [actor, reply] : Replies) { + ctx.Send(actor, reply.release()); + } + + PendingUsersInfo.clear(); + Replies.clear(); + AffectedUsers.clear(); + + UsersInfoWriteInProgress = false; + + TxIdHasChanged = false; + + if (ChangeConfig) { + ReportCounters(ctx, true); + ChangeConfig = nullptr; + PendingPartitionConfig = nullptr; + } + + + ProcessTxsAndUserActs(ctx); +} + void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx) { Config = config; + PartitionConfig = GetPartitionConfig(Config, Partition); + PartitionGraph.Rebuild(Config); TopicConverter = topicConverter; Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0); @@ -1697,6 +1818,9 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx) } else if (t.ProposeConfig) { t.Predicate = BeginTransaction(*t.ProposeConfig); + PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition); + //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); + ctx.Send(Tablet, MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step, t.ProposeConfig->TxId, @@ -1707,6 +1831,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx) Y_ABORT_UNLESS(!ChangeConfig); ChangeConfig = t.ChangeConfig; + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition); SendChangeConfigReply = t.SendReply; BeginChangePartitionConfig(ChangeConfig->Config, ctx); @@ -2392,5 +2517,40 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext } } +void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + + if (Partition != record.GetPartition()) { + LOG_INFO_S( + ctx, NKikimrServices::PERSQUEUE, + "TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." << + " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition << "." + ); + return; + } + + auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds(); + + auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>(); + for(auto& sourceId : record.GetSourceId()) { + auto* s = response->Record.AddSource(); + s->SetId(sourceId); + + auto it = memoryStorage.find(sourceId); + if (it != memoryStorage.end()) { + auto& info = it->second; + s->SetState(Convert(info.State)); + s->SetSeqNo(info.SeqNo); + s->SetOffset(info.Offset); + s->SetExplicit(info.Explicit); + s->SetWriteTimestamp(info.WriteTimestamp.GetValue()); + } else { + s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown); + } + } + + Send(ev->Sender, response.Release()); +} } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 9709d6b973..c8e510ad58 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -4,6 +4,7 @@ #include "header.h" #include "key.h" #include "partition_init.h" +#include "partition_sourcemanager.h" #include "partition_types.h" #include "quota_tracker.h" #include "sourceid.h" @@ -78,6 +79,8 @@ class TPartition : public TActorBootstrapped<TPartition> { friend TInitDataRangeStep; friend TInitDataStep; + friend TPartitionSourceManager; + public: const TString& TopicName() const; @@ -88,6 +91,9 @@ private: struct THasDataReq; struct THasDataDeadline; + bool CanWrite() const; + bool CanEnqueue() const; + void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error); void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode); void ReplyErrorForStoredWrites(const TActorContext& ctx); @@ -101,7 +107,7 @@ private: void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); void AnswerCurrentWrites(const TActorContext& ctx); void CancelAllWritesOnIdle(const TActorContext& ctx); - void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST); + void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST); void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request); void CreateMirrorerActor(); void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx); @@ -155,7 +161,6 @@ private: void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); - void HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx); void HandleWakeup(const TActorContext& ctx); void HandleWriteResponse(const TActorContext& ctx); @@ -175,6 +180,8 @@ private: void ProcessTimestampRead(const TActorContext& ctx); void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx); + void ProcessMaxSeqNoRequest(const TActorContext& ctx); + void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx); void ReportCounters(const TActorContext& ctx, bool force = false); bool UpdateCounters(const TActorContext& ctx, bool force = false); @@ -199,7 +206,7 @@ private: TInstant GetWriteTimeEstimate(ui64 offset) const; bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, - TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter); + TPartitionSourceManager::TModificationBatch& sourceIdBatch); bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); // Removes blobs that are no longer required. Blobs are no longer required if the storage time of all messages @@ -300,6 +307,7 @@ private: void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); + void OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx); void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx); @@ -312,7 +320,8 @@ private: template <typename T> void EmplaceRequest(T&& body, const TActorContext& ctx) { - Requests.emplace_back(body, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now() - TInstant::Zero()); + const auto now = ctx.Now(); + Requests.emplace_back(body, WriteQuota->GetQuotedTime(now), now - TInstant::Zero()); } void EmplaceResponse(TMessage&& message, const TActorContext& ctx); @@ -327,6 +336,9 @@ private: void ResendPendingEvents(const TActorContext& ctx); + void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); + + TString LogPrefix() const; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; @@ -460,6 +472,8 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); + HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); + HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); @@ -515,6 +529,8 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); + HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); + HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); @@ -525,36 +541,45 @@ private: } private: + enum class ProcessResult { + Continue, + Abort, + Break + }; + struct ProcessParameters { - ProcessParameters(TSourceIdWriter& sourceIdWriter, - THeartbeatEmitter& heartbeatEmitter) - : SourceIdWriter(sourceIdWriter) - , HeartbeatEmitter(heartbeatEmitter) { + ProcessParameters(TPartitionSourceManager::TModificationBatch& sourceIdBatch) + : SourceIdBatch(sourceIdBatch) { } - TSourceIdWriter& SourceIdWriter; - THeartbeatEmitter& HeartbeatEmitter; + TPartitionSourceManager::TModificationBatch& SourceIdBatch; ui64 CurOffset; bool OldPartsCleared; bool HeadCleared; }; - bool ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); - bool ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); - bool ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); - bool ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + ProcessResult ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); + ProcessResult ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); + ProcessResult ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); + ProcessResult ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx); private: ui64 TabletID; ui32 Partition; NKikimrPQ::TPQTabletConfig Config; NKikimrPQ::TPQTabletConfig TabletConfig; + const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr; + const NKikimrPQ::TPQTabletConfig::TPartition* PendingPartitionConfig = nullptr; + const TTabletCountersBase& Counters; NPersQueue::TTopicConverterPtr TopicConverter; bool IsLocalDC; TString DCId; + TPartitionGraph PartitionGraph; + TPartitionSourceManager SourceManager; + ui32 MaxBlobSize; const ui32 TotalLevels = 4; TVector<ui32> CompactLevelBorder; @@ -575,6 +600,7 @@ private: std::deque<TMessage> Requests; std::deque<TMessage> Responses; + std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests; THead Head; THead NewHead; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 9e3d7ba111..d6a13a0b29 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -179,6 +179,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon case NKikimrProto::NODATA: Partition()->Config = Partition()->TabletConfig; + Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition); + Partition()->PartitionGraph.Rebuild(Partition()->Config); break; case NKikimrProto::ERROR: diff --git a/ydb/core/persqueue/partition_log.h b/ydb/core/persqueue/partition_log.h new file mode 100644 index 0000000000..78c9a9e140 --- /dev/null +++ b/ydb/core/persqueue/partition_log.h @@ -0,0 +1,18 @@ +#pragma once + +#include <library/cpp/actors/core/log.h> +#include <util/generic/string.h> + +namespace NKikimr::NPQ { + +inline TString LogPrefix() { return {}; } + +#define PQ_LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) +#define PQ_LOG_CRIT(stream) LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream) + +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 323a14a647..136fd9b3ad 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -887,77 +887,6 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) { Y_ABORT_UNLESS(ReadingTimestamp || UpdateUserInfoTimestamp.empty()); } -void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) { - Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE); - - - if (ChangeConfig) { - EndChangePartitionConfig(ChangeConfig->Config, - ChangeConfig->TopicConverter, - ctx); - } - - for (auto& user : AffectedUsers) { - if (auto* actual = GetPendingUserIfExists(user)) { - TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); - bool offsetHasChanged = (userInfo.Offset != actual->Offset); - - userInfo.Session = actual->Session; - userInfo.Generation = actual->Generation; - userInfo.Step = actual->Step; - userInfo.Offset = actual->Offset; - userInfo.ReadRuleGeneration = actual->ReadRuleGeneration; - userInfo.ReadFromTimestamp = actual->ReadFromTimestamp; - userInfo.HasReadRule = true; - - if (userInfo.Important != actual->Important) { - if (userInfo.LabeledCounters) { - ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); - } - userInfo.SetImportant(actual->Important); - } - if (userInfo.Important && userInfo.Offset < (i64)StartOffset) { - userInfo.Offset = StartOffset; - } - - if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) { - userInfo.ActualTimestamps = false; - ReadTimestampForOffset(user, userInfo, ctx); - } else { - TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); - } - } else { - auto ui = UsersInfoStorage->GetIfExists(user); - if (ui && ui->LabeledCounters) { - ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); - } - - UsersInfoStorage->Remove(user, ctx); - Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); - } - } - - for (auto& [actor, reply] : Replies) { - ctx.Send(actor, reply.release()); - } - - PendingUsersInfo.clear(); - Replies.clear(); - AffectedUsers.clear(); - - UsersInfoWriteInProgress = false; - - TxIdHasChanged = false; - - if (ChangeConfig) { - ReportCounters(ctx, true); - ChangeConfig = nullptr; - } - - - ProcessTxsAndUserActs(ctx); -} - void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) { ui32 count = 0; ui32 size = 0; diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp new file mode 100644 index 0000000000..64474fb355 --- /dev/null +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -0,0 +1,466 @@ +#include "partition.h" +#include "partition_log.h" + +#include <unordered_map> + +#include <ydb/core/base/tablet_pipe.h> + +namespace NKikimr::NPQ { + +IActor* CreateRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie); +bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node); + +// +// TPartitionSourceManager +// + +TPartitionSourceManager::TPartitionSourceManager(TPartition* partition) + : Partition(partition) { +} + +void TPartitionSourceManager::EnsureSource(const TActorContext& /*ctx*/) { + if (WaitSources()) { + return; + } + + PendingCookies.clear(); + Responses.clear(); + + auto node = GetPartitionNode(); + if (!IsResearchRequires(node)) { + return; + } + + auto unknowSourceIds = BuildUnknownSourceIds(); + if (unknowSourceIds->empty()) { + return; + } + + for(const auto* parent : node.value()->HierarhicalParents) { + PendingCookies.insert(++Cookie); + Partition->RegisterWithSameMailbox(CreateRequester(Partition->SelfId(), + parent->Id, + parent->TabletId, + unknowSourceIds, + Cookie)); + } +} + +const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TString& sourceId) const { + auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds(); + auto itk = knownSourceIds.find(sourceId); + if (itk != knownSourceIds.end()) { + auto& value = itk->second; + + TSourceInfo result; + result.State = value.State; + result.SeqNo = value.SeqNo; + result.Offset = value.Offset; + result.Explicit = value.Explicit; + result.WriteTimestamp = value.WriteTimestamp; + + return result; + } + + auto its = Sources.find(sourceId); + if (its != Sources.end()) { + return its->second; + } + + TSourceInfo result; + result.Pending = IsResearchRequires(GetPartitionNode()); + return result; +} + +bool TPartitionSourceManager::WaitSources() const { + return !PendingCookies.empty(); +} + +TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModificationBatch(const TActorContext& ctx) { + const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() + ? ESourceIdFormat::Proto + : ESourceIdFormat::Raw; + return TModificationBatch(this, format); +} + +void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) { + auto it = PendingCookies.find(ev->Cookie); + if (it == PendingCookies.end()) { + PQ_LOG_D("Received TEvSourceIdResponse with unknown cookie: " << ev->Cookie); + return; + } + PendingCookies.erase(it); + + if (ev->Get()->Record.HasError()) { + PQ_LOG_D("Error on request SourceId: " << ev->Get()->Record.GetError()); + Responses.clear(); + + EnsureSource(ctx); + return; + } + + Responses.push_back(ev); + + if (PendingCookies.empty()) { + FinishBatch(ctx); + } +} + +TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const { + return Partition->PartitionGraph.GetPartition(Partition->Partition); +} + +TPartitionSourceManager::TSourceIdsPtr TPartitionSourceManager::BuildUnknownSourceIds() const { + auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds(); + auto unknownSourceIds = std::make_shared<std::set<const TString*>>(); + + auto predicate = [&](const TString& sourceId) { + return !Sources.contains(sourceId) && !knownSourceIds.contains(sourceId); + }; + + for(const auto& msg : Partition->Requests) { + if (msg.IsWrite()) { + const TString& sourceId = msg.GetWrite().Msg.SourceId; + if (predicate(sourceId)) { + unknownSourceIds->insert(&sourceId); + } + } + } + + for(const auto& ev : Partition->MaxSeqNoRequests) { + for (const auto& sourceId : ev->Get()->SourceIds) { + if (predicate(sourceId)) { + unknownSourceIds->insert(&sourceId); + } + } + } + + return unknownSourceIds; +} + +void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) { + for(const auto& ev : Responses) { + const auto& record = ev->Get()->Record; + for(const auto& s : record.GetSource()) { + auto& value = Sources[s.GetId()]; + if (s.GetState() == NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown) { + continue; + } + PQ_LOG_T("Received SourceId " << s.GetId() << " SeqNo=" << s.GetSeqNo()); + if (value.State == TSourceIdInfo::EState::Unknown || value.SeqNo < s.GetSeqNo()) { + value.State = Convert(s.GetState()); + value.SeqNo = s.GetSeqNo(); + value.Offset = s.GetOffset(); + value.Explicit = s.GetExplicit(); + value.WriteTimestamp.FromValue(s.GetWriteTimestamp()); + } + } + } + + Responses.clear(); + + if (Partition->CurrentStateFunc() == &TPartition::StateIdle) { + Partition->HandleWrites(ctx); + } + Partition->ProcessMaxSeqNoRequest(ctx); +} + +TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { + return Partition->SourceIdStorage; +} + +bool TPartitionSourceManager::HasParents() const { + auto node = Partition->PartitionGraph.GetPartition(Partition->Partition); + return node && !node.value()->Parents.empty(); +} + + +// +// TPartitionSourceManager::TModificationBatch +// + +TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format) + : Manager(manager) + , Node(manager->GetPartitionNode()) + , SourceIdWriter(format) + , HeartbeatEmitter(manager->Partition->SourceIdStorage) { +} + +TPartitionSourceManager::TModificationBatch::~TModificationBatch() { + for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) { + Manager->Sources.erase(k); + } +} + +TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const { + return HeartbeatEmitter.CanEmit(); +} + +TPartitionSourceManager::TSourceManager TPartitionSourceManager::TModificationBatch::GetSource(const TString& id) { + return TPartitionSourceManager::TSourceManager(this, id); +} + +void TPartitionSourceManager::TModificationBatch::Cancel() { + return SourceIdWriter.Clear(); +} + +bool TPartitionSourceManager::TModificationBatch::HasModifications() const { + return !SourceIdWriter.GetSourceIdsToWrite().empty(); +} + +void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) { + SourceIdWriter.FillRequest(request, Manager->Partition->Partition); +} + +void TPartitionSourceManager::TModificationBatch::DeregisterSourceId(const TString& sourceId) { + SourceIdWriter.DeregisterSourceId(sourceId); +} + +TPartitionSourceManager* TPartitionSourceManager::TModificationBatch::GetManager() const { + return Manager; +} + + +// +// TPartitionSourceManager::TSourceManager +// + +TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) { + TPartitionSourceManager::TSourceInfo result(value.State); + result.SeqNo = value.SeqNo; + result.Offset = value.Offset; + result.Explicit = value.Explicit; + result.WriteTimestamp = value.WriteTimestamp; + return result; +} + +TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batch, const TString& id) + : Batch(batch) + , SourceId(id) { + auto& memory = MemoryStorage(); + auto& writer = WriteStorage(); + auto& sources = batch->GetManager()->Sources; + + InMemory = memory.find(id); + InWriter = writer.find(id); + InSources = sources.end(); + + if (InWriter != writer.end()) { + Info = Convert(InWriter->second); + return; + } + if (InMemory != memory.end()) { + Info = Convert(InMemory->second); + return; + } + + InSources = sources.find(id); + if (InSources != sources.end()) { + Info = InSources->second; + return; + } + + Info.Pending = IsResearchRequires(batch->Node); +} + +std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const { + return Info.State == TSourceIdInfo::EState::Unknown ? std::nullopt : std::optional(Info.SeqNo); +} + +bool TPartitionSourceManager::TSourceManager::Explicit() const { + return Info.Explicit; +} + +std::optional<ui64> TPartitionSourceManager::TSourceManager::CommittedSeqNo() const { + return InMemory == MemoryStorage().end() ? std::nullopt : std::optional(InMemory->second.SeqNo); +} + +std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() const { + return InWriter == WriteStorage().end() ? std::nullopt : std::optional(InWriter->second.SeqNo); +} + +void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) { + if (InMemory == MemoryStorage().end()) { + Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp); + } else { + Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp)); + } +} + +void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) { + Batch->HeartbeatEmitter.Process(SourceId, heartbeat); + if (InMemory == MemoryStorage().end()) { + Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, heartbeat); + } else { + Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat))); + } +} + +TPartitionSourceManager::TSourceManager::operator bool() const { + return Info; +} + +const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const { + return Batch->GetManager()->GetSourceIdStorage().GetInMemorySourceIds(); +} + +const TSourceIdMap& TPartitionSourceManager::TSourceManager::WriteStorage() const { + return Batch->SourceIdWriter.GetSourceIdsToWrite(); +} + + +// +// TSourceInfo +// + +TPartitionSourceManager::TSourceInfo::TSourceInfo(TSourceIdInfo::EState state) + : State(state) { +} + +TPartitionSourceManager::TSourceInfo::operator bool() const { + return !Pending; +} + + +// +// TSourceIdRequester +// + +class TSourceIdRequester : public TActorBootstrapped<TSourceIdRequester> { + static constexpr TDuration RetryDelay = TDuration::MilliSeconds(100); +public: + TSourceIdRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie) + : Parent(parent) + , Partition(partition) + , TabletId(tabletId) + , SourceIds(sourceIds) + , Cookie(cookie) { + } + + void Bootstrap(const TActorContext&) { + Become(&TThis::StateWork); + MakePipe(); + } + + void PassAway() override { + if (PipeClient) { + NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient); + } + TActorBootstrapped::PassAway(); + } + +private: + void MakePipe() { + NTabletPipe::TClientConfig config; + config.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(200), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config)); + } + + std::unique_ptr<TEvPQ::TEvSourceIdRequest> CreateRequest() { + auto request = std::make_unique<TEvPQ::TEvSourceIdRequest>(); + auto& record = request->Record; + record.SetPartition(Partition); + for(const auto& sourceId : *SourceIds) { + record.AddSourceId(*sourceId); + } + return request; + } + + void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& /*ctx*/) { + auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>(); + msg->Record = std::move(ev->Get()->Record); + + Reply(msg); + PassAway(); + } + + void DoRequest() { + NTabletPipe::SendData(SelfId(), PipeClient, CreateRequest().release()); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::EReplyStatus::OK) { + DoRequest(); + } else { + ReplyWithError("Error connecting to PQ"); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + if (ev->Get()->TabletId == TabletId) { + ReplyWithError("Pipe destroyed"); + } + } + + void ReplyWithError(const TString& error) { + auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>(); + msg->Record.SetError(error); + + Reply(msg); + + PassAway(); + } + + void Reply(std::unique_ptr<TEvPQ::TEvSourceIdResponse>& msg) { + Send(Parent, msg.release(), 0, Cookie); + } + + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPQ::TEvSourceIdResponse, Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + TActorId Parent; + ui32 Partition; + ui64 TabletId; + TPartitionSourceManager::TSourceIdsPtr SourceIds; + ui64 Cookie; + + TActorId PipeClient; +}; + +IActor* CreateRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie) { + return new TSourceIdRequester(parent, partition, tabletId, sourceIds, cookie); +} + +bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) { + return node && !node.value()->Parents.empty(); +} + +NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) { + switch(value) { + case TSourceIdInfo::EState::Unknown: + return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown; + case TSourceIdInfo::EState::Registered: + return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered; + case TSourceIdInfo::EState::PendingRegistration: + return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration; + } +} + +TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value) { + switch(value) { + case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown: + return TSourceIdInfo::EState::Unknown; + case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered: + return TSourceIdInfo::EState::Registered; + case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration: + return TSourceIdInfo::EState::PendingRegistration; + } +} + + + +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h new file mode 100644 index 0000000000..6f57ecdb18 --- /dev/null +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -0,0 +1,140 @@ +#pragma once + +#include <util/generic/fwd.h> +#include <ydb/core/persqueue/events/internal.h> + +#include "sourceid.h" +#include "utils.h" + +namespace NKikimr::NPQ { + +class TPartition; + +class TPartitionSourceManager { +private: + using TPartitionNode = std::optional<const TPartitionGraph::Node *>; + +public: + using TSourceIdsPtr = std::shared_ptr<std::set<const TString*>>; + class TModificationBatch; + + struct TSourceInfo { + TSourceInfo(TSourceIdInfo::EState state = TSourceIdInfo::EState::Unknown); + + TSourceIdInfo::EState State; + ui64 SeqNo = 0; + ui64 Offset = 0; + bool Explicit = false; + TInstant WriteTimestamp; + + bool Pending = false; + + operator bool() const; + }; + + class TSourceManager { + public: + TSourceManager(TModificationBatch* batch, const TString& id); + + // Checks whether a message with the specified Sourceid can be processed. + // The message can be processed if it is not required to receive information + // about it in the parent partitions, or this information has already been received. + bool CanProcess() const; + + std::optional<ui64> SeqNo() const; + bool Explicit() const; + + std::optional<ui64> CommittedSeqNo() const; + std::optional<ui64> UpdatedSeqNo() const; + + void Update(ui64 seqNo, ui64 offset, TInstant timestamp); + void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat); + + operator bool() const; + + private: + const TSourceIdMap& MemoryStorage() const; + const TSourceIdMap& WriteStorage() const; + + + private: + TModificationBatch* Batch; + const TString SourceId; + + TSourceInfo Info; + + TSourceIdMap::const_iterator InMemory; + TSourceIdMap::const_iterator InWriter; + std::unordered_map<TString, TSourceInfo>::const_iterator InSources; + }; + + // Encapsulates the logic of SourceId operation during modification + class TModificationBatch { + friend TSourceManager; + public: + TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format); + ~TModificationBatch(); + + TMaybe<THeartbeat> CanEmit() const; + TSourceManager GetSource(const TString& id); + + void Cancel(); + bool HasModifications() const; + void FillRequest(TEvKeyValue::TEvRequest* request); + + template <typename... Args> + void RegisterSourceId(const TString& sourceId, Args&&... args) { + SourceIdWriter.RegisterSourceId(sourceId, std::forward<Args>(args)...); + } + void DeregisterSourceId(const TString& sourceId); + + private: + TPartitionSourceManager* GetManager() const; + + private: + TPartitionSourceManager* Manager; + + TPartitionNode Node; + TSourceIdWriter SourceIdWriter; + THeartbeatEmitter HeartbeatEmitter; + }; + + + TPartitionSourceManager(TPartition* partition); + + // For a partition obtained as a result of a merge or split, it requests + // information about the consumer's parameters from the parent partitions. + void EnsureSource(const TActorContext& ctx); + + // Returns true if we expect a response from the parent partitions + bool WaitSources() const; + + const TSourceInfo Get(const TString& sourceId) const; + + TModificationBatch CreateModificationBatch(const TActorContext& ctx); + +public: + void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx); + +private: + TPartitionNode GetPartitionNode() const; + TSourceIdsPtr BuildUnknownSourceIds() const; + void FinishBatch(const TActorContext& ctx); + + TSourceIdStorage& GetSourceIdStorage() const; + bool HasParents() const; + +private: + TPartition* Partition; + + std::unordered_map<TString, TSourceInfo> Sources; + + ui64 Cookie = 0; + std::set<ui64> PendingCookies; + std::vector<TEvPQ::TEvSourceIdResponse::TPtr> Responses; +}; + +NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value); +TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value); + +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 528dbc18cf..2c8661eee4 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1,5 +1,6 @@ #include "event_helpers.h" #include "mirrorer.h" +#include "partition_log.h" #include "partition_util.h" #include "partition.h" #include "read.h" @@ -28,6 +29,8 @@ static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; static const ui32 MAX_INLINE_SIZE = 1000; +static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL; + void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition); @@ -78,13 +81,13 @@ void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActor } void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvUpdateAvailableSize. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvUpdateAvailableSize."); UpdateAvailableSize(ctx); } void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnIdle. Partition: " << Partition); + PQ_LOG_T("TPartition::CancelAllWritesOnIdle."); for (const auto& w : Requests) { ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full"); @@ -106,7 +109,7 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { } void TPartition::FailBadClient(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FailBadClient. Partition: " << Partition); + PQ_LOG_T("TPartition::FailBadClient."); for (auto it = Owners.begin(); it != Owners.end();) { it = DropOwner(it, ctx); @@ -138,7 +141,7 @@ void TPartition::FailBadClient(const TActorContext& ctx) { } void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequest. Partition: " << Partition); + PQ_LOG_T("TPartition::ProcessChangeOwnerRequest."); auto &owner = ev->Owner; auto it = Owners.find(owner); @@ -163,7 +166,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::DropOwner. Partition: " << Partition); + PQ_LOG_D("TPartition::DropOwner."); Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; @@ -179,7 +182,7 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas } void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvChangeOwner. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvChangeOwner."); bool res = OwnerPipes.insert(ev->Get()->PipeClient).second; Y_ABORT_UNLESS(res); @@ -188,7 +191,7 @@ void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ct } void TPartition::ProcessReserveRequests(const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessReserveRequests. Partition: " << Partition); + PQ_LOG_T("TPartition::ProcessReserveRequests."); const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize(); @@ -236,7 +239,7 @@ void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) { void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvReserveBytes. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvReserveBytes."); const TString& ownerCookie = ev->Get()->OwnerCookie; TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); @@ -267,7 +270,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct } void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AnswerCurrentWrites. Partition: " << Partition); + PQ_LOG_T("TPartition::AnswerCurrentWrites. Responses.size()=" << Responses.size()); ui64 offset = EndOffset; while (!Responses.empty()) { @@ -306,6 +309,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { offset = *wrOffset; } } + if (!already && partNo + 1 == totalParts) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion); @@ -392,7 +396,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SyncMemoryStateWithKVState. Partition: " << Partition); + PQ_LOG_T("TPartition::SyncMemoryStateWithKVState."); if (!CompactedKeys.empty()) HeadKeys.clear(); @@ -460,13 +464,13 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { } void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvHandleWriteResponse. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse."); HandleWriteResponse(ctx); } void TPartition::HandleWriteResponse(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWriteResponse. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleWriteResponse."); Y_ABORT_UNLESS(CurrentStateFunc() == &TThis::StateWrite); ui64 prevEndOffset = EndOffset; @@ -539,7 +543,13 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvWrite. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvWrite."); + + if (!CanEnqueue()) { + ReplyError(ctx, ev->Get()->Cookie, InactivePartitionErrorCode, + TStringBuilder() << "Write to inactive partition"); + return; + } ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg) { return sum + msg.Data.size(); @@ -654,7 +664,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TA } void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvRegisterMessageGroup. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvRegisterMessageGroup."); const auto& body = ev->Get()->Body; @@ -692,7 +702,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const } void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvDeregisterMessageGroup. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvDeregisterMessageGroup."); const auto& body = ev->Get()->Body; @@ -711,7 +721,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActo } void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvSplitMessageGroup. Partition: " << Partition); + PQ_LOG_T("TPartition::HandleOnWrite TEvSplitMessageGroup."); if (ev->Get()->Deregistrations.size() > 1) { return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, @@ -768,7 +778,7 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequests. Partition: " << Partition); + PQ_LOG_T("TPartition::ProcessChangeOwnerRequests."); while (!WaitToChangeOwner.empty()) { auto &ev = WaitToChangeOwner.front(); @@ -784,8 +794,8 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { } } -void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnWrite. Partition: " << Partition); +void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode) { + PQ_LOG_T("TPartition::CancelAllWritesOnWrite."); ReplyError(ctx, p.Cookie, errorCode, errorStr); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); @@ -793,7 +803,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T FailBadClient(ctx); NewHead.Clear(); NewHead.Offset = EndOffset; - sourceIdWriter.Clear(); + sourceIdBatch.Cancel(); request->Record.Clear(); PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); CompactedKeys.clear(); @@ -801,7 +811,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T WriteCycleSize = 0; } -bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) { +TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) { auto& body = msg.Body; TMaybe<TPartitionKeyRange> keyRange; @@ -810,20 +820,20 @@ bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); - return true; + return ProcessResult::Continue; } -bool TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) { - parameters.SourceIdWriter.DeregisterSourceId(msg.Body.SourceId); +TPartition::ProcessResult TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) { + parameters.SourceIdBatch.DeregisterSourceId(msg.Body.SourceId); - return true; + return ProcessResult::Continue; } -bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) { +TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) { for (auto& body : msg.Deregistrations) { - parameters.SourceIdWriter.DeregisterSourceId(body.SourceId); + parameters.SourceIdBatch.DeregisterSourceId(body.SourceId); } for (auto& body : msg.Registrations) { @@ -833,16 +843,20 @@ bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& p } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); } - return true; + return ProcessResult::Continue; } -bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) { +TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) { ui64& curOffset = parameters.CurOffset; - TSourceIdWriter& sourceIdWriter = parameters.SourceIdWriter; - THeartbeatEmitter& heartbeatEmitter = parameters.HeartbeatEmitter; + auto& sourceIdBatch = parameters.SourceIdBatch; + auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId); + + if (!sourceId) { + return ProcessResult::Break; + } WriteInflightSize -= p.Msg.Data.size(); @@ -851,21 +865,16 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv ui64 poffset = p.Offset ? *p.Offset : curOffset; - auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId); - auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId); - if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) { - bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end()); - bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end()); - + if (!p.Msg.DisableDeduplication && sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) { if (poffset >= curOffset) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "Already written message. Topic: '" << TopicName() << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) - << "'. Message seqNo = " << p.Msg.SeqNo - << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) - << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0) - << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset + << "'. Message seqNo: " << p.Msg.SeqNo + << ". Committed seqNo: " << sourceId.CommittedSeqNo() + << ". Writing seqNo: " << sourceId.UpdatedSeqNo() + << ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset ); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); @@ -876,19 +885,19 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv } TString().swap(p.Msg.Data); - return true; + return ProcessResult::Continue; } if (const auto& hbVersion = p.Msg.HeartbeatVersion) { - if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) { + if (!sourceId.SeqNo()) { CancelAllWritesOnWrite(ctx, request, TStringBuilder() - << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter); - return false; + << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdBatch); + return ProcessResult::Abort; } - if (!it_inMemory->second.Explicit) { + if (!sourceId.Explicit()) { CancelAllWritesOnWrite(ctx, request, TStringBuilder() - << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter); - return false; + << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdBatch); + return ProcessResult::Abort; } LOG_DEBUG_S( @@ -903,20 +912,17 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv .Data = p.Msg.Data, }; - heartbeatEmitter.Process(p.Msg.SourceId, heartbeat); - sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated( - p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat) - )); + sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat)); - return true; + return ProcessResult::Continue; } if (poffset < curOffset) { //too small offset CancelAllWritesOnWrite(ctx, request, TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo << " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset, - p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); - return false; + p, sourceIdBatch, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET); + return ProcessResult::Abort; } Y_ABORT_UNLESS(poffset >= curOffset); @@ -928,8 +934,8 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo << " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset " << poffset << ", must be " << curOffset, - p, sourceIdWriter); - return false; + p, sourceIdBatch); + return ProcessResult::Abort; } curOffset = poffset; } @@ -971,9 +977,9 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv TString s; if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) { //this must not be happen - client sends gaps, fail this client till the end - CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter); + CancelAllWritesOnWrite(ctx, request, s, p, sourceIdBatch); //now no changes will leak - return false; + return ProcessResult::Abort; } WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size(); @@ -1108,11 +1114,7 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv << " NewHead: " << NewHead ); - if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) { - sourceIdWriter.RegisterSourceId(p.Msg.SourceId, p.Msg.SeqNo, curOffset, CurrentTimestamp); - } else { - sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(p.Msg.SeqNo, curOffset, CurrentTimestamp)); - } + sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp); ++curOffset; PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); @@ -1120,15 +1122,15 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv TString().swap(p.Msg.Data); - return true; + return ProcessResult::Continue; } bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, - TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter) + TPartitionSourceManager::TModificationBatch& sourceIdBatch) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition); + PQ_LOG_T("TPartition::AppendHeadWithNewWrites."); - ProcessParameters parameters(sourceIdWriter, heartbeatEmitter); + ProcessParameters parameters(sourceIdBatch); parameters.CurOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; WriteCycleSize = 0; @@ -1152,28 +1154,35 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const //Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs //otherwise write this batch as is to head; - while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big + bool run = true; + while (run && !Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big auto pp = Requests.front(); Requests.pop_front(); - bool processed = true; + ProcessResult result = ProcessResult::Continue; if (pp.IsWrite()) { - processed = ProcessRequest(pp.GetWrite(), parameters, request, ctx); + result = ProcessRequest(pp.GetWrite(), parameters, request, ctx); } else if (pp.IsRegisterMessageGroup()) { - processed = ProcessRequest(pp.GetRegisterMessageGroup(), parameters); + result = ProcessRequest(pp.GetRegisterMessageGroup(), parameters); } else if (pp.IsDeregisterMessageGroup()) { - processed = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters); + result = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters); } else if (pp.IsSplitMessageGroup()) { - processed = ProcessRequest(pp.GetSplitMessageGroup(), parameters); + result = ProcessRequest(pp.GetSplitMessageGroup(), parameters); } else { Y_ABORT_UNLESS(pp.IsOwnership()); } - if (!processed) { - return false; + switch (result) { + case ProcessResult::Abort: + return false; + case ProcessResult::Break: + Requests.push_front(pp); + run = false; + break; + case ProcessResult::Continue: + EmplaceResponse(std::move(pp), ctx); + break; } - - EmplaceResponse(std::move(pp), ctx); } UpdateWriteBufferIsFullState(ctx.Now()); @@ -1233,7 +1242,7 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) { } void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AddNewWriteBlob. Partition: " << Partition); + PQ_LOG_T("TPartition::AddNewWriteBlob."); const auto& key = res.first; @@ -1321,7 +1330,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq } void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SetDeadlinesForWrites. Partition: " << Partition); + PQ_LOG_T("TPartition::SetDeadlinesForWrites."); if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) { QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs()); @@ -1331,13 +1340,13 @@ void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { } void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::Handle TEvQuotaDeadlineCheck. Partition: " << Partition); + PQ_LOG_T("TPartition::Handle TEvQuotaDeadlineCheck."); FilterDeadlinedWrites(ctx); } bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessWrites. Partition: " << Partition); + PQ_LOG_T("TPartition::ProcessWrites."); FilterDeadlinedWrites(ctx); @@ -1364,14 +1373,9 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c Y_ABORT_UNLESS(request->Record.CmdRenameSize() == 0); Y_ABORT_UNLESS(request->Record.CmdDeleteRangeSize() == 0); - const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() - ? ESourceIdFormat::Proto - : ESourceIdFormat::Raw; - TSourceIdWriter sourceIdWriter(format); - THeartbeatEmitter heartbeatEmitter(SourceIdStorage); - - bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter, heartbeatEmitter); + auto sourceIdBatch = SourceManager.CreateModificationBatch(ctx); + bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdBatch); if (headCleared) { Y_ABORT_UNLESS(!CompactedKeys.empty() || Head.PackedSize == 0); for (ui32 i = 0; i < TotalLevels; ++i) { @@ -1379,7 +1383,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c } } - if (const auto heartbeat = heartbeatEmitter.CanEmit()) { + if (const auto heartbeat = sourceIdBatch.CanEmit()) { LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName() << "' partition " << Partition @@ -1408,17 +1412,17 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c } if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed - if (sourceIdWriter.GetSourceIdsToWrite().empty()) { + if (!sourceIdBatch.HasModifications()) { return request->Record.CmdWriteSize() > 0 || request->Record.CmdRenameSize() > 0 || request->Record.CmdDeleteRangeSize() > 0; } else { - sourceIdWriter.FillRequest(request, Partition); + sourceIdBatch.FillRequest(request); return true; } } - sourceIdWriter.FillRequest(request, Partition); + sourceIdBatch.FillRequest(request); std::pair<TKey, ui32> res = GetNewWriteKey(headCleared); const auto& key = res.first; @@ -1440,7 +1444,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now()) return; - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FilterDeadlinedWrites. Partition: " << Partition); + PQ_LOG_T("TPartition::FilterDeadlinedWrites."); std::deque<TMessage> newRequests; for (auto& w : Requests) { @@ -1466,7 +1470,27 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { void TPartition::HandleWrites(const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWrites. Partition: " << Partition); + if (!CanWrite()) { + if (CanEnqueue()) { + return; + } else { + for(const auto& r : ReserveRequests) { + ReplyError(ctx, r->Cookie, InactivePartitionErrorCode, + TStringBuilder() << "Write to inactive partition"); + } + ReserveRequests.clear(); + for(const auto& r : Requests) { + ReplyError(ctx, r.GetCookie(), InactivePartitionErrorCode, + TStringBuilder() << "Write to inactive partition"); + } + Requests.clear(); + return; + } + } + + PQ_LOG_T("TPartition::HandleWrites. Requests.size()=" << Requests.size()); + + SourceManager.EnsureSource(ctx); Become(&TThis::StateWrite); @@ -1495,7 +1519,11 @@ void TPartition::HandleWrites(const TActorContext& ctx) { bool res = ProcessWrites(request.Get(), now, ctx); Y_ABORT_UNLESS(!res); } - Y_ABORT_UNLESS(Requests.empty() || !WriteQuota->CanExaust(now) || WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left + Y_ABORT_UNLESS(Requests.empty() + || !WriteQuota->CanExaust(now) + || WaitingForPreviousBlobQuota() + || WaitingForSubDomainQuota(ctx) + || SourceManager.WaitSources()); //in this case all writes must be processed or no quota left AnswerCurrentWrites(ctx); //in case if all writes are already done - no answer will be called on kv write, no kv write at all BecomeIdle(ctx); return; @@ -1541,8 +1569,8 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w return MeteringDataSize(ctx) + withSize > ReserveSize(); } -void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::WriteBlobWithQuota. Partition: " << Partition); +void TPartition::WriteBlobWithQuota(const TActorContext& /*ctx*/, THolder<TEvKeyValue::TEvRequest>&& request) { + PQ_LOG_T("TPartition::WriteBlobWithQuota."); // Request quota and write blob. // Mirrored topics are not quoted in local dc. diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 9d79ec3440..4a08925350 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1163,26 +1163,33 @@ void TPersQueue::Handle(TEvPQ::TEvTabletCacheCounters::TPtr& ev, const TActorCon CacheCounters = ev->Get()->Counters; SetCacheCounters(CacheCounters); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " topic '" << TopicConverter->GetClientsideName() + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " topic '" << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "Counters. CacheSize " << CacheCounters.CacheSizeBytes << " CachedBlobs " << CacheCounters.CacheSizeBlobs); } void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& ctx) { - auto it = Partitions.find(ev->Get()->Partition); + const auto partitionId = ev->Get()->Partition; + auto it = Partitions.find(partitionId); Y_ABORT_UNLESS(it != Partitions.end()); Y_ABORT_UNLESS(!it->second.InitDone); it->second.InitDone = true; ++PartitionsInited; Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config - if (!InitCompleted && PartitionsInited == Partitions.size()) { + auto allInitialized = PartitionsInited == Partitions.size(); + if (!InitCompleted && allInitialized) { OnInitComplete(ctx); } - if (NewConfigShouldBeApplied && PartitionsInited == Partitions.size()) { + if (NewConfigShouldBeApplied && allInitialized) { ApplyNewConfigAndReply(ctx); } + + ProcessSourceIdRequests(partitionId); + if (allInitialized) { + SourceIdRequests.clear(); + } } void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) @@ -1221,7 +1228,7 @@ void TPersQueue::FinishResponse(THashMap<ui64, TAutoPtr<TResponseBuilder>>::iter void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx) -{ +{ if (!ConfigInited) { UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender); return; @@ -1844,7 +1851,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << - " got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition() + " got client PART message topic: " << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << " partition: " << req.GetPartition() << " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo << " messageNo: " << req.GetMessageNo() << " size: " << data.size() @@ -1878,7 +1885,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p } LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << - " got client message topic: " << TopicConverter->GetClientsideName() << + " got client message topic: " << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << " partition: " << req.GetPartition() << " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo << @@ -2171,7 +2178,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ui32 partition = req.GetPartition(); auto it = Partitions.find(partition); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '" + << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition); if (it == Partitions.end()) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::WRONG_PARTITION_NUMBER, @@ -3613,6 +3621,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie); } +void TPersQueue::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + auto it = Partitions.find(record.GetPartition()); + if (it == Partitions.end()) { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition()); + + auto response = THolder<TEvPQ::TEvSourceIdResponse>(); + response->Record.SetError("Partition was not found"); + Send(ev->Sender, response.Release()); + + return; + } + + if (it->second.InitDone) { + Forward(ev, it->second.Actor); + } else { + SourceIdRequests[record.GetPartition()].push_back(ev); + } +} + +void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) { + auto sit = SourceIdRequests.find(partitionId); + if (sit != SourceIdRequests.end()) { + auto it = Partitions.find(partitionId); + for (auto& r : sit->second) { + Forward(r, it->second.Actor); + } + SourceIdRequests.erase(partitionId); + } +} + + bool TPersQueue::HandleHook(STFUNC_SIG) { SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR); @@ -3654,6 +3694,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle); HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); + HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index f30645995f..6b31eb64fc 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -157,6 +157,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { ui64 GetAllowedStep() const; + void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); + void ProcessSourceIdRequests(ui32 partitionId); + static constexpr const char * KeyConfig() { return "_config"; } static constexpr const char * KeyState() { return "_state"; } static constexpr const char * KeyTxInfo() { return "_txinfo"; } @@ -375,6 +378,8 @@ private: bool CanProcessDeleteTxs() const; bool UseMediatorTimeCast = true; + + THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests; }; diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index ee78d0674c..69216b487e 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -116,6 +116,15 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs) { } +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat) + : SeqNo(seqNo) + , Offset(offset) + , WriteTimestamp(createTs) + , CreateTimestamp(createTs) + , LastHeartbeat(heartbeat) +{ +} + TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit) : SeqNo(seqNo) , Offset(offset) diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 1c6cfd34bf..c5ac29cd81 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -34,6 +34,7 @@ struct TSourceIdInfo { TSourceIdInfo() = default; TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs); + TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat); TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false); TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const; diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt index a1a9555e7e..2f3840b3b6 100644 --- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt @@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC persqueue-ut-common core-testlib-default ydb_persqueue_core-ut-ut_utils + ydb_topic-ut-ut_utils + tx-schemeshard-ut_helpers library-cpp-resource ) target_link_options(ydb-core-persqueue-ut PRIVATE @@ -52,9 +54,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt index c97fc74541..65612f86de 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt @@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC persqueue-ut-common core-testlib-default ydb_persqueue_core-ut-ut_utils + ydb_topic-ut-ut_utils + tx-schemeshard-ut_helpers library-cpp-resource ) target_link_options(ydb-core-persqueue-ut PRIVATE @@ -55,9 +57,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt index a5cbaf7829..29e6d7b08e 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt @@ -35,6 +35,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC persqueue-ut-common core-testlib-default ydb_persqueue_core-ut-ut_utils + ydb_topic-ut-ut_utils + tx-schemeshard-ut_helpers library-cpp-resource ) target_link_options(ydb-core-persqueue-ut PRIVATE @@ -56,9 +58,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt index f5a43dc332..347416de43 100644 --- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt @@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC persqueue-ut-common core-testlib-default ydb_persqueue_core-ut-ut_utils + ydb_topic-ut-ut_utils + tx-schemeshard-ut_helpers library-cpp-resource ) target_sources(ydb-core-persqueue-ut PRIVATE @@ -45,9 +47,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 948224d296..7e4b911c68 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -916,7 +916,8 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult()); auto res = result->Record.GetPartitionResponse().GetCmdReadResult(); - UNIT_ASSERT_EQUAL(res.ResultSize(), resCount); + UNIT_ASSERT_EQUAL_C(res.ResultSize(), resCount, + "Result size missmatch: expected " << resCount << " but received " << res.ResultSize()); ui64 off = offset; for (ui32 i = 0; i < resCount; ++i) { diff --git a/ydb/core/persqueue/ut/fetch_request_ut.cpp b/ydb/core/persqueue/ut/fetch_request_ut.cpp index b6d928c9f0..b1db820001 100644 --- a/ydb/core/persqueue/ut/fetch_request_ut.cpp +++ b/ydb/core/persqueue/ut/fetch_request_ut.cpp @@ -89,6 +89,8 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) { auto& runtime = setup->GetRuntime(); StartSchemeCache(runtime); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_TRACE); + ui32 totalPartitions = 5; setup->CreateTopic("topic1", "dc1", totalPartitions); @@ -103,6 +105,7 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) { auto ev = runtime.GrabEdgeEvent<TEvPQ::TEvFetchResponse>(); UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SCHEME_ERROR, ev->Message); } + Y_UNIT_TEST(CheckAccess) { auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); auto& runtime = setup->GetRuntime(); diff --git a/ydb/core/persqueue/ut/partitiongraph_ut.cpp b/ydb/core/persqueue/ut/partitiongraph_ut.cpp new file mode 100644 index 0000000000..0003f18cec --- /dev/null +++ b/ydb/core/persqueue/ut/partitiongraph_ut.cpp @@ -0,0 +1,88 @@ + +#include <ydb/core/persqueue/utils.h> + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NKikimr::NPQ; + +Y_UNIT_TEST_SUITE(TPartitionGraphTest) { + Y_UNIT_TEST(BuildGraph) { + + // 0 ------------ + // 1 -| + // |- 3 -| + // 2 -| |- 5 + // 4 -------| + // + NKikimrPQ::TPQTabletConfig config; + + // Without parents and childrens + auto* p0 = config.AddAllPartitions(); + p0->SetPartitionId(0); + + auto* p1 = config.AddAllPartitions(); + p1->SetPartitionId(1); + p1->AddChildPartitionIds(3); + + auto* p2 = config.AddAllPartitions(); + p2->SetPartitionId(2); + p2->AddChildPartitionIds(3); + + auto* p3 = config.AddAllPartitions(); + p3->SetPartitionId(3); + p3->AddChildPartitionIds(5); + p3->AddParentPartitionIds(1); + p3->AddParentPartitionIds(2); + + auto* p4 = config.AddAllPartitions(); + p4->SetPartitionId(4); + p4->AddChildPartitionIds(5); + + auto* p5 = config.AddAllPartitions(); + p5->SetPartitionId(5); + p5->AddParentPartitionIds(3); + p5->AddParentPartitionIds(4); + + TPartitionGraph graph; + graph.Rebuild(config); + + const auto n0o = graph.GetPartition(0); + const auto n1o = graph.GetPartition(1); + const auto n2o = graph.GetPartition(2); + const auto n3o = graph.GetPartition(3); + const auto n4o = graph.GetPartition(4); + const auto n5o = graph.GetPartition(5); + + UNIT_ASSERT(n0o); + UNIT_ASSERT(n1o); + UNIT_ASSERT(n2o); + UNIT_ASSERT(n3o); + UNIT_ASSERT(n4o); + UNIT_ASSERT(n5o); + + auto& n0 = *n0o.value(); + auto& n1 = *n1o.value(); + auto& n2 = *n2o.value(); + auto& n3 = *n3o.value(); + auto& n4 = *n4o.value(); + auto& n5 = *n5o.value(); + + + UNIT_ASSERT_EQUAL(n0.Parents.size(), 0); + UNIT_ASSERT_EQUAL(n0.Children.size(), 0); + UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0); + + UNIT_ASSERT_EQUAL(n1.Parents.size(), 0); + UNIT_ASSERT_EQUAL(n1.Children.size(), 1); + UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0); + + UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2"); + UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0"); + UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4"); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end()); + } +} diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp new file mode 100644 index 0000000000..7799b07ca9 --- /dev/null +++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp @@ -0,0 +1,379 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h> + +namespace NKikimr { + +using namespace NYdb::NTopic; +using namespace NYdb::NTopic::NTests; +using namespace NSchemeShardUT_Private; + + +// TODO +static constexpr ui64 SS = 72057594046644480l; + +auto CreateTransaction(const TString& parentPath, ::NKikimrSchemeOp::TPersQueueGroupDescription& scheme) { + NKikimrSchemeOp::TModifyScheme tx; + tx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); + tx.SetWorkingDir(parentPath); + tx.MutableAlterPersQueueGroup()->CopyFrom(scheme); + return tx; +} + +TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) { + auto ev = new TEvTx(txId, SS); + *ev->Record.AddTransaction() = std::move(tx); + return ev; +} + +void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) { + Cerr << "ALTER_SCHEME: " << scheme << Endl; + + const auto sender = setup.GetRuntime().AllocateEdgeActor(); + const auto request = CreateRequest(txId, CreateTransaction("/Root", scheme)); + setup.GetRuntime().Send(new IEventHandle( + MakeTabletResolverID(), + sender, + new TEvTabletResolver::TEvForward( + SS, + new IEventHandle(TActorId(), sender, request), + { }, + TEvTabletResolver::TEvForward::EActor::Tablet + )), + 0); + + auto subscriber = CreateNotificationSubscriber(setup.GetRuntime(), SS); + setup.GetRuntime().Send(new IEventHandle(subscriber, sender, new TEvSchemeShard::TEvNotifyTxCompletion(txId))); + TAutoPtr<IEventHandle> handle; + auto event = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_EQUAL(event->Record.GetTxId(), txId); + + auto e = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); + UNIT_ASSERT_EQUAL_C(e->Record.GetStatus(), TEvSchemeShard::EStatus::StatusAccepted, + "Unexpected status " << NKikimrScheme::EStatus_Name(e->Record.GetStatus()) << " " << e->Record.GetReason()); +} + +void SplitPartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partition, TString boundary) { + ::NKikimrSchemeOp::TPersQueueGroupDescription scheme; + scheme.SetName(TEST_TOPIC); + auto* split = scheme.AddSplit(); + split->SetPartition(partition); + split->SetSplitBoundary(boundary); + + DoRequest(setup, txId, scheme); +} + +void MergePartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partitionLeft, const ui32 partitionRight) { + ::NKikimrSchemeOp::TPersQueueGroupDescription scheme; + scheme.SetName(TEST_TOPIC); + auto* merge = scheme.AddMerge(); + merge->SetPartition(partitionLeft); + merge->SetAdjacentPartition(partitionRight); + + DoRequest(setup, txId, scheme); +} + +auto Msg(const TString& data, ui64 seqNo) { + TWriteMessage msg(data); + msg.SeqNo(seqNo); + return msg; +} + + + +Y_UNIT_TEST_SUITE(TopicSplitMerge) { + Y_UNIT_TEST(PartitionSplit) { + TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false); + + auto& ff = setup.GetRuntime().GetAppData().FeatureFlags; + ff.SetEnableTopicSplitMerge(true); + ff.SetEnablePQConfigTransactionsAtSchemeShard(true); + + setup.CreateTopic(); + + TTopicClient client = setup.MakeClient(); + + setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); + + TString producer1 = "producer-1"; + TString producer2 = "producer-2"; + TString producer3 = "producer-3"; + + auto writeSettings1 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer1) + .MessageGroupId(producer1); + auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1); + + auto writeSettings2 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer2) + .MessageGroupId(producer2); + auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2); + + auto writeSettings3 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer3) + .PartitionId(0); + auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3); + + Cerr << ">>>>> 1 " << Endl; + + struct MsgInfo { + ui64 PartitionId; + ui64 SeqNo; + ui64 Offset; + TString Data; + }; + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + std::vector<MsgInfo> receivedMessages; + std::set<size_t> partitions; + + auto readSettings = TReadSessionSettings() + .ConsumerName(TEST_CONSUMER) + .AppendTopics(TEST_TOPIC); + + readSettings.EventHandlers_.SimpleDataHandlers( + [&] + (TReadSessionEvent::TDataReceivedEvent& ev) mutable { + auto& messages = ev.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + + Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId() + << ", message=" << message.GetData() + << ", seqNo=" << message.GetSeqNo() + << ", offset=" << message.GetOffset() + << Endl; + receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), + message.GetSeqNo(), + message.GetOffset(), + message.GetData()}); + } + + if (receivedMessages.size() == 5) { + checkedPromise.SetValue(); + } + }); + + readSettings.EventHandlers_.StartPartitionSessionHandler( + [&] + (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { + Cerr << ">>>>> Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl; + partitions.insert(ev.GetPartitionSession()->GetPartitionId()); + ev.Confirm(); + }); + + auto readSession = client.CreateReadSession(readSettings); + + Cerr << ">>>>> 2 " << Endl; + + UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2))); + UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3))); + UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1))); + + Cerr << ">>>>> 3 " << Endl; + + ui64 txId = 0; + SplitPartition(setup, ++txId, 0, "a"); + + Cerr << ">>>>> 4 " << Endl; + + writeSession1->Write(Msg("message_1.2_2", 2)); // Will be ignored because duplicated SeqNo + writeSession3->Write(Msg("message_3.2", 11)); // Will be fail because partition is not writable after split + writeSession1->Write(Msg("message_1.2", 5)); + writeSession2->Write(Msg("message_2.2", 7)); + + Cerr << ">>>>> 5 " << Endl; + + checkedPromise.GetFuture().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(5, receivedMessages.size()); + + Cerr << ">>>>> 6 " << Endl; + + for(const auto& info : receivedMessages) { + if (info.Data == "message_1.1") { + UNIT_ASSERT_C(1, info.PartitionId); + UNIT_ASSERT_C(2, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == "message_2.1") { + UNIT_ASSERT_C(1, info.PartitionId); + UNIT_ASSERT_C(3, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == "message_1.2") { + UNIT_ASSERT_C(2, info.PartitionId); + UNIT_ASSERT_C(5, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == "message_2.2") { + UNIT_ASSERT_C(3, info.PartitionId); + UNIT_ASSERT_C(7, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == "message_3.1") { + UNIT_ASSERT_C(1, info.PartitionId); + UNIT_ASSERT_C(1, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else { + UNIT_ASSERT_C(false, "Unexpected message: " << info.Data); + } + } + + Cerr << ">>>>> 7 " << Endl; + + writeSession1->Close(TDuration::Seconds(1)); + writeSession2->Close(TDuration::Seconds(1)); + writeSession3->Close(TDuration::Seconds(1)); + readSession->Close(TDuration::Seconds(1)); + + Cerr << ">>>>> 8 " << Endl; + } + + Y_UNIT_TEST(PartitionMerge) { + TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false); + + auto& ff = setup.GetRuntime().GetAppData().FeatureFlags; + ff.SetEnableTopicSplitMerge(true); + ff.SetEnablePQConfigTransactionsAtSchemeShard(true); + + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); + + TTopicClient client = setup.MakeClient(); + + setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); + + TString producer1 = "producer-1"; + TString producer2 = "producer-2"; + TString producer3 = "producer-3"; + + auto writeSettings1 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer1) + .MessageGroupId(producer1) + .PartitionId(0); + auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1); + + auto writeSettings2 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer2) + .MessageGroupId(producer2) + .PartitionId(1); + auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2); + + Cerr << ">>>>> 1 " << Endl; + + struct MsgInfo { + ui64 PartitionId; + ui64 SeqNo; + ui64 Offset; + TString Data; + }; + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + std::vector<MsgInfo> receivedMessages; + std::set<size_t> partitions; + + auto readSettings = TReadSessionSettings() + .ConsumerName(TEST_CONSUMER) + .AppendTopics(TEST_TOPIC); + + readSettings.EventHandlers_.SimpleDataHandlers( + [&] + (TReadSessionEvent::TDataReceivedEvent& ev) mutable { + auto& messages = ev.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + + Cerr << ">>>>> Received message partitionId=" << message.GetPartitionSession()->GetPartitionId() + << ", message=" << message.GetData() + << ", seqNo=" << message.GetSeqNo() + << ", offset=" << message.GetOffset() + << Endl; + receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), + message.GetSeqNo(), + message.GetOffset(), + message.GetData()}); + } + + if (receivedMessages.size() == 3) { + checkedPromise.SetValue(); + } + }); + + readSettings.EventHandlers_.StartPartitionSessionHandler( + [&] + (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { + Cerr << ">>>>> Received message " << ev.DebugString() << Endl; + partitions.insert(ev.GetPartitionSession()->GetPartitionId()); + ev.Confirm(); + }); + + auto readSession = client.CreateReadSession(readSettings); + + Cerr << ">>>>> 2 " << Endl; + + UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2))); + UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3))); + + Cerr << ">>>>> 3 " << Endl; + + ui64 txId = 0; + MergePartition(setup, ++txId, 0, 1); + + Cerr << ">>>>> 4 " << Endl; + + UNIT_ASSERT(writeSession1->Write(Msg("message_1.2", 5))); // Will be fail because partition is not writable after merge + UNIT_ASSERT(writeSession2->Write(Msg("message_2.2", 7))); // Will be fail because partition is not writable after merge + + auto writeSettings3 = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer1) + .MessageGroupId(producer1); + auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3); + + UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 2))); // Will be ignored because duplicated SeqNo + UNIT_ASSERT(writeSession3->Write(Msg("message_3.2", 11))); + + + Cerr << ">>>>> 5 " << Endl; + + checkedPromise.GetFuture().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(3, receivedMessages.size()); + + Cerr << ">>>>> 6 " << Endl; + + for(const auto& info : receivedMessages) { + if (info.Data == TString("message_1.1")) { + UNIT_ASSERT_C(1, info.PartitionId); + UNIT_ASSERT_C(2, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == TString("message_2.1")) { + UNIT_ASSERT_C(2, info.PartitionId); + UNIT_ASSERT_C(3, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else if (info.Data == TString("message_3.2")) { + UNIT_ASSERT_C(3, info.PartitionId); + UNIT_ASSERT_C(11, info.SeqNo); + UNIT_ASSERT_C(1, info.Offset); + } else { + UNIT_ASSERT_C(false, "Unexpected message: " << info.Data); + } + } + + Cerr << ">>>>> 7 " << Endl; + + writeSession1->Close(TDuration::Seconds(1)); + writeSession2->Close(TDuration::Seconds(1)); + writeSession3->Close(TDuration::Seconds(1)); + readSession->Close(TDuration::Seconds(1)); + + Cerr << ">>>>> 8 " << Endl; + } + +} + +} // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index 0e4ff94d48..9daecdfa54 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -20,6 +20,9 @@ PEERDIR( ydb/core/persqueue/ut/common ydb/core/testlib/default ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils + ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils + + ydb/core/tx/schemeshard/ut_helpers ) YQL_LAST_ABI_VERSION() @@ -33,9 +36,11 @@ SRCS( mirrorer_ut.cpp pq_ut.cpp partition_ut.cpp + partitiongraph_ut.cpp pqtablet_ut.cpp quota_tracker_ut.cpp sourceid_ut.cpp + splitmerge_ut.cpp type_codecs_ut.cpp user_info_ut.cpp pqrb_describes_ut.cpp diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index b3028391ab..9b0b4bc58f 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -1,5 +1,10 @@ #include "utils.h" +#include <deque> +#include <util/string/builder.h> + +#include <ydb/library/yverify_stream/yverify_stream.h> + namespace NKikimr::NPQ { ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config) { @@ -36,4 +41,78 @@ ui64 PutUnitsSize(const ui64 size) { return putUnitsCount; } +const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { + for(const auto& p : config.GetPartitions()) { + if (partitionId == p.GetPartitionId()) { + return &p; + } + } + return nullptr; +} + +void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) { + Partitions.clear(); + + if (0 == config.AllPartitionsSize()) { + return; + } + + for (const auto& p : config.GetAllPartitions()) { + Partitions.emplace(p.GetPartitionId(), p); + } + + std::deque<Node*> queue; + for(const auto& p : config.GetAllPartitions()) { + auto& node = Partitions[p.GetPartitionId()]; + + node.Children.reserve(p.ChildPartitionIdsSize()); + for (auto id : p.GetChildPartitionIds()) { + node.Children.push_back(&Partitions[id]); + } + + node.Parents.reserve(p.ParentPartitionIdsSize()); + for (auto id : p.GetParentPartitionIds()) { + node.Parents.push_back(&Partitions[id]); + } + + if (p.GetParentPartitionIds().empty()) { + queue.push_back(&node); + } + } + + while(!queue.empty()) { + auto* n = queue.front(); + queue.pop_front(); + + bool allCompleted = true; + for(auto* c : n->Parents) { + if (c->HierarhicalParents.empty() && !c->Parents.empty()) { + allCompleted = false; + break; + } + } + + if (allCompleted) { + for(auto* c : n->Parents) { + n->HierarhicalParents.insert(c->HierarhicalParents.begin(), c->HierarhicalParents.end()); + n->HierarhicalParents.insert(c); + } + queue.insert(queue.end(), n->Children.begin(), n->Children.end()); + } + } +} + +std::optional<const TPartitionGraph::Node*> TPartitionGraph::GetPartition(ui32 id) const { + auto it = Partitions.find(id); + if (it == Partitions.end()) { + return std::nullopt; + } + return std::optional(&it->second); +} + +TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) { + Id = config.GetPartitionId(); + TabletId = config.GetTabletId(); +} + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index a490cb351a..344b699e43 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -9,4 +9,35 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config); ui64 PutUnitsSize(const ui64 size); +TString SourceIdHash(const TString& sourceId); + +const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); + +// The graph of split-merge operations. +class TPartitionGraph { +public: + struct Node { + + Node() = default; + Node(Node&&) = default; + Node(const NKikimrPQ::TPQTabletConfig::TPartition& config); + + ui32 Id; + ui64 TabletId; + + // Direct parents of this node + std::vector<Node*> Parents; + // Direct children of this node + std::vector<Node*> Children; + // All parents include parents of parents and so on + std::set<Node*> HierarhicalParents; + }; + + void Rebuild(const NKikimrPQ::TPQTabletConfig& config); + + std::optional<const Node*> GetPartition(ui32 id) const; +private: + std::unordered_map<ui32, Node> Partitions; +}; + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 435108c327..6ba0c6a559 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -15,6 +15,7 @@ SRCS( partition_init.cpp partition_monitoring.cpp partition_read.cpp + partition_sourcemanager.cpp partition_write.cpp partition.cpp percentile_counter.cpp diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 6c8944426f..8d7d3d902c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -339,6 +339,7 @@ message TPQTabletConfig { repeated uint32 ParentPartitionIds = 4; repeated uint32 ChildPartitionIds = 5; optional uint64 CreateVersion = 6; + optional uint64 TabletId = 7; } repeated TPartition Partitions = 31; // filled by schemeshard @@ -357,6 +358,9 @@ message TPQTabletConfig { required uint32 MaxPartitionCount = 2; } optional TPartitionStrategy PartitionStrategy = 35; + + repeated TPartition AllPartitions = 36; + } message THeartbeat { @@ -927,6 +931,33 @@ message TEvSubDomainStatus { required bool SubDomainOutOfSpace = 1; }; +message TEvSourceIdRequest { + optional uint32 Partition = 1; + repeated string SourceId = 2; +}; + +message TEvSourceIdResponse { + enum EState { + Unknown = 0; + Registered = 1; + PendingRegistration = 2; + }; + + message TSource { + optional string Id = 1; + optional EState State = 2; + optional uint64 SeqNo = 3; + optional uint64 Offset = 4; + optional bool Explicit = 5; + optional uint64 WriteTimestamp = 6; + + }; + optional string Error = 1; + optional uint32 Partition = 2; + repeated TSource Source = 3; +}; + + message TTransaction { enum EKind { KIND_UNKNOWN = 0; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 2bf3d36417..51f5089157 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -707,7 +707,8 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans event->Record.SetTxId(ui64(txId)); ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor()); - MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(), + MakePQTabletConfig(context, + *event->Record.MutableConfig()->MutableTabletConfig(), pqGroup, pqShard, topicName, @@ -744,7 +745,8 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>(); event->Record.SetTxId(ui64(txId)); - MakePQTabletConfig(*event->Record.MutableTabletConfig(), + MakePQTabletConfig(context, + *event->Record.MutableTabletConfig(), pqGroup, pqShard, topicName, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 49a60ce457..1910df417b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -832,7 +832,26 @@ public: } private: - static void MakePQTabletConfig(NKikimrPQ::TPQTabletConfig& config, + static void FillPartition(NKikimrPQ::TPQTabletConfig::TPartition& partition, const TTopicTabletInfo::TTopicPartitionInfo* pq, ui64 tabletId) { + partition.SetPartitionId(pq->PqId); + partition.SetCreateVersion(pq->CreateVersion); + if (pq->KeyRange) { + pq->KeyRange->SerializeToProto(*partition.MutableKeyRange()); + } + partition.SetStatus(pq->Status); + partition.MutableParentPartitionIds()->Reserve(pq->ParentPartitionIds.size()); + for (const auto parent : pq->ParentPartitionIds) { + partition.MutableParentPartitionIds()->AddAlreadyReserved(parent); + } + partition.MutableChildPartitionIds()->Reserve(pq->ChildPartitionIds.size()); + for (const auto children : pq->ChildPartitionIds) { + partition.MutableChildPartitionIds()->AddAlreadyReserved(children); + } + partition.SetTabletId(tabletId); + } + + static void MakePQTabletConfig(const TOperationContext& context, + NKikimrPQ::TPQTabletConfig& config, const TTopicInfo& pqGroup, const TTopicTabletInfo& pqShard, const TString& topicName, @@ -857,23 +876,19 @@ private: config.SetVersion(pqGroup.AlterData->AlterVersion); } - for (const auto& pq : pqShard.Partitions) { + for(const auto& pq : pqShard.Partitions) { config.AddPartitionIds(pq->PqId); auto& partition = *config.AddPartitions(); - partition.SetPartitionId(pq->PqId); - partition.SetCreateVersion(pq->CreateVersion); - if (pq->KeyRange) { - pq->KeyRange->SerializeToProto(*partition.MutableKeyRange()); - } - partition.SetStatus(pq->Status); - partition.MutableParentPartitionIds()->Reserve(pq->ParentPartitionIds.size()); - for (const auto parent : pq->ParentPartitionIds) { - partition.MutableParentPartitionIds()->AddAlreadyReserved(parent); - } - partition.MutableChildPartitionIds()->Reserve(pq->ChildPartitionIds.size()); - for (const auto children : pq->ChildPartitionIds) { - partition.MutableChildPartitionIds()->AddAlreadyReserved(children); + FillPartition(partition, pq.Get(), 0); + } + + for(const auto& p : pqGroup.Shards) { + const auto& pqShard = p.second; + const auto& tabletId = context.SS->ShardInfos[p.first].TabletID; + for (const auto& pq : pqShard->Partitions) { + auto& partition = *config.AddAllPartitions(); + FillPartition(partition, pq.Get(), ui64(tabletId)); } } } diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 8613260bca..3517e9d30d 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -264,6 +264,7 @@ public: bool IsFirstClass() const; + operator bool() const { return Valid && !ClientsideName; }; private: void BuildInternals(const NKikimrPQ::TPQTabletConfig& config); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt index 282211a4b9..aab41b843e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(ut_utils) add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut) target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE @@ -30,6 +31,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC ydb_persqueue_core-ut-ut_utils client-ydb_topic-codecs client-ydb_topic-impl + ydb_topic-ut-ut_utils ) target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 @@ -43,8 +45,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt index be6ba2fbf3..e9b30bcfbd 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(ut_utils) add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut) target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE @@ -30,6 +31,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC ydb_persqueue_core-ut-ut_utils client-ydb_topic-codecs client-ydb_topic-impl + ydb_topic-ut-ut_utils ) target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE -ldl @@ -46,8 +48,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt index 74b2b0756c..45c90bfdb5 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(ut_utils) add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut) target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE @@ -31,6 +32,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC ydb_persqueue_core-ut-ut_utils client-ydb_topic-codecs client-ydb_topic-impl + ydb_topic-ut-ut_utils ) target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE -ldl @@ -47,8 +49,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt index 3c14b2644b..c41eb4e0dd 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(ut_utils) add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut) target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE @@ -30,14 +31,13 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC ydb_persqueue_core-ut-ut_utils client-ydb_topic-codecs client-ydb_topic-impl + ydb_topic-ut-ut_utils ) target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..ec01329d1c --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb_topic-ut-ut_utils) +target_compile_options(ydb_topic-ut-ut_utils PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb_topic-ut-ut_utils PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-server + cpp-testing-unittest + cpp-threading-chunk_queue + core-testlib-default + library-persqueue-topic_parser_public + cpp-client-ydb_driver + cpp-client-ydb_topic + cpp-client-ydb_table +) +target_sources(ydb_topic-ut-ut_utils PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..aece94a075 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb_topic-ut-ut_utils) +target_compile_options(ydb_topic-ut-ut_utils PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb_topic-ut-ut_utils PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-grpc-server + cpp-testing-unittest + cpp-threading-chunk_queue + core-testlib-default + library-persqueue-topic_parser_public + cpp-client-ydb_driver + cpp-client-ydb_topic + cpp-client-ydb_table +) +target_sources(ydb_topic-ut-ut_utils PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..aece94a075 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb_topic-ut-ut_utils) +target_compile_options(ydb_topic-ut-ut_utils PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb_topic-ut-ut_utils PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-grpc-server + cpp-testing-unittest + cpp-threading-chunk_queue + core-testlib-default + library-persqueue-topic_parser_public + cpp-client-ydb_driver + cpp-client-ydb_topic + cpp-client-ydb_table +) +target_sources(ydb_topic-ut-ut_utils PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..ec01329d1c --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb_topic-ut-ut_utils) +target_compile_options(ydb_topic-ut-ut_utils PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb_topic-ut-ut_utils PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-server + cpp-testing-unittest + cpp-threading-chunk_queue + core-testlib-default + library-persqueue-topic_parser_public + cpp-client-ydb_driver + cpp-client-ydb_topic + cpp-client-ydb_table +) +target_sources(ydb_topic-ut-ut_utils PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp index c82589773f..128e74ad97 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -22,11 +22,14 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim } } -void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer) +void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount) { TTopicClient client(MakeDriver()); TCreateTopicSettings topics; + TPartitioningSettings partitions(partitionCount, partitionCount); + + topics.PartitioningSettings(partitions); TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer); topics.AppendConsumers(consumers); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h index a9014a3437..c1642f6b46 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h @@ -16,7 +16,7 @@ class TTopicSdkTestSetup { public: TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true); - void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER); + void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TEST_TOPIC) const; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make new file mode 100644 index 0000000000..9b807dcd9c --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make @@ -0,0 +1,23 @@ +LIBRARY() + +SRCS( + managed_executor.cpp + managed_executor.h + topic_sdk_test_setup.cpp + topic_sdk_test_setup.h +) + +PEERDIR( + library/cpp/grpc/server + library/cpp/testing/unittest + library/cpp/threading/chunk_queue + ydb/core/testlib/default + ydb/library/persqueue/topic_parser_public + ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_topic + ydb/public/sdk/cpp/client/ydb_table +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make index ba152ee2ae..ec6820dc4d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make @@ -24,6 +24,7 @@ PEERDIR( ydb/public/sdk/cpp/client/ydb_topic ydb/public/sdk/cpp/client/ydb_topic/impl + ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils ) YQL_LAST_ABI_VERSION() @@ -33,8 +34,6 @@ SRCS( describe_topic_ut.cpp local_partition_ut.cpp topic_to_table_ut.cpp - ut_utils/managed_executor.cpp - ut_utils/topic_sdk_test_setup.cpp ) END() |