aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-11-14 12:00:44 +0300
committertesseract <tesseract@yandex-team.com>2023-11-14 19:07:43 +0300
commit0628d4698c38973d90cd1fbc81a6b67e1d6d24a9 (patch)
tree787768d9699d4575a5cc22747d34cbf6ae89a28c
parent60b19cde31eefa790f0956b3eb1f2e567dd2ca15 (diff)
downloadydb-0628d4698c38973d90cd1fbc81a6b67e1d6d24a9.tar.gz
Split topic partitions. Take sourceid information from parent partitions
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/events/internal.h8
-rw-r--r--ydb/core/persqueue/partition.cpp210
-rw-r--r--ydb/core/persqueue/partition.h54
-rw-r--r--ydb/core/persqueue/partition_init.cpp2
-rw-r--r--ydb/core/persqueue/partition_log.h18
-rw-r--r--ydb/core/persqueue/partition_read.cpp71
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp466
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h140
-rw-r--r--ydb/core/persqueue/partition_write.cpp228
-rw-r--r--ydb/core/persqueue/pq_impl.cpp57
-rw-r--r--ydb/core/persqueue/pq_impl.h5
-rw-r--r--ydb/core/persqueue/sourceid.cpp9
-rw-r--r--ydb/core/persqueue/sourceid.h1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp3
-rw-r--r--ydb/core/persqueue/ut/fetch_request_ut.cpp3
-rw-r--r--ydb/core/persqueue/ut/partitiongraph_ut.cpp88
-rw-r--r--ydb/core/persqueue/ut/splitmerge_ut.cpp379
-rw-r--r--ydb/core/persqueue/ut/ya.make5
-rw-r--r--ydb/core/persqueue/utils.cpp79
-rw-r--r--ydb/core/persqueue/utils.h31
-rw-r--r--ydb/core/persqueue/ya.make1
-rw-r--r--ydb/core/protos/pqconfig.proto31
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h45
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt29
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt29
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make23
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make3
47 files changed, 1902 insertions, 248 deletions
diff --git a/.mapping.json b/.mapping.json
index eea167a8b7..146d1f262d 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -9475,6 +9475,11 @@
"ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt":"",
"ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.txt":"",
"ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt":"",
+ "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt":"",
+ "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt":"",
+ "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt":"",
+ "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt":"",
"ydb/public/sdk/cpp/client/ydb_types/CMakeLists.darwin-x86_64.txt":"",
"ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-aarch64.txt":"",
"ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
index e15e17c61c..9f6d0a3ab0 100644
--- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
@@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 35c8a46399..ad4711be21 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
index 35c8a46399..ad4711be21 100644
--- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
@@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
index e15e17c61c..9f6d0a3ab0 100644
--- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
@@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 08cd4433a3..b6335edacf 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -138,6 +138,8 @@ struct TEvPQ {
EvQuotaCountersUpdated,
EvConsumerRemoved,
EvFetchResponse,
+ EvSourceIdRequest,
+ EvSourceIdResponse,
EvEnd
};
@@ -854,6 +856,12 @@ struct TEvPQ {
TString Message;
NKikimrClient::TPersQueueFetchResponse Response;
};
+
+ struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> {
+ };
+
+ struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> {
+ };
};
} //NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index fa25484de7..549167548a 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1,5 +1,6 @@
#include "event_helpers.h"
#include "mirrorer.h"
+#include "partition_log.h"
#include "partition_util.h"
#include "partition.h"
#include "read.h"
@@ -53,6 +54,29 @@ const TString& TPartition::TopicName() const {
return TopicConverter->GetClientsideName();
}
+TString TPartition::LogPrefix() const {
+ TString state;
+ if (CurrentStateFunc() == &TThis::StateInit) {
+ state = "StateInit";
+ } else if (CurrentStateFunc() == &TThis::StateIdle) {
+ state = "StateIdle";
+ } else if (CurrentStateFunc() == &TThis::StateWrite) {
+ state = "StateWrite";
+ } else {
+ state = "Unknown";
+ }
+ return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " ";
+}
+
+bool TPartition::CanWrite() const {
+ return (PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active)
+ && (!PendingPartitionConfig || PendingPartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active);
+}
+
+bool TPartition::CanEnqueue() const {
+ return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
+}
+
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
if (container.empty()) {
return offset;
@@ -114,6 +138,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, TopicConverter(topicConverter)
, IsLocalDC(TabletConfig.GetLocalDC())
, DCId(std::move(dcId))
+ , PartitionGraph()
+ , SourceManager(this)
, StartOffset(0)
, EndOffset(0)
, WriteInflightSize(0)
@@ -851,34 +877,50 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
}
void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) {
- auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
- NKikimrClient::TResponse& resp = response->Response;
+ MaxSeqNoRequests.emplace_back(ev);
+ ProcessMaxSeqNoRequest(ctx);
+}
- resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
- resp.SetErrorCode(NPersQueue::NErrorCode::OK);
+void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) {
+ PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size());
+ SourceManager.EnsureSource(ctx);
- auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
- for (const auto& sourceId : ev->Get()->SourceIds) {
- auto& protoInfo = *result.AddSourceIdInfo();
- protoInfo.SetSourceId(sourceId);
+ while(!MaxSeqNoRequests.empty()) {
+ auto& ev = MaxSeqNoRequests.front();
- auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId);
- if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
- continue;
- }
+ auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
+ NKikimrClient::TResponse& resp = response->Response;
- const auto& memInfo = it->second;
- Y_ABORT_UNLESS(memInfo.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, memInfo.Offset);
- Y_ABORT_UNLESS(memInfo.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, memInfo.SeqNo);
+ resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ resp.SetErrorCode(NPersQueue::NErrorCode::OK);
- protoInfo.SetSeqNo(memInfo.SeqNo);
- protoInfo.SetOffset(memInfo.Offset);
- protoInfo.SetWriteTimestampMS(memInfo.WriteTimestamp.MilliSeconds());
- protoInfo.SetExplicit(memInfo.Explicit);
- protoInfo.SetState(TSourceIdInfo::ConvertState(memInfo.State));
- }
+ auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
+ for (const auto& sourceId : ev->Get()->SourceIds) {
+ auto& protoInfo = *result.AddSourceIdInfo();
+ protoInfo.SetSourceId(sourceId);
+
+ auto info = SourceManager.Get(sourceId);
+ if (!info) {
+ PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId);
+ return;
+ }
+ if (info.State == TSourceIdInfo::EState::Unknown) {
+ continue;
+ }
- ctx.Send(Tablet, response.Release());
+ Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
+ Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
+
+ protoInfo.SetSeqNo(info.SeqNo);
+ protoInfo.SetOffset(info.Offset);
+ protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
+ protoInfo.SetExplicit(info.Explicit);
+ protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
+ }
+
+ ctx.Send(Tablet, response.Release());
+ MaxSeqNoRequests.pop_front();
+ }
}
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
@@ -1297,12 +1339,14 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
DiskIsFull = !diskIsOk;
if (response.HasCookie()) {
- HandleSetOffsetResponse(response.GetCookie(), ctx);
+ OnProcessTxsAndUserActsWriteComplete(response.GetCookie(), ctx);
} else {
- if (ctx.Now() - WriteStartTime > TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs())) {
+ const auto writeDuration = ctx.Now() - WriteStartTime;
+ const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
+ if (writeDuration > minWriteLatency) {
HandleWriteResponse(ctx);
} else {
- ctx.Schedule(TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs()) - (ctx.Now() - WriteStartTime), new TEvPQ::TEvHandleWriteResponse());
+ ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
}
}
}
@@ -1414,6 +1458,7 @@ void TPartition::RemoveDistrTx()
Y_ABORT_UNLESS(!DistrTxs.empty());
DistrTxs.pop_front();
+ PendingPartitionConfig = nullptr;
}
void TPartition::ProcessDistrTxs(const TActorContext& ctx)
@@ -1495,6 +1540,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
ChangeConfig =
MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
event.Config);
+ PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
+
SendChangeConfigReply = false;
return true;
}
@@ -1623,11 +1670,85 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
}
}
+void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) {
+ Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE);
+
+
+ if (ChangeConfig) {
+ EndChangePartitionConfig(ChangeConfig->Config,
+ ChangeConfig->TopicConverter,
+ ctx);
+ }
+
+ for (auto& user : AffectedUsers) {
+ if (auto* actual = GetPendingUserIfExists(user)) {
+ TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
+ bool offsetHasChanged = (userInfo.Offset != actual->Offset);
+
+ userInfo.Session = actual->Session;
+ userInfo.Generation = actual->Generation;
+ userInfo.Step = actual->Step;
+ userInfo.Offset = actual->Offset;
+ userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
+ userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
+ userInfo.HasReadRule = true;
+
+ if (userInfo.Important != actual->Important) {
+ if (userInfo.LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
+ }
+ userInfo.SetImportant(actual->Important);
+ }
+ if (userInfo.Important && userInfo.Offset < (i64)StartOffset) {
+ userInfo.Offset = StartOffset;
+ }
+
+ if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) {
+ userInfo.ActualTimestamps = false;
+ ReadTimestampForOffset(user, userInfo, ctx);
+ } else {
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
+ }
+ } else {
+ auto ui = UsersInfoStorage->GetIfExists(user);
+ if (ui && ui->LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
+ }
+
+ UsersInfoStorage->Remove(user, ctx);
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
+ }
+ }
+
+ for (auto& [actor, reply] : Replies) {
+ ctx.Send(actor, reply.release());
+ }
+
+ PendingUsersInfo.clear();
+ Replies.clear();
+ AffectedUsers.clear();
+
+ UsersInfoWriteInProgress = false;
+
+ TxIdHasChanged = false;
+
+ if (ChangeConfig) {
+ ReportCounters(ctx, true);
+ ChangeConfig = nullptr;
+ PendingPartitionConfig = nullptr;
+ }
+
+
+ ProcessTxsAndUserActs(ctx);
+}
+
void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx)
{
Config = config;
+ PartitionConfig = GetPartitionConfig(Config, Partition);
+ PartitionGraph.Rebuild(Config);
TopicConverter = topicConverter;
Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);
@@ -1697,6 +1818,9 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
} else if (t.ProposeConfig) {
t.Predicate = BeginTransaction(*t.ProposeConfig);
+ PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
+ //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");
+
ctx.Send(Tablet,
MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
t.ProposeConfig->TxId,
@@ -1707,6 +1831,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
Y_ABORT_UNLESS(!ChangeConfig);
ChangeConfig = t.ChangeConfig;
+ PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
SendChangeConfigReply = t.SendReply;
BeginChangePartitionConfig(ChangeConfig->Config, ctx);
@@ -2392,5 +2517,40 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
}
}
+void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record;
+
+ if (Partition != record.GetPartition()) {
+ LOG_INFO_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." <<
+ " Topic: \"" << TopicName() << "\"." <<
+ " Partition: " << Partition << "."
+ );
+ return;
+ }
+
+ auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds();
+
+ auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
+ for(auto& sourceId : record.GetSourceId()) {
+ auto* s = response->Record.AddSource();
+ s->SetId(sourceId);
+
+ auto it = memoryStorage.find(sourceId);
+ if (it != memoryStorage.end()) {
+ auto& info = it->second;
+ s->SetState(Convert(info.State));
+ s->SetSeqNo(info.SeqNo);
+ s->SetOffset(info.Offset);
+ s->SetExplicit(info.Explicit);
+ s->SetWriteTimestamp(info.WriteTimestamp.GetValue());
+ } else {
+ s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown);
+ }
+ }
+
+ Send(ev->Sender, response.Release());
+}
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 9709d6b973..c8e510ad58 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -4,6 +4,7 @@
#include "header.h"
#include "key.h"
#include "partition_init.h"
+#include "partition_sourcemanager.h"
#include "partition_types.h"
#include "quota_tracker.h"
#include "sourceid.h"
@@ -78,6 +79,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
friend TInitDataRangeStep;
friend TInitDataStep;
+ friend TPartitionSourceManager;
+
public:
const TString& TopicName() const;
@@ -88,6 +91,9 @@ private:
struct THasDataReq;
struct THasDataDeadline;
+ bool CanWrite() const;
+ bool CanEnqueue() const;
+
void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
void ReplyErrorForStoredWrites(const TActorContext& ctx);
@@ -101,7 +107,7 @@ private:
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
void AnswerCurrentWrites(const TActorContext& ctx);
void CancelAllWritesOnIdle(const TActorContext& ctx);
- void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
+ void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
void CreateMirrorerActor();
void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
@@ -155,7 +161,6 @@ private:
void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
- void HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx);
void HandleWakeup(const TActorContext& ctx);
void HandleWriteResponse(const TActorContext& ctx);
@@ -175,6 +180,8 @@ private:
void ProcessTimestampRead(const TActorContext& ctx);
void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx);
+ void ProcessMaxSeqNoRequest(const TActorContext& ctx);
+
void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx);
void ReportCounters(const TActorContext& ctx, bool force = false);
bool UpdateCounters(const TActorContext& ctx, bool force = false);
@@ -199,7 +206,7 @@ private:
TInstant GetWriteTimeEstimate(ui64 offset) const;
bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
- TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter);
+ TPartitionSourceManager::TModificationBatch& sourceIdBatch);
bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
// Removes blobs that are no longer required. Blobs are no longer required if the storage time of all messages
@@ -300,6 +307,7 @@ private:
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
+ void OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx);
void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx);
@@ -312,7 +320,8 @@ private:
template <typename T>
void EmplaceRequest(T&& body, const TActorContext& ctx) {
- Requests.emplace_back(body, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now() - TInstant::Zero());
+ const auto now = ctx.Now();
+ Requests.emplace_back(body, WriteQuota->GetQuotedTime(now), now - TInstant::Zero());
}
void EmplaceResponse(TMessage&& message, const TActorContext& ctx);
@@ -327,6 +336,9 @@ private:
void ResendPendingEvents(const TActorContext& ctx);
+ void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
+
+ TString LogPrefix() const;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -460,6 +472,8 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+ HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
+ HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -515,6 +529,8 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+ HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
+ HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -525,36 +541,45 @@ private:
}
private:
+ enum class ProcessResult {
+ Continue,
+ Abort,
+ Break
+ };
+
struct ProcessParameters {
- ProcessParameters(TSourceIdWriter& sourceIdWriter,
- THeartbeatEmitter& heartbeatEmitter)
- : SourceIdWriter(sourceIdWriter)
- , HeartbeatEmitter(heartbeatEmitter) {
+ ProcessParameters(TPartitionSourceManager::TModificationBatch& sourceIdBatch)
+ : SourceIdBatch(sourceIdBatch) {
}
- TSourceIdWriter& SourceIdWriter;
- THeartbeatEmitter& HeartbeatEmitter;
+ TPartitionSourceManager::TModificationBatch& SourceIdBatch;
ui64 CurOffset;
bool OldPartsCleared;
bool HeadCleared;
};
- bool ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
- bool ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
- bool ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
- bool ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
+ ProcessResult ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
+ ProcessResult ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
+ ProcessResult ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
+ ProcessResult ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
private:
ui64 TabletID;
ui32 Partition;
NKikimrPQ::TPQTabletConfig Config;
NKikimrPQ::TPQTabletConfig TabletConfig;
+ const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr;
+ const NKikimrPQ::TPQTabletConfig::TPartition* PendingPartitionConfig = nullptr;
+
const TTabletCountersBase& Counters;
NPersQueue::TTopicConverterPtr TopicConverter;
bool IsLocalDC;
TString DCId;
+ TPartitionGraph PartitionGraph;
+ TPartitionSourceManager SourceManager;
+
ui32 MaxBlobSize;
const ui32 TotalLevels = 4;
TVector<ui32> CompactLevelBorder;
@@ -575,6 +600,7 @@ private:
std::deque<TMessage> Requests;
std::deque<TMessage> Responses;
+ std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;
THead Head;
THead NewHead;
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index 9e3d7ba111..d6a13a0b29 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -179,6 +179,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
+ Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
+ Partition()->PartitionGraph.Rebuild(Partition()->Config);
break;
case NKikimrProto::ERROR:
diff --git a/ydb/core/persqueue/partition_log.h b/ydb/core/persqueue/partition_log.h
new file mode 100644
index 0000000000..78c9a9e140
--- /dev/null
+++ b/ydb/core/persqueue/partition_log.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/actors/core/log.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NPQ {
+
+inline TString LogPrefix() { return {}; }
+
+#define PQ_LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_CRIT(stream) LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 323a14a647..136fd9b3ad 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -887,77 +887,6 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
Y_ABORT_UNLESS(ReadingTimestamp || UpdateUserInfoTimestamp.empty());
}
-void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) {
- Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE);
-
-
- if (ChangeConfig) {
- EndChangePartitionConfig(ChangeConfig->Config,
- ChangeConfig->TopicConverter,
- ctx);
- }
-
- for (auto& user : AffectedUsers) {
- if (auto* actual = GetPendingUserIfExists(user)) {
- TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
- bool offsetHasChanged = (userInfo.Offset != actual->Offset);
-
- userInfo.Session = actual->Session;
- userInfo.Generation = actual->Generation;
- userInfo.Step = actual->Step;
- userInfo.Offset = actual->Offset;
- userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
- userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
- userInfo.HasReadRule = true;
-
- if (userInfo.Important != actual->Important) {
- if (userInfo.LabeledCounters) {
- ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
- }
- userInfo.SetImportant(actual->Important);
- }
- if (userInfo.Important && userInfo.Offset < (i64)StartOffset) {
- userInfo.Offset = StartOffset;
- }
-
- if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) {
- userInfo.ActualTimestamps = false;
- ReadTimestampForOffset(user, userInfo, ctx);
- } else {
- TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
- }
- } else {
- auto ui = UsersInfoStorage->GetIfExists(user);
- if (ui && ui->LabeledCounters) {
- ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
- }
-
- UsersInfoStorage->Remove(user, ctx);
- Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
- }
- }
-
- for (auto& [actor, reply] : Replies) {
- ctx.Send(actor, reply.release());
- }
-
- PendingUsersInfo.clear();
- Replies.clear();
- AffectedUsers.clear();
-
- UsersInfoWriteInProgress = false;
-
- TxIdHasChanged = false;
-
- if (ChangeConfig) {
- ReportCounters(ctx, true);
- ChangeConfig = nullptr;
- }
-
-
- ProcessTxsAndUserActs(ctx);
-}
-
void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) {
ui32 count = 0;
ui32 size = 0;
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
new file mode 100644
index 0000000000..64474fb355
--- /dev/null
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -0,0 +1,466 @@
+#include "partition.h"
+#include "partition_log.h"
+
+#include <unordered_map>
+
+#include <ydb/core/base/tablet_pipe.h>
+
+namespace NKikimr::NPQ {
+
+IActor* CreateRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie);
+bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
+
+//
+// TPartitionSourceManager
+//
+
+TPartitionSourceManager::TPartitionSourceManager(TPartition* partition)
+ : Partition(partition) {
+}
+
+void TPartitionSourceManager::EnsureSource(const TActorContext& /*ctx*/) {
+ if (WaitSources()) {
+ return;
+ }
+
+ PendingCookies.clear();
+ Responses.clear();
+
+ auto node = GetPartitionNode();
+ if (!IsResearchRequires(node)) {
+ return;
+ }
+
+ auto unknowSourceIds = BuildUnknownSourceIds();
+ if (unknowSourceIds->empty()) {
+ return;
+ }
+
+ for(const auto* parent : node.value()->HierarhicalParents) {
+ PendingCookies.insert(++Cookie);
+ Partition->RegisterWithSameMailbox(CreateRequester(Partition->SelfId(),
+ parent->Id,
+ parent->TabletId,
+ unknowSourceIds,
+ Cookie));
+ }
+}
+
+const TPartitionSourceManager::TSourceInfo TPartitionSourceManager::Get(const TString& sourceId) const {
+ auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds();
+ auto itk = knownSourceIds.find(sourceId);
+ if (itk != knownSourceIds.end()) {
+ auto& value = itk->second;
+
+ TSourceInfo result;
+ result.State = value.State;
+ result.SeqNo = value.SeqNo;
+ result.Offset = value.Offset;
+ result.Explicit = value.Explicit;
+ result.WriteTimestamp = value.WriteTimestamp;
+
+ return result;
+ }
+
+ auto its = Sources.find(sourceId);
+ if (its != Sources.end()) {
+ return its->second;
+ }
+
+ TSourceInfo result;
+ result.Pending = IsResearchRequires(GetPartitionNode());
+ return result;
+}
+
+bool TPartitionSourceManager::WaitSources() const {
+ return !PendingCookies.empty();
+}
+
+TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModificationBatch(const TActorContext& ctx) {
+ const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
+ ? ESourceIdFormat::Proto
+ : ESourceIdFormat::Raw;
+ return TModificationBatch(this, format);
+}
+
+void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) {
+ auto it = PendingCookies.find(ev->Cookie);
+ if (it == PendingCookies.end()) {
+ PQ_LOG_D("Received TEvSourceIdResponse with unknown cookie: " << ev->Cookie);
+ return;
+ }
+ PendingCookies.erase(it);
+
+ if (ev->Get()->Record.HasError()) {
+ PQ_LOG_D("Error on request SourceId: " << ev->Get()->Record.GetError());
+ Responses.clear();
+
+ EnsureSource(ctx);
+ return;
+ }
+
+ Responses.push_back(ev);
+
+ if (PendingCookies.empty()) {
+ FinishBatch(ctx);
+ }
+}
+
+TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
+ return Partition->PartitionGraph.GetPartition(Partition->Partition);
+}
+
+TPartitionSourceManager::TSourceIdsPtr TPartitionSourceManager::BuildUnknownSourceIds() const {
+ auto& knownSourceIds = GetSourceIdStorage().GetInMemorySourceIds();
+ auto unknownSourceIds = std::make_shared<std::set<const TString*>>();
+
+ auto predicate = [&](const TString& sourceId) {
+ return !Sources.contains(sourceId) && !knownSourceIds.contains(sourceId);
+ };
+
+ for(const auto& msg : Partition->Requests) {
+ if (msg.IsWrite()) {
+ const TString& sourceId = msg.GetWrite().Msg.SourceId;
+ if (predicate(sourceId)) {
+ unknownSourceIds->insert(&sourceId);
+ }
+ }
+ }
+
+ for(const auto& ev : Partition->MaxSeqNoRequests) {
+ for (const auto& sourceId : ev->Get()->SourceIds) {
+ if (predicate(sourceId)) {
+ unknownSourceIds->insert(&sourceId);
+ }
+ }
+ }
+
+ return unknownSourceIds;
+}
+
+void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) {
+ for(const auto& ev : Responses) {
+ const auto& record = ev->Get()->Record;
+ for(const auto& s : record.GetSource()) {
+ auto& value = Sources[s.GetId()];
+ if (s.GetState() == NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown) {
+ continue;
+ }
+ PQ_LOG_T("Received SourceId " << s.GetId() << " SeqNo=" << s.GetSeqNo());
+ if (value.State == TSourceIdInfo::EState::Unknown || value.SeqNo < s.GetSeqNo()) {
+ value.State = Convert(s.GetState());
+ value.SeqNo = s.GetSeqNo();
+ value.Offset = s.GetOffset();
+ value.Explicit = s.GetExplicit();
+ value.WriteTimestamp.FromValue(s.GetWriteTimestamp());
+ }
+ }
+ }
+
+ Responses.clear();
+
+ if (Partition->CurrentStateFunc() == &TPartition::StateIdle) {
+ Partition->HandleWrites(ctx);
+ }
+ Partition->ProcessMaxSeqNoRequest(ctx);
+}
+
+TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
+ return Partition->SourceIdStorage;
+}
+
+bool TPartitionSourceManager::HasParents() const {
+ auto node = Partition->PartitionGraph.GetPartition(Partition->Partition);
+ return node && !node.value()->Parents.empty();
+}
+
+
+//
+// TPartitionSourceManager::TModificationBatch
+//
+
+TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format)
+ : Manager(manager)
+ , Node(manager->GetPartitionNode())
+ , SourceIdWriter(format)
+ , HeartbeatEmitter(manager->Partition->SourceIdStorage) {
+}
+
+TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
+ for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) {
+ Manager->Sources.erase(k);
+ }
+}
+
+TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const {
+ return HeartbeatEmitter.CanEmit();
+}
+
+TPartitionSourceManager::TSourceManager TPartitionSourceManager::TModificationBatch::GetSource(const TString& id) {
+ return TPartitionSourceManager::TSourceManager(this, id);
+}
+
+void TPartitionSourceManager::TModificationBatch::Cancel() {
+ return SourceIdWriter.Clear();
+}
+
+bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
+ return !SourceIdWriter.GetSourceIdsToWrite().empty();
+}
+
+void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
+ SourceIdWriter.FillRequest(request, Manager->Partition->Partition);
+}
+
+void TPartitionSourceManager::TModificationBatch::DeregisterSourceId(const TString& sourceId) {
+ SourceIdWriter.DeregisterSourceId(sourceId);
+}
+
+TPartitionSourceManager* TPartitionSourceManager::TModificationBatch::GetManager() const {
+ return Manager;
+}
+
+
+//
+// TPartitionSourceManager::TSourceManager
+//
+
+TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
+ TPartitionSourceManager::TSourceInfo result(value.State);
+ result.SeqNo = value.SeqNo;
+ result.Offset = value.Offset;
+ result.Explicit = value.Explicit;
+ result.WriteTimestamp = value.WriteTimestamp;
+ return result;
+}
+
+TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batch, const TString& id)
+ : Batch(batch)
+ , SourceId(id) {
+ auto& memory = MemoryStorage();
+ auto& writer = WriteStorage();
+ auto& sources = batch->GetManager()->Sources;
+
+ InMemory = memory.find(id);
+ InWriter = writer.find(id);
+ InSources = sources.end();
+
+ if (InWriter != writer.end()) {
+ Info = Convert(InWriter->second);
+ return;
+ }
+ if (InMemory != memory.end()) {
+ Info = Convert(InMemory->second);
+ return;
+ }
+
+ InSources = sources.find(id);
+ if (InSources != sources.end()) {
+ Info = InSources->second;
+ return;
+ }
+
+ Info.Pending = IsResearchRequires(batch->Node);
+}
+
+std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const {
+ return Info.State == TSourceIdInfo::EState::Unknown ? std::nullopt : std::optional(Info.SeqNo);
+}
+
+bool TPartitionSourceManager::TSourceManager::Explicit() const {
+ return Info.Explicit;
+}
+
+std::optional<ui64> TPartitionSourceManager::TSourceManager::CommittedSeqNo() const {
+ return InMemory == MemoryStorage().end() ? std::nullopt : std::optional(InMemory->second.SeqNo);
+}
+
+std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() const {
+ return InWriter == WriteStorage().end() ? std::nullopt : std::optional(InWriter->second.SeqNo);
+}
+
+void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
+ if (InMemory == MemoryStorage().end()) {
+ Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
+ } else {
+ Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
+ }
+}
+
+void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
+ Batch->HeartbeatEmitter.Process(SourceId, heartbeat);
+ if (InMemory == MemoryStorage().end()) {
+ Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, heartbeat);
+ } else {
+ Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
+ }
+}
+
+TPartitionSourceManager::TSourceManager::operator bool() const {
+ return Info;
+}
+
+const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const {
+ return Batch->GetManager()->GetSourceIdStorage().GetInMemorySourceIds();
+}
+
+const TSourceIdMap& TPartitionSourceManager::TSourceManager::WriteStorage() const {
+ return Batch->SourceIdWriter.GetSourceIdsToWrite();
+}
+
+
+//
+// TSourceInfo
+//
+
+TPartitionSourceManager::TSourceInfo::TSourceInfo(TSourceIdInfo::EState state)
+ : State(state) {
+}
+
+TPartitionSourceManager::TSourceInfo::operator bool() const {
+ return !Pending;
+}
+
+
+//
+// TSourceIdRequester
+//
+
+class TSourceIdRequester : public TActorBootstrapped<TSourceIdRequester> {
+ static constexpr TDuration RetryDelay = TDuration::MilliSeconds(100);
+public:
+ TSourceIdRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie)
+ : Parent(parent)
+ , Partition(partition)
+ , TabletId(tabletId)
+ , SourceIds(sourceIds)
+ , Cookie(cookie) {
+ }
+
+ void Bootstrap(const TActorContext&) {
+ Become(&TThis::StateWork);
+ MakePipe();
+ }
+
+ void PassAway() override {
+ if (PipeClient) {
+ NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient);
+ }
+ TActorBootstrapped::PassAway();
+ }
+
+private:
+ void MakePipe() {
+ NTabletPipe::TClientConfig config;
+ config.RetryPolicy = {
+ .RetryLimitCount = 6,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::MilliSeconds(200),
+ .BackoffMultiplier = 2,
+ .DoFirstRetryInstantly = true
+ };
+ PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config));
+ }
+
+ std::unique_ptr<TEvPQ::TEvSourceIdRequest> CreateRequest() {
+ auto request = std::make_unique<TEvPQ::TEvSourceIdRequest>();
+ auto& record = request->Record;
+ record.SetPartition(Partition);
+ for(const auto& sourceId : *SourceIds) {
+ record.AddSourceId(*sourceId);
+ }
+ return request;
+ }
+
+ void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& /*ctx*/) {
+ auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>();
+ msg->Record = std::move(ev->Get()->Record);
+
+ Reply(msg);
+ PassAway();
+ }
+
+ void DoRequest() {
+ NTabletPipe::SendData(SelfId(), PipeClient, CreateRequest().release());
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ if (ev->Get()->Status == NKikimrProto::EReplyStatus::OK) {
+ DoRequest();
+ } else {
+ ReplyWithError("Error connecting to PQ");
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
+ if (ev->Get()->TabletId == TabletId) {
+ ReplyWithError("Pipe destroyed");
+ }
+ }
+
+ void ReplyWithError(const TString& error) {
+ auto msg = std::make_unique<TEvPQ::TEvSourceIdResponse>();
+ msg->Record.SetError(error);
+
+ Reply(msg);
+
+ PassAway();
+ }
+
+ void Reply(std::unique_ptr<TEvPQ::TEvSourceIdResponse>& msg) {
+ Send(Parent, msg.release(), 0, Cookie);
+ }
+
+ STFUNC(StateWork)
+ {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPQ::TEvSourceIdResponse, Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ TActorId Parent;
+ ui32 Partition;
+ ui64 TabletId;
+ TPartitionSourceManager::TSourceIdsPtr SourceIds;
+ ui64 Cookie;
+
+ TActorId PipeClient;
+};
+
+IActor* CreateRequester(TActorId parent, ui32 partition, ui64 tabletId, TPartitionSourceManager::TSourceIdsPtr& sourceIds, ui64 cookie) {
+ return new TSourceIdRequester(parent, partition, tabletId, sourceIds, cookie);
+}
+
+bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
+ return node && !node.value()->Parents.empty();
+}
+
+NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
+ switch(value) {
+ case TSourceIdInfo::EState::Unknown:
+ return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown;
+ case TSourceIdInfo::EState::Registered:
+ return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered;
+ case TSourceIdInfo::EState::PendingRegistration:
+ return NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration;
+ }
+}
+
+TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value) {
+ switch(value) {
+ case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown:
+ return TSourceIdInfo::EState::Unknown;
+ case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Registered:
+ return TSourceIdInfo::EState::Registered;
+ case NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_PendingRegistration:
+ return TSourceIdInfo::EState::PendingRegistration;
+ }
+}
+
+
+
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
new file mode 100644
index 0000000000..6f57ecdb18
--- /dev/null
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -0,0 +1,140 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <ydb/core/persqueue/events/internal.h>
+
+#include "sourceid.h"
+#include "utils.h"
+
+namespace NKikimr::NPQ {
+
+class TPartition;
+
+class TPartitionSourceManager {
+private:
+ using TPartitionNode = std::optional<const TPartitionGraph::Node *>;
+
+public:
+ using TSourceIdsPtr = std::shared_ptr<std::set<const TString*>>;
+ class TModificationBatch;
+
+ struct TSourceInfo {
+ TSourceInfo(TSourceIdInfo::EState state = TSourceIdInfo::EState::Unknown);
+
+ TSourceIdInfo::EState State;
+ ui64 SeqNo = 0;
+ ui64 Offset = 0;
+ bool Explicit = false;
+ TInstant WriteTimestamp;
+
+ bool Pending = false;
+
+ operator bool() const;
+ };
+
+ class TSourceManager {
+ public:
+ TSourceManager(TModificationBatch* batch, const TString& id);
+
+ // Checks whether a message with the specified Sourceid can be processed.
+ // The message can be processed if it is not required to receive information
+ // about it in the parent partitions, or this information has already been received.
+ bool CanProcess() const;
+
+ std::optional<ui64> SeqNo() const;
+ bool Explicit() const;
+
+ std::optional<ui64> CommittedSeqNo() const;
+ std::optional<ui64> UpdatedSeqNo() const;
+
+ void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
+ void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat);
+
+ operator bool() const;
+
+ private:
+ const TSourceIdMap& MemoryStorage() const;
+ const TSourceIdMap& WriteStorage() const;
+
+
+ private:
+ TModificationBatch* Batch;
+ const TString SourceId;
+
+ TSourceInfo Info;
+
+ TSourceIdMap::const_iterator InMemory;
+ TSourceIdMap::const_iterator InWriter;
+ std::unordered_map<TString, TSourceInfo>::const_iterator InSources;
+ };
+
+ // Encapsulates the logic of SourceId operation during modification
+ class TModificationBatch {
+ friend TSourceManager;
+ public:
+ TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format);
+ ~TModificationBatch();
+
+ TMaybe<THeartbeat> CanEmit() const;
+ TSourceManager GetSource(const TString& id);
+
+ void Cancel();
+ bool HasModifications() const;
+ void FillRequest(TEvKeyValue::TEvRequest* request);
+
+ template <typename... Args>
+ void RegisterSourceId(const TString& sourceId, Args&&... args) {
+ SourceIdWriter.RegisterSourceId(sourceId, std::forward<Args>(args)...);
+ }
+ void DeregisterSourceId(const TString& sourceId);
+
+ private:
+ TPartitionSourceManager* GetManager() const;
+
+ private:
+ TPartitionSourceManager* Manager;
+
+ TPartitionNode Node;
+ TSourceIdWriter SourceIdWriter;
+ THeartbeatEmitter HeartbeatEmitter;
+ };
+
+
+ TPartitionSourceManager(TPartition* partition);
+
+ // For a partition obtained as a result of a merge or split, it requests
+ // information about the consumer's parameters from the parent partitions.
+ void EnsureSource(const TActorContext& ctx);
+
+ // Returns true if we expect a response from the parent partitions
+ bool WaitSources() const;
+
+ const TSourceInfo Get(const TString& sourceId) const;
+
+ TModificationBatch CreateModificationBatch(const TActorContext& ctx);
+
+public:
+ void Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx);
+
+private:
+ TPartitionNode GetPartitionNode() const;
+ TSourceIdsPtr BuildUnknownSourceIds() const;
+ void FinishBatch(const TActorContext& ctx);
+
+ TSourceIdStorage& GetSourceIdStorage() const;
+ bool HasParents() const;
+
+private:
+ TPartition* Partition;
+
+ std::unordered_map<TString, TSourceInfo> Sources;
+
+ ui64 Cookie = 0;
+ std::set<ui64> PendingCookies;
+ std::vector<TEvPQ::TEvSourceIdResponse::TPtr> Responses;
+};
+
+NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value);
+TSourceIdInfo::EState Convert(NKikimrPQ::TEvSourceIdResponse::EState value);
+
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 528dbc18cf..2c8661eee4 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1,5 +1,6 @@
#include "event_helpers.h"
#include "mirrorer.h"
+#include "partition_log.h"
#include "partition_util.h"
#include "partition.h"
#include "read.h"
@@ -28,6 +29,8 @@ static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
static const ui32 MAX_INLINE_SIZE = 1000;
+static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;
+
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
@@ -78,13 +81,13 @@ void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActor
}
void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvUpdateAvailableSize. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvUpdateAvailableSize.");
UpdateAvailableSize(ctx);
}
void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnIdle. Partition: " << Partition);
+ PQ_LOG_T("TPartition::CancelAllWritesOnIdle.");
for (const auto& w : Requests) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full");
@@ -106,7 +109,7 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
}
void TPartition::FailBadClient(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FailBadClient. Partition: " << Partition);
+ PQ_LOG_T("TPartition::FailBadClient.");
for (auto it = Owners.begin(); it != Owners.end();) {
it = DropOwner(it, ctx);
@@ -138,7 +141,7 @@ void TPartition::FailBadClient(const TActorContext& ctx) {
}
void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequest. Partition: " << Partition);
+ PQ_LOG_T("TPartition::ProcessChangeOwnerRequest.");
auto &owner = ev->Owner;
auto it = Owners.find(owner);
@@ -163,7 +166,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::DropOwner. Partition: " << Partition);
+ PQ_LOG_D("TPartition::DropOwner.");
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
@@ -179,7 +182,7 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas
}
void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvChangeOwner. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvChangeOwner.");
bool res = OwnerPipes.insert(ev->Get()->PipeClient).second;
Y_ABORT_UNLESS(res);
@@ -188,7 +191,7 @@ void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ct
}
void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessReserveRequests. Partition: " << Partition);
+ PQ_LOG_T("TPartition::ProcessReserveRequests.");
const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize();
@@ -236,7 +239,7 @@ void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) {
void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvReserveBytes. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvReserveBytes.");
const TString& ownerCookie = ev->Get()->OwnerCookie;
TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie);
@@ -267,7 +270,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct
}
void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AnswerCurrentWrites. Partition: " << Partition);
+ PQ_LOG_T("TPartition::AnswerCurrentWrites. Responses.size()=" << Responses.size());
ui64 offset = EndOffset;
while (!Responses.empty()) {
@@ -306,6 +309,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
offset = *wrOffset;
}
}
+
if (!already && partNo + 1 == totalParts) {
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion);
@@ -392,7 +396,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
}
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SyncMemoryStateWithKVState. Partition: " << Partition);
+ PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");
if (!CompactedKeys.empty())
HeadKeys.clear();
@@ -460,13 +464,13 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
}
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvHandleWriteResponse. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
HandleWriteResponse(ctx);
}
void TPartition::HandleWriteResponse(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWriteResponse. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleWriteResponse.");
Y_ABORT_UNLESS(CurrentStateFunc() == &TThis::StateWrite);
ui64 prevEndOffset = EndOffset;
@@ -539,7 +543,13 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvWrite. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvWrite.");
+
+ if (!CanEnqueue()) {
+ ReplyError(ctx, ev->Get()->Cookie, InactivePartitionErrorCode,
+ TStringBuilder() << "Write to inactive partition");
+ return;
+ }
ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg) {
return sum + msg.Data.size();
@@ -654,7 +664,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TA
}
void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvRegisterMessageGroup. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvRegisterMessageGroup.");
const auto& body = ev->Get()->Body;
@@ -692,7 +702,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
}
void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvDeregisterMessageGroup. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvDeregisterMessageGroup.");
const auto& body = ev->Get()->Body;
@@ -711,7 +721,7 @@ void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActo
}
void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvSplitMessageGroup. Partition: " << Partition);
+ PQ_LOG_T("TPartition::HandleOnWrite TEvSplitMessageGroup.");
if (ev->Get()->Deregistrations.size() > 1) {
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST,
@@ -768,7 +778,7 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool
void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequests. Partition: " << Partition);
+ PQ_LOG_T("TPartition::ProcessChangeOwnerRequests.");
while (!WaitToChangeOwner.empty()) {
auto &ev = WaitToChangeOwner.front();
@@ -784,8 +794,8 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) {
}
}
-void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnWrite. Partition: " << Partition);
+void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode) {
+ PQ_LOG_T("TPartition::CancelAllWritesOnWrite.");
ReplyError(ctx, p.Cookie, errorCode, errorStr);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
@@ -793,7 +803,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
FailBadClient(ctx);
NewHead.Clear();
NewHead.Offset = EndOffset;
- sourceIdWriter.Clear();
+ sourceIdBatch.Cancel();
request->Record.Clear();
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
CompactedKeys.clear();
@@ -801,7 +811,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
WriteCycleSize = 0;
}
-bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) {
+TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters) {
auto& body = msg.Body;
TMaybe<TPartitionKeyRange> keyRange;
@@ -810,20 +820,20 @@ bool TPartition::ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters
}
body.AssignedOffset = parameters.CurOffset;
- parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
+ parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
- return true;
+ return ProcessResult::Continue;
}
-bool TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) {
- parameters.SourceIdWriter.DeregisterSourceId(msg.Body.SourceId);
+TPartition::ProcessResult TPartition::ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters) {
+ parameters.SourceIdBatch.DeregisterSourceId(msg.Body.SourceId);
- return true;
+ return ProcessResult::Continue;
}
-bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) {
+TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters) {
for (auto& body : msg.Deregistrations) {
- parameters.SourceIdWriter.DeregisterSourceId(body.SourceId);
+ parameters.SourceIdBatch.DeregisterSourceId(body.SourceId);
}
for (auto& body : msg.Registrations) {
@@ -833,16 +843,20 @@ bool TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& p
}
body.AssignedOffset = parameters.CurOffset;
- parameters.SourceIdWriter.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
+ parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
}
- return true;
+ return ProcessResult::Continue;
}
-bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
+TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
ui64& curOffset = parameters.CurOffset;
- TSourceIdWriter& sourceIdWriter = parameters.SourceIdWriter;
- THeartbeatEmitter& heartbeatEmitter = parameters.HeartbeatEmitter;
+ auto& sourceIdBatch = parameters.SourceIdBatch;
+ auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId);
+
+ if (!sourceId) {
+ return ProcessResult::Break;
+ }
WriteInflightSize -= p.Msg.Data.size();
@@ -851,21 +865,16 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
ui64 poffset = p.Offset ? *p.Offset : curOffset;
- auto it_inMemory = SourceIdStorage.GetInMemorySourceIds().find(p.Msg.SourceId);
- auto it_toWrite = sourceIdWriter.GetSourceIdsToWrite().find(p.Msg.SourceId);
- if (!p.Msg.DisableDeduplication && (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end() && it_inMemory->second.SeqNo >= p.Msg.SeqNo || (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end() && it_toWrite->second.SeqNo >= p.Msg.SeqNo))) {
- bool isWriting = (it_toWrite != sourceIdWriter.GetSourceIdsToWrite().end());
- bool isCommitted = (it_inMemory != SourceIdStorage.GetInMemorySourceIds().end());
-
+ if (!p.Msg.DisableDeduplication && sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) {
if (poffset >= curOffset) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"Already written message. Topic: '" << TopicName()
<< "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId)
- << "'. Message seqNo = " << p.Msg.SeqNo
- << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0)
- << (isWriting ? ". Writing seqNo: " : ". ") << (isWriting ? it_toWrite->second.SeqNo : 0)
- << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset
+ << "'. Message seqNo: " << p.Msg.SeqNo
+ << ". Committed seqNo: " << sourceId.CommittedSeqNo()
+ << ". Writing seqNo: " << sourceId.UpdatedSeqNo()
+ << ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
@@ -876,19 +885,19 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
}
TString().swap(p.Msg.Data);
- return true;
+ return ProcessResult::Continue;
}
if (const auto& hbVersion = p.Msg.HeartbeatVersion) {
- if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) {
+ if (!sourceId.SeqNo()) {
CancelAllWritesOnWrite(ctx, request, TStringBuilder()
- << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter);
- return false;
+ << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdBatch);
+ return ProcessResult::Abort;
}
- if (!it_inMemory->second.Explicit) {
+ if (!sourceId.Explicit()) {
CancelAllWritesOnWrite(ctx, request, TStringBuilder()
- << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter);
- return false;
+ << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdBatch);
+ return ProcessResult::Abort;
}
LOG_DEBUG_S(
@@ -903,20 +912,17 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
.Data = p.Msg.Data,
};
- heartbeatEmitter.Process(p.Msg.SourceId, heartbeat);
- sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(
- p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat)
- ));
+ sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat));
- return true;
+ return ProcessResult::Continue;
}
if (poffset < curOffset) { //too small offset
CancelAllWritesOnWrite(ctx, request,
TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo
<< " partNo: " << p.Msg.PartNo << " has incorrect offset " << poffset << ", must be at least " << curOffset,
- p, sourceIdWriter, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
- return false;
+ p, sourceIdBatch, NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
+ return ProcessResult::Abort;
}
Y_ABORT_UNLESS(poffset >= curOffset);
@@ -928,8 +934,8 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo
<< " partNo: " << p.Msg.PartNo << " has gap inside partitioned message, incorrect offset "
<< poffset << ", must be " << curOffset,
- p, sourceIdWriter);
- return false;
+ p, sourceIdBatch);
+ return ProcessResult::Abort;
}
curOffset = poffset;
}
@@ -971,9 +977,9 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
TString s;
if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) {
//this must not be happen - client sends gaps, fail this client till the end
- CancelAllWritesOnWrite(ctx, request, s, p, sourceIdWriter);
+ CancelAllWritesOnWrite(ctx, request, s, p, sourceIdBatch);
//now no changes will leak
- return false;
+ return ProcessResult::Abort;
}
WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
@@ -1108,11 +1114,7 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
<< " NewHead: " << NewHead
);
- if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) {
- sourceIdWriter.RegisterSourceId(p.Msg.SourceId, p.Msg.SeqNo, curOffset, CurrentTimestamp);
- } else {
- sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(p.Msg.SeqNo, curOffset, CurrentTimestamp));
- }
+ sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp);
++curOffset;
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
@@ -1120,15 +1122,15 @@ bool TPartition::ProcessRequest(TWriteMsg& p, ProcessParameters& parameters, TEv
TString().swap(p.Msg.Data);
- return true;
+ return ProcessResult::Continue;
}
bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
- TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter)
+ TPartitionSourceManager::TModificationBatch& sourceIdBatch)
{
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition);
+ PQ_LOG_T("TPartition::AppendHeadWithNewWrites.");
- ProcessParameters parameters(sourceIdWriter, heartbeatEmitter);
+ ProcessParameters parameters(sourceIdBatch);
parameters.CurOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
WriteCycleSize = 0;
@@ -1152,28 +1154,35 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
//Process is following: if batch contains already written messages or only one client message part -> unpack it and process as several TClientBlobs
//otherwise write this batch as is to head;
- while (!Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big
+ bool run = true;
+ while (run && !Requests.empty() && WriteCycleSize < MAX_WRITE_CYCLE_SIZE) { //head is not too big
auto pp = Requests.front();
Requests.pop_front();
- bool processed = true;
+ ProcessResult result = ProcessResult::Continue;
if (pp.IsWrite()) {
- processed = ProcessRequest(pp.GetWrite(), parameters, request, ctx);
+ result = ProcessRequest(pp.GetWrite(), parameters, request, ctx);
} else if (pp.IsRegisterMessageGroup()) {
- processed = ProcessRequest(pp.GetRegisterMessageGroup(), parameters);
+ result = ProcessRequest(pp.GetRegisterMessageGroup(), parameters);
} else if (pp.IsDeregisterMessageGroup()) {
- processed = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters);
+ result = ProcessRequest(pp.GetDeregisterMessageGroup(), parameters);
} else if (pp.IsSplitMessageGroup()) {
- processed = ProcessRequest(pp.GetSplitMessageGroup(), parameters);
+ result = ProcessRequest(pp.GetSplitMessageGroup(), parameters);
} else {
Y_ABORT_UNLESS(pp.IsOwnership());
}
- if (!processed) {
- return false;
+ switch (result) {
+ case ProcessResult::Abort:
+ return false;
+ case ProcessResult::Break:
+ Requests.push_front(pp);
+ run = false;
+ break;
+ case ProcessResult::Continue:
+ EmplaceResponse(std::move(pp), ctx);
+ break;
}
-
- EmplaceResponse(std::move(pp), ctx);
}
UpdateWriteBufferIsFullState(ctx.Now());
@@ -1233,7 +1242,7 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) {
}
void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AddNewWriteBlob. Partition: " << Partition);
+ PQ_LOG_T("TPartition::AddNewWriteBlob.");
const auto& key = res.first;
@@ -1321,7 +1330,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
}
void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SetDeadlinesForWrites. Partition: " << Partition);
+ PQ_LOG_T("TPartition::SetDeadlinesForWrites.");
if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
@@ -1331,13 +1340,13 @@ void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
}
void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::Handle TEvQuotaDeadlineCheck. Partition: " << Partition);
+ PQ_LOG_T("TPartition::Handle TEvQuotaDeadlineCheck.");
FilterDeadlinedWrites(ctx);
}
bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessWrites. Partition: " << Partition);
+ PQ_LOG_T("TPartition::ProcessWrites.");
FilterDeadlinedWrites(ctx);
@@ -1364,14 +1373,9 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
Y_ABORT_UNLESS(request->Record.CmdRenameSize() == 0);
Y_ABORT_UNLESS(request->Record.CmdDeleteRangeSize() == 0);
- const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
- ? ESourceIdFormat::Proto
- : ESourceIdFormat::Raw;
- TSourceIdWriter sourceIdWriter(format);
- THeartbeatEmitter heartbeatEmitter(SourceIdStorage);
-
- bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter, heartbeatEmitter);
+ auto sourceIdBatch = SourceManager.CreateModificationBatch(ctx);
+ bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdBatch);
if (headCleared) {
Y_ABORT_UNLESS(!CompactedKeys.empty() || Head.PackedSize == 0);
for (ui32 i = 0; i < TotalLevels; ++i) {
@@ -1379,7 +1383,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
}
- if (const auto heartbeat = heartbeatEmitter.CanEmit()) {
+ if (const auto heartbeat = sourceIdBatch.CanEmit()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicName() << "' partition " << Partition
@@ -1408,17 +1412,17 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
- if (sourceIdWriter.GetSourceIdsToWrite().empty()) {
+ if (!sourceIdBatch.HasModifications()) {
return request->Record.CmdWriteSize() > 0
|| request->Record.CmdRenameSize() > 0
|| request->Record.CmdDeleteRangeSize() > 0;
} else {
- sourceIdWriter.FillRequest(request, Partition);
+ sourceIdBatch.FillRequest(request);
return true;
}
}
- sourceIdWriter.FillRequest(request, Partition);
+ sourceIdBatch.FillRequest(request);
std::pair<TKey, ui32> res = GetNewWriteKey(headCleared);
const auto& key = res.first;
@@ -1440,7 +1444,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now())
return;
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FilterDeadlinedWrites. Partition: " << Partition);
+ PQ_LOG_T("TPartition::FilterDeadlinedWrites.");
std::deque<TMessage> newRequests;
for (auto& w : Requests) {
@@ -1466,7 +1470,27 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
void TPartition::HandleWrites(const TActorContext& ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWrites. Partition: " << Partition);
+ if (!CanWrite()) {
+ if (CanEnqueue()) {
+ return;
+ } else {
+ for(const auto& r : ReserveRequests) {
+ ReplyError(ctx, r->Cookie, InactivePartitionErrorCode,
+ TStringBuilder() << "Write to inactive partition");
+ }
+ ReserveRequests.clear();
+ for(const auto& r : Requests) {
+ ReplyError(ctx, r.GetCookie(), InactivePartitionErrorCode,
+ TStringBuilder() << "Write to inactive partition");
+ }
+ Requests.clear();
+ return;
+ }
+ }
+
+ PQ_LOG_T("TPartition::HandleWrites. Requests.size()=" << Requests.size());
+
+ SourceManager.EnsureSource(ctx);
Become(&TThis::StateWrite);
@@ -1495,7 +1519,11 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
bool res = ProcessWrites(request.Get(), now, ctx);
Y_ABORT_UNLESS(!res);
}
- Y_ABORT_UNLESS(Requests.empty() || !WriteQuota->CanExaust(now) || WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)); //in this case all writes must be processed or no quota left
+ Y_ABORT_UNLESS(Requests.empty()
+ || !WriteQuota->CanExaust(now)
+ || WaitingForPreviousBlobQuota()
+ || WaitingForSubDomainQuota(ctx)
+ || SourceManager.WaitSources()); //in this case all writes must be processed or no quota left
AnswerCurrentWrites(ctx); //in case if all writes are already done - no answer will be called on kv write, no kv write at all
BecomeIdle(ctx);
return;
@@ -1541,8 +1569,8 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w
return MeteringDataSize(ctx) + withSize > ReserveSize();
}
-void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::WriteBlobWithQuota. Partition: " << Partition);
+void TPartition::WriteBlobWithQuota(const TActorContext& /*ctx*/, THolder<TEvKeyValue::TEvRequest>&& request) {
+ PQ_LOG_T("TPartition::WriteBlobWithQuota.");
// Request quota and write blob.
// Mirrored topics are not quoted in local dc.
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 9d79ec3440..4a08925350 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1163,26 +1163,33 @@ void TPersQueue::Handle(TEvPQ::TEvTabletCacheCounters::TPtr& ev, const TActorCon
CacheCounters = ev->Get()->Counters;
SetCacheCounters(CacheCounters);
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " topic '" << TopicConverter->GetClientsideName()
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " topic '" << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined")
<< "Counters. CacheSize " << CacheCounters.CacheSizeBytes << " CachedBlobs " << CacheCounters.CacheSizeBlobs);
}
void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& ctx)
{
- auto it = Partitions.find(ev->Get()->Partition);
+ const auto partitionId = ev->Get()->Partition;
+ auto it = Partitions.find(partitionId);
Y_ABORT_UNLESS(it != Partitions.end());
Y_ABORT_UNLESS(!it->second.InitDone);
it->second.InitDone = true;
++PartitionsInited;
Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config
- if (!InitCompleted && PartitionsInited == Partitions.size()) {
+ auto allInitialized = PartitionsInited == Partitions.size();
+ if (!InitCompleted && allInitialized) {
OnInitComplete(ctx);
}
- if (NewConfigShouldBeApplied && PartitionsInited == Partitions.size()) {
+ if (NewConfigShouldBeApplied && allInitialized) {
ApplyNewConfigAndReply(ctx);
}
+
+ ProcessSourceIdRequests(partitionId);
+ if (allInitialized) {
+ SourceIdRequests.clear();
+ }
}
void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx)
@@ -1221,7 +1228,7 @@ void TPersQueue::FinishResponse(THashMap<ui64, TAutoPtr<TResponseBuilder>>::iter
void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx)
-{
+{
if (!ConfigInited) {
UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender);
return;
@@ -1844,7 +1851,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
- " got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition()
+ " got client PART message topic: " << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << " partition: " << req.GetPartition()
<< " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: "
<< msgs.back().SeqNo << " partNo : " << msgs.back().PartNo
<< " messageNo: " << req.GetMessageNo() << " size: " << data.size()
@@ -1878,7 +1885,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
- " got client message topic: " << TopicConverter->GetClientsideName() <<
+ " got client message topic: " << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") <<
" partition: " << req.GetPartition() <<
" SourceId: \'" << EscapeC(msgs.back().SourceId) <<
"\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo <<
@@ -2171,7 +2178,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
ui32 partition = req.GetPartition();
auto it = Partitions.find(partition);
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition);
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
+ << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition);
if (it == Partitions.end()) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::WRONG_PARTITION_NUMBER,
@@ -3613,6 +3621,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
}
+void TPersQueue::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record;
+ auto it = Partitions.find(record.GetPartition());
+ if (it == Partitions.end()) {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());
+
+ auto response = THolder<TEvPQ::TEvSourceIdResponse>();
+ response->Record.SetError("Partition was not found");
+ Send(ev->Sender, response.Release());
+
+ return;
+ }
+
+ if (it->second.InitDone) {
+ Forward(ev, it->second.Actor);
+ } else {
+ SourceIdRequests[record.GetPartition()].push_back(ev);
+ }
+}
+
+void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
+ auto sit = SourceIdRequests.find(partitionId);
+ if (sit != SourceIdRequests.end()) {
+ auto it = Partitions.find(partitionId);
+ for (auto& r : sit->second) {
+ Forward(r, it->second.Actor);
+ }
+ SourceIdRequests.erase(partitionId);
+ }
+}
+
+
bool TPersQueue::HandleHook(STFUNC_SIG)
{
SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR);
@@ -3654,6 +3694,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle);
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+ HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index f30645995f..6b31eb64fc 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -157,6 +157,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
ui64 GetAllowedStep() const;
+ void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
+ void ProcessSourceIdRequests(ui32 partitionId);
+
static constexpr const char * KeyConfig() { return "_config"; }
static constexpr const char * KeyState() { return "_state"; }
static constexpr const char * KeyTxInfo() { return "_txinfo"; }
@@ -375,6 +378,8 @@ private:
bool CanProcessDeleteTxs() const;
bool UseMediatorTimeCast = true;
+
+ THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
};
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index ee78d0674c..69216b487e 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -116,6 +116,15 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
{
}
+TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat)
+ : SeqNo(seqNo)
+ , Offset(offset)
+ , WriteTimestamp(createTs)
+ , CreateTimestamp(createTs)
+ , LastHeartbeat(heartbeat)
+{
+}
+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
: SeqNo(seqNo)
, Offset(offset)
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index 1c6cfd34bf..c5ac29cd81 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -34,6 +34,7 @@ struct TSourceIdInfo {
TSourceIdInfo() = default;
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
+ TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat);
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
index a1a9555e7e..2f3840b3b6 100644
--- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
@@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC
persqueue-ut-common
core-testlib-default
ydb_persqueue_core-ut-ut_utils
+ ydb_topic-ut-ut_utils
+ tx-schemeshard-ut_helpers
library-cpp-resource
)
target_link_options(ydb-core-persqueue-ut PRIVATE
@@ -52,9 +54,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
index c97fc74541..65612f86de 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
@@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC
persqueue-ut-common
core-testlib-default
ydb_persqueue_core-ut-ut_utils
+ ydb_topic-ut-ut_utils
+ tx-schemeshard-ut_helpers
library-cpp-resource
)
target_link_options(ydb-core-persqueue-ut PRIVATE
@@ -55,9 +57,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
index a5cbaf7829..29e6d7b08e 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
@@ -35,6 +35,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC
persqueue-ut-common
core-testlib-default
ydb_persqueue_core-ut-ut_utils
+ ydb_topic-ut-ut_utils
+ tx-schemeshard-ut_helpers
library-cpp-resource
)
target_link_options(ydb-core-persqueue-ut PRIVATE
@@ -56,9 +58,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
index f5a43dc332..347416de43 100644
--- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
@@ -34,6 +34,8 @@ target_link_libraries(ydb-core-persqueue-ut PUBLIC
persqueue-ut-common
core-testlib-default
ydb_persqueue_core-ut-ut_utils
+ ydb_topic-ut-ut_utils
+ tx-schemeshard-ut_helpers
library-cpp-resource
)
target_sources(ydb-core-persqueue-ut PRIVATE
@@ -45,9 +47,11 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partition_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/partitiongraph_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/quota_tracker_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/splitmerge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index 948224d296..7e4b911c68 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -916,7 +916,8 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui
UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult());
auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
- UNIT_ASSERT_EQUAL(res.ResultSize(), resCount);
+ UNIT_ASSERT_EQUAL_C(res.ResultSize(), resCount,
+ "Result size missmatch: expected " << resCount << " but received " << res.ResultSize());
ui64 off = offset;
for (ui32 i = 0; i < resCount; ++i) {
diff --git a/ydb/core/persqueue/ut/fetch_request_ut.cpp b/ydb/core/persqueue/ut/fetch_request_ut.cpp
index b6d928c9f0..b1db820001 100644
--- a/ydb/core/persqueue/ut/fetch_request_ut.cpp
+++ b/ydb/core/persqueue/ut/fetch_request_ut.cpp
@@ -89,6 +89,8 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
auto& runtime = setup->GetRuntime();
StartSchemeCache(runtime);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_TRACE);
+
ui32 totalPartitions = 5;
setup->CreateTopic("topic1", "dc1", totalPartitions);
@@ -103,6 +105,7 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
auto ev = runtime.GrabEdgeEvent<TEvPQ::TEvFetchResponse>();
UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SCHEME_ERROR, ev->Message);
}
+
Y_UNIT_TEST(CheckAccess) {
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
auto& runtime = setup->GetRuntime();
diff --git a/ydb/core/persqueue/ut/partitiongraph_ut.cpp b/ydb/core/persqueue/ut/partitiongraph_ut.cpp
new file mode 100644
index 0000000000..0003f18cec
--- /dev/null
+++ b/ydb/core/persqueue/ut/partitiongraph_ut.cpp
@@ -0,0 +1,88 @@
+
+#include <ydb/core/persqueue/utils.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NKikimr::NPQ;
+
+Y_UNIT_TEST_SUITE(TPartitionGraphTest) {
+ Y_UNIT_TEST(BuildGraph) {
+
+ // 0 ------------
+ // 1 -|
+ // |- 3 -|
+ // 2 -| |- 5
+ // 4 -------|
+ //
+ NKikimrPQ::TPQTabletConfig config;
+
+ // Without parents and childrens
+ auto* p0 = config.AddAllPartitions();
+ p0->SetPartitionId(0);
+
+ auto* p1 = config.AddAllPartitions();
+ p1->SetPartitionId(1);
+ p1->AddChildPartitionIds(3);
+
+ auto* p2 = config.AddAllPartitions();
+ p2->SetPartitionId(2);
+ p2->AddChildPartitionIds(3);
+
+ auto* p3 = config.AddAllPartitions();
+ p3->SetPartitionId(3);
+ p3->AddChildPartitionIds(5);
+ p3->AddParentPartitionIds(1);
+ p3->AddParentPartitionIds(2);
+
+ auto* p4 = config.AddAllPartitions();
+ p4->SetPartitionId(4);
+ p4->AddChildPartitionIds(5);
+
+ auto* p5 = config.AddAllPartitions();
+ p5->SetPartitionId(5);
+ p5->AddParentPartitionIds(3);
+ p5->AddParentPartitionIds(4);
+
+ TPartitionGraph graph;
+ graph.Rebuild(config);
+
+ const auto n0o = graph.GetPartition(0);
+ const auto n1o = graph.GetPartition(1);
+ const auto n2o = graph.GetPartition(2);
+ const auto n3o = graph.GetPartition(3);
+ const auto n4o = graph.GetPartition(4);
+ const auto n5o = graph.GetPartition(5);
+
+ UNIT_ASSERT(n0o);
+ UNIT_ASSERT(n1o);
+ UNIT_ASSERT(n2o);
+ UNIT_ASSERT(n3o);
+ UNIT_ASSERT(n4o);
+ UNIT_ASSERT(n5o);
+
+ auto& n0 = *n0o.value();
+ auto& n1 = *n1o.value();
+ auto& n2 = *n2o.value();
+ auto& n3 = *n3o.value();
+ auto& n4 = *n4o.value();
+ auto& n5 = *n5o.value();
+
+
+ UNIT_ASSERT_EQUAL(n0.Parents.size(), 0);
+ UNIT_ASSERT_EQUAL(n0.Children.size(), 0);
+ UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0);
+
+ UNIT_ASSERT_EQUAL(n1.Parents.size(), 0);
+ UNIT_ASSERT_EQUAL(n1.Children.size(), 1);
+ UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0);
+
+ UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2");
+ UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0");
+ UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4");
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end());
+ }
+}
diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp
new file mode 100644
index 0000000000..7799b07ca9
--- /dev/null
+++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp
@@ -0,0 +1,379 @@
+#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h>
+
+namespace NKikimr {
+
+using namespace NYdb::NTopic;
+using namespace NYdb::NTopic::NTests;
+using namespace NSchemeShardUT_Private;
+
+
+// TODO
+static constexpr ui64 SS = 72057594046644480l;
+
+auto CreateTransaction(const TString& parentPath, ::NKikimrSchemeOp::TPersQueueGroupDescription& scheme) {
+ NKikimrSchemeOp::TModifyScheme tx;
+ tx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
+ tx.SetWorkingDir(parentPath);
+ tx.MutableAlterPersQueueGroup()->CopyFrom(scheme);
+ return tx;
+}
+
+TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) {
+ auto ev = new TEvTx(txId, SS);
+ *ev->Record.AddTransaction() = std::move(tx);
+ return ev;
+}
+
+void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) {
+ Cerr << "ALTER_SCHEME: " << scheme << Endl;
+
+ const auto sender = setup.GetRuntime().AllocateEdgeActor();
+ const auto request = CreateRequest(txId, CreateTransaction("/Root", scheme));
+ setup.GetRuntime().Send(new IEventHandle(
+ MakeTabletResolverID(),
+ sender,
+ new TEvTabletResolver::TEvForward(
+ SS,
+ new IEventHandle(TActorId(), sender, request),
+ { },
+ TEvTabletResolver::TEvForward::EActor::Tablet
+ )),
+ 0);
+
+ auto subscriber = CreateNotificationSubscriber(setup.GetRuntime(), SS);
+ setup.GetRuntime().Send(new IEventHandle(subscriber, sender, new TEvSchemeShard::TEvNotifyTxCompletion(txId)));
+ TAutoPtr<IEventHandle> handle;
+ auto event = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EQUAL(event->Record.GetTxId(), txId);
+
+ auto e = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
+ UNIT_ASSERT_EQUAL_C(e->Record.GetStatus(), TEvSchemeShard::EStatus::StatusAccepted,
+ "Unexpected status " << NKikimrScheme::EStatus_Name(e->Record.GetStatus()) << " " << e->Record.GetReason());
+}
+
+void SplitPartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partition, TString boundary) {
+ ::NKikimrSchemeOp::TPersQueueGroupDescription scheme;
+ scheme.SetName(TEST_TOPIC);
+ auto* split = scheme.AddSplit();
+ split->SetPartition(partition);
+ split->SetSplitBoundary(boundary);
+
+ DoRequest(setup, txId, scheme);
+}
+
+void MergePartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partitionLeft, const ui32 partitionRight) {
+ ::NKikimrSchemeOp::TPersQueueGroupDescription scheme;
+ scheme.SetName(TEST_TOPIC);
+ auto* merge = scheme.AddMerge();
+ merge->SetPartition(partitionLeft);
+ merge->SetAdjacentPartition(partitionRight);
+
+ DoRequest(setup, txId, scheme);
+}
+
+auto Msg(const TString& data, ui64 seqNo) {
+ TWriteMessage msg(data);
+ msg.SeqNo(seqNo);
+ return msg;
+}
+
+
+
+Y_UNIT_TEST_SUITE(TopicSplitMerge) {
+ Y_UNIT_TEST(PartitionSplit) {
+ TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false);
+
+ auto& ff = setup.GetRuntime().GetAppData().FeatureFlags;
+ ff.SetEnableTopicSplitMerge(true);
+ ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
+
+ setup.CreateTopic();
+
+ TTopicClient client = setup.MakeClient();
+
+ setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+ setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
+
+ TString producer1 = "producer-1";
+ TString producer2 = "producer-2";
+ TString producer3 = "producer-3";
+
+ auto writeSettings1 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer1)
+ .MessageGroupId(producer1);
+ auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1);
+
+ auto writeSettings2 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer2)
+ .MessageGroupId(producer2);
+ auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2);
+
+ auto writeSettings3 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer3)
+ .PartitionId(0);
+ auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3);
+
+ Cerr << ">>>>> 1 " << Endl;
+
+ struct MsgInfo {
+ ui64 PartitionId;
+ ui64 SeqNo;
+ ui64 Offset;
+ TString Data;
+ };
+
+ NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
+ std::vector<MsgInfo> receivedMessages;
+ std::set<size_t> partitions;
+
+ auto readSettings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC);
+
+ readSettings.EventHandlers_.SimpleDataHandlers(
+ [&]
+ (TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ auto& messages = ev.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+
+ Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId()
+ << ", message=" << message.GetData()
+ << ", seqNo=" << message.GetSeqNo()
+ << ", offset=" << message.GetOffset()
+ << Endl;
+ receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
+ message.GetSeqNo(),
+ message.GetOffset(),
+ message.GetData()});
+ }
+
+ if (receivedMessages.size() == 5) {
+ checkedPromise.SetValue();
+ }
+ });
+
+ readSettings.EventHandlers_.StartPartitionSessionHandler(
+ [&]
+ (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
+ Cerr << ">>>>> Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl;
+ partitions.insert(ev.GetPartitionSession()->GetPartitionId());
+ ev.Confirm();
+ });
+
+ auto readSession = client.CreateReadSession(readSettings);
+
+ Cerr << ">>>>> 2 " << Endl;
+
+ UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2)));
+ UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3)));
+ UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1)));
+
+ Cerr << ">>>>> 3 " << Endl;
+
+ ui64 txId = 0;
+ SplitPartition(setup, ++txId, 0, "a");
+
+ Cerr << ">>>>> 4 " << Endl;
+
+ writeSession1->Write(Msg("message_1.2_2", 2)); // Will be ignored because duplicated SeqNo
+ writeSession3->Write(Msg("message_3.2", 11)); // Will be fail because partition is not writable after split
+ writeSession1->Write(Msg("message_1.2", 5));
+ writeSession2->Write(Msg("message_2.2", 7));
+
+ Cerr << ">>>>> 5 " << Endl;
+
+ checkedPromise.GetFuture().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(5, receivedMessages.size());
+
+ Cerr << ">>>>> 6 " << Endl;
+
+ for(const auto& info : receivedMessages) {
+ if (info.Data == "message_1.1") {
+ UNIT_ASSERT_C(1, info.PartitionId);
+ UNIT_ASSERT_C(2, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == "message_2.1") {
+ UNIT_ASSERT_C(1, info.PartitionId);
+ UNIT_ASSERT_C(3, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == "message_1.2") {
+ UNIT_ASSERT_C(2, info.PartitionId);
+ UNIT_ASSERT_C(5, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == "message_2.2") {
+ UNIT_ASSERT_C(3, info.PartitionId);
+ UNIT_ASSERT_C(7, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == "message_3.1") {
+ UNIT_ASSERT_C(1, info.PartitionId);
+ UNIT_ASSERT_C(1, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected message: " << info.Data);
+ }
+ }
+
+ Cerr << ">>>>> 7 " << Endl;
+
+ writeSession1->Close(TDuration::Seconds(1));
+ writeSession2->Close(TDuration::Seconds(1));
+ writeSession3->Close(TDuration::Seconds(1));
+ readSession->Close(TDuration::Seconds(1));
+
+ Cerr << ">>>>> 8 " << Endl;
+ }
+
+ Y_UNIT_TEST(PartitionMerge) {
+ TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false);
+
+ auto& ff = setup.GetRuntime().GetAppData().FeatureFlags;
+ ff.SetEnableTopicSplitMerge(true);
+ ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
+
+ setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2);
+
+ TTopicClient client = setup.MakeClient();
+
+ setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+ setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
+
+ TString producer1 = "producer-1";
+ TString producer2 = "producer-2";
+ TString producer3 = "producer-3";
+
+ auto writeSettings1 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer1)
+ .MessageGroupId(producer1)
+ .PartitionId(0);
+ auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1);
+
+ auto writeSettings2 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer2)
+ .MessageGroupId(producer2)
+ .PartitionId(1);
+ auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2);
+
+ Cerr << ">>>>> 1 " << Endl;
+
+ struct MsgInfo {
+ ui64 PartitionId;
+ ui64 SeqNo;
+ ui64 Offset;
+ TString Data;
+ };
+
+ NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
+ std::vector<MsgInfo> receivedMessages;
+ std::set<size_t> partitions;
+
+ auto readSettings = TReadSessionSettings()
+ .ConsumerName(TEST_CONSUMER)
+ .AppendTopics(TEST_TOPIC);
+
+ readSettings.EventHandlers_.SimpleDataHandlers(
+ [&]
+ (TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ auto& messages = ev.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+
+ Cerr << ">>>>> Received message partitionId=" << message.GetPartitionSession()->GetPartitionId()
+ << ", message=" << message.GetData()
+ << ", seqNo=" << message.GetSeqNo()
+ << ", offset=" << message.GetOffset()
+ << Endl;
+ receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
+ message.GetSeqNo(),
+ message.GetOffset(),
+ message.GetData()});
+ }
+
+ if (receivedMessages.size() == 3) {
+ checkedPromise.SetValue();
+ }
+ });
+
+ readSettings.EventHandlers_.StartPartitionSessionHandler(
+ [&]
+ (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
+ Cerr << ">>>>> Received message " << ev.DebugString() << Endl;
+ partitions.insert(ev.GetPartitionSession()->GetPartitionId());
+ ev.Confirm();
+ });
+
+ auto readSession = client.CreateReadSession(readSettings);
+
+ Cerr << ">>>>> 2 " << Endl;
+
+ UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2)));
+ UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3)));
+
+ Cerr << ">>>>> 3 " << Endl;
+
+ ui64 txId = 0;
+ MergePartition(setup, ++txId, 0, 1);
+
+ Cerr << ">>>>> 4 " << Endl;
+
+ UNIT_ASSERT(writeSession1->Write(Msg("message_1.2", 5))); // Will be fail because partition is not writable after merge
+ UNIT_ASSERT(writeSession2->Write(Msg("message_2.2", 7))); // Will be fail because partition is not writable after merge
+
+ auto writeSettings3 = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer1)
+ .MessageGroupId(producer1);
+ auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3);
+
+ UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 2))); // Will be ignored because duplicated SeqNo
+ UNIT_ASSERT(writeSession3->Write(Msg("message_3.2", 11)));
+
+
+ Cerr << ">>>>> 5 " << Endl;
+
+ checkedPromise.GetFuture().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(3, receivedMessages.size());
+
+ Cerr << ">>>>> 6 " << Endl;
+
+ for(const auto& info : receivedMessages) {
+ if (info.Data == TString("message_1.1")) {
+ UNIT_ASSERT_C(1, info.PartitionId);
+ UNIT_ASSERT_C(2, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == TString("message_2.1")) {
+ UNIT_ASSERT_C(2, info.PartitionId);
+ UNIT_ASSERT_C(3, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else if (info.Data == TString("message_3.2")) {
+ UNIT_ASSERT_C(3, info.PartitionId);
+ UNIT_ASSERT_C(11, info.SeqNo);
+ UNIT_ASSERT_C(1, info.Offset);
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected message: " << info.Data);
+ }
+ }
+
+ Cerr << ">>>>> 7 " << Endl;
+
+ writeSession1->Close(TDuration::Seconds(1));
+ writeSession2->Close(TDuration::Seconds(1));
+ writeSession3->Close(TDuration::Seconds(1));
+ readSession->Close(TDuration::Seconds(1));
+
+ Cerr << ">>>>> 8 " << Endl;
+ }
+
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make
index 0e4ff94d48..9daecdfa54 100644
--- a/ydb/core/persqueue/ut/ya.make
+++ b/ydb/core/persqueue/ut/ya.make
@@ -20,6 +20,9 @@ PEERDIR(
ydb/core/persqueue/ut/common
ydb/core/testlib/default
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
+ ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils
+
+ ydb/core/tx/schemeshard/ut_helpers
)
YQL_LAST_ABI_VERSION()
@@ -33,9 +36,11 @@ SRCS(
mirrorer_ut.cpp
pq_ut.cpp
partition_ut.cpp
+ partitiongraph_ut.cpp
pqtablet_ut.cpp
quota_tracker_ut.cpp
sourceid_ut.cpp
+ splitmerge_ut.cpp
type_codecs_ut.cpp
user_info_ut.cpp
pqrb_describes_ut.cpp
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index b3028391ab..9b0b4bc58f 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -1,5 +1,10 @@
#include "utils.h"
+#include <deque>
+#include <util/string/builder.h>
+
+#include <ydb/library/yverify_stream/yverify_stream.h>
+
namespace NKikimr::NPQ {
ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config) {
@@ -36,4 +41,78 @@ ui64 PutUnitsSize(const ui64 size) {
return putUnitsCount;
}
+const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) {
+ for(const auto& p : config.GetPartitions()) {
+ if (partitionId == p.GetPartitionId()) {
+ return &p;
+ }
+ }
+ return nullptr;
+}
+
+void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) {
+ Partitions.clear();
+
+ if (0 == config.AllPartitionsSize()) {
+ return;
+ }
+
+ for (const auto& p : config.GetAllPartitions()) {
+ Partitions.emplace(p.GetPartitionId(), p);
+ }
+
+ std::deque<Node*> queue;
+ for(const auto& p : config.GetAllPartitions()) {
+ auto& node = Partitions[p.GetPartitionId()];
+
+ node.Children.reserve(p.ChildPartitionIdsSize());
+ for (auto id : p.GetChildPartitionIds()) {
+ node.Children.push_back(&Partitions[id]);
+ }
+
+ node.Parents.reserve(p.ParentPartitionIdsSize());
+ for (auto id : p.GetParentPartitionIds()) {
+ node.Parents.push_back(&Partitions[id]);
+ }
+
+ if (p.GetParentPartitionIds().empty()) {
+ queue.push_back(&node);
+ }
+ }
+
+ while(!queue.empty()) {
+ auto* n = queue.front();
+ queue.pop_front();
+
+ bool allCompleted = true;
+ for(auto* c : n->Parents) {
+ if (c->HierarhicalParents.empty() && !c->Parents.empty()) {
+ allCompleted = false;
+ break;
+ }
+ }
+
+ if (allCompleted) {
+ for(auto* c : n->Parents) {
+ n->HierarhicalParents.insert(c->HierarhicalParents.begin(), c->HierarhicalParents.end());
+ n->HierarhicalParents.insert(c);
+ }
+ queue.insert(queue.end(), n->Children.begin(), n->Children.end());
+ }
+ }
+}
+
+std::optional<const TPartitionGraph::Node*> TPartitionGraph::GetPartition(ui32 id) const {
+ auto it = Partitions.find(id);
+ if (it == Partitions.end()) {
+ return std::nullopt;
+ }
+ return std::optional(&it->second);
+}
+
+TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) {
+ Id = config.GetPartitionId();
+ TabletId = config.GetTabletId();
+}
+
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h
index a490cb351a..344b699e43 100644
--- a/ydb/core/persqueue/utils.h
+++ b/ydb/core/persqueue/utils.h
@@ -9,4 +9,35 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config);
ui64 PutUnitsSize(const ui64 size);
+TString SourceIdHash(const TString& sourceId);
+
+const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId);
+
+// The graph of split-merge operations.
+class TPartitionGraph {
+public:
+ struct Node {
+
+ Node() = default;
+ Node(Node&&) = default;
+ Node(const NKikimrPQ::TPQTabletConfig::TPartition& config);
+
+ ui32 Id;
+ ui64 TabletId;
+
+ // Direct parents of this node
+ std::vector<Node*> Parents;
+ // Direct children of this node
+ std::vector<Node*> Children;
+ // All parents include parents of parents and so on
+ std::set<Node*> HierarhicalParents;
+ };
+
+ void Rebuild(const NKikimrPQ::TPQTabletConfig& config);
+
+ std::optional<const Node*> GetPartition(ui32 id) const;
+private:
+ std::unordered_map<ui32, Node> Partitions;
+};
+
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 435108c327..6ba0c6a559 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -15,6 +15,7 @@ SRCS(
partition_init.cpp
partition_monitoring.cpp
partition_read.cpp
+ partition_sourcemanager.cpp
partition_write.cpp
partition.cpp
percentile_counter.cpp
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 6c8944426f..8d7d3d902c 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -339,6 +339,7 @@ message TPQTabletConfig {
repeated uint32 ParentPartitionIds = 4;
repeated uint32 ChildPartitionIds = 5;
optional uint64 CreateVersion = 6;
+ optional uint64 TabletId = 7;
}
repeated TPartition Partitions = 31; // filled by schemeshard
@@ -357,6 +358,9 @@ message TPQTabletConfig {
required uint32 MaxPartitionCount = 2;
}
optional TPartitionStrategy PartitionStrategy = 35;
+
+ repeated TPartition AllPartitions = 36;
+
}
message THeartbeat {
@@ -927,6 +931,33 @@ message TEvSubDomainStatus {
required bool SubDomainOutOfSpace = 1;
};
+message TEvSourceIdRequest {
+ optional uint32 Partition = 1;
+ repeated string SourceId = 2;
+};
+
+message TEvSourceIdResponse {
+ enum EState {
+ Unknown = 0;
+ Registered = 1;
+ PendingRegistration = 2;
+ };
+
+ message TSource {
+ optional string Id = 1;
+ optional EState State = 2;
+ optional uint64 SeqNo = 3;
+ optional uint64 Offset = 4;
+ optional bool Explicit = 5;
+ optional uint64 WriteTimestamp = 6;
+
+ };
+ optional string Error = 1;
+ optional uint32 Partition = 2;
+ repeated TSource Source = 3;
+};
+
+
message TTransaction {
enum EKind {
KIND_UNKNOWN = 0;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 2bf3d36417..51f5089157 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -707,7 +707,8 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
event->Record.SetTxId(ui64(txId));
ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());
- MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(),
+ MakePQTabletConfig(context,
+ *event->Record.MutableConfig()->MutableTabletConfig(),
pqGroup,
pqShard,
topicName,
@@ -744,7 +745,8 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
event->Record.SetTxId(ui64(txId));
- MakePQTabletConfig(*event->Record.MutableTabletConfig(),
+ MakePQTabletConfig(context,
+ *event->Record.MutableTabletConfig(),
pqGroup,
pqShard,
topicName,
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 49a60ce457..1910df417b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -832,7 +832,26 @@ public:
}
private:
- static void MakePQTabletConfig(NKikimrPQ::TPQTabletConfig& config,
+ static void FillPartition(NKikimrPQ::TPQTabletConfig::TPartition& partition, const TTopicTabletInfo::TTopicPartitionInfo* pq, ui64 tabletId) {
+ partition.SetPartitionId(pq->PqId);
+ partition.SetCreateVersion(pq->CreateVersion);
+ if (pq->KeyRange) {
+ pq->KeyRange->SerializeToProto(*partition.MutableKeyRange());
+ }
+ partition.SetStatus(pq->Status);
+ partition.MutableParentPartitionIds()->Reserve(pq->ParentPartitionIds.size());
+ for (const auto parent : pq->ParentPartitionIds) {
+ partition.MutableParentPartitionIds()->AddAlreadyReserved(parent);
+ }
+ partition.MutableChildPartitionIds()->Reserve(pq->ChildPartitionIds.size());
+ for (const auto children : pq->ChildPartitionIds) {
+ partition.MutableChildPartitionIds()->AddAlreadyReserved(children);
+ }
+ partition.SetTabletId(tabletId);
+ }
+
+ static void MakePQTabletConfig(const TOperationContext& context,
+ NKikimrPQ::TPQTabletConfig& config,
const TTopicInfo& pqGroup,
const TTopicTabletInfo& pqShard,
const TString& topicName,
@@ -857,23 +876,19 @@ private:
config.SetVersion(pqGroup.AlterData->AlterVersion);
}
- for (const auto& pq : pqShard.Partitions) {
+ for(const auto& pq : pqShard.Partitions) {
config.AddPartitionIds(pq->PqId);
auto& partition = *config.AddPartitions();
- partition.SetPartitionId(pq->PqId);
- partition.SetCreateVersion(pq->CreateVersion);
- if (pq->KeyRange) {
- pq->KeyRange->SerializeToProto(*partition.MutableKeyRange());
- }
- partition.SetStatus(pq->Status);
- partition.MutableParentPartitionIds()->Reserve(pq->ParentPartitionIds.size());
- for (const auto parent : pq->ParentPartitionIds) {
- partition.MutableParentPartitionIds()->AddAlreadyReserved(parent);
- }
- partition.MutableChildPartitionIds()->Reserve(pq->ChildPartitionIds.size());
- for (const auto children : pq->ChildPartitionIds) {
- partition.MutableChildPartitionIds()->AddAlreadyReserved(children);
+ FillPartition(partition, pq.Get(), 0);
+ }
+
+ for(const auto& p : pqGroup.Shards) {
+ const auto& pqShard = p.second;
+ const auto& tabletId = context.SS->ShardInfos[p.first].TabletID;
+ for (const auto& pq : pqShard->Partitions) {
+ auto& partition = *config.AddAllPartitions();
+ FillPartition(partition, pq.Get(), ui64(tabletId));
}
}
}
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 8613260bca..3517e9d30d 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -264,6 +264,7 @@ public:
bool IsFirstClass() const;
+ operator bool() const { return Valid && !ClientsideName; };
private:
void BuildInternals(const NKikimrPQ::TPQTabletConfig& config);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
index 282211a4b9..aab41b843e 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut_utils)
add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut)
target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
@@ -30,6 +31,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
client-ydb_topic-codecs
client-ydb_topic-impl
+ ydb_topic-ut-ut_utils
)
target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
@@ -43,8 +45,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
index be6ba2fbf3..e9b30bcfbd 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut_utils)
add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut)
target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
@@ -30,6 +31,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
client-ydb_topic-codecs
client-ydb_topic-impl
+ ydb_topic-ut-ut_utils
)
target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
-ldl
@@ -46,8 +48,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
index 74b2b0756c..45c90bfdb5 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut_utils)
add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut)
target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
@@ -31,6 +32,7 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
client-ydb_topic-codecs
client-ydb_topic-impl
+ ydb_topic-ut-ut_utils
)
target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
-ldl
@@ -47,8 +49,6 @@ target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
index 3c14b2644b..c41eb4e0dd 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut_utils)
add_executable(ydb-public-sdk-cpp-client-ydb_topic-ut)
target_compile_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
@@ -30,14 +31,13 @@ target_link_libraries(ydb-public-sdk-cpp-client-ydb_topic-ut PUBLIC
ydb_persqueue_core-ut-ut_utils
client-ydb_topic-codecs
client-ydb_topic-impl
+ ydb_topic-ut-ut_utils
)
target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..ec01329d1c
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb_topic-ut-ut_utils)
+target_compile_options(ydb_topic-ut-ut_utils PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb_topic-ut-ut_utils PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ cpp-testing-unittest
+ cpp-threading-chunk_queue
+ core-testlib-default
+ library-persqueue-topic_parser_public
+ cpp-client-ydb_driver
+ cpp-client-ydb_topic
+ cpp-client-ydb_table
+)
+target_sources(ydb_topic-ut-ut_utils PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..aece94a075
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb_topic-ut-ut_utils)
+target_compile_options(ydb_topic-ut-ut_utils PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb_topic-ut-ut_utils PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ cpp-testing-unittest
+ cpp-threading-chunk_queue
+ core-testlib-default
+ library-persqueue-topic_parser_public
+ cpp-client-ydb_driver
+ cpp-client-ydb_topic
+ cpp-client-ydb_table
+)
+target_sources(ydb_topic-ut-ut_utils PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..aece94a075
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb_topic-ut-ut_utils)
+target_compile_options(ydb_topic-ut-ut_utils PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb_topic-ut-ut_utils PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ cpp-testing-unittest
+ cpp-threading-chunk_queue
+ core-testlib-default
+ library-persqueue-topic_parser_public
+ cpp-client-ydb_driver
+ cpp-client-ydb_topic
+ cpp-client-ydb_table
+)
+target_sources(ydb_topic-ut-ut_utils PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..ec01329d1c
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb_topic-ut-ut_utils)
+target_compile_options(ydb_topic-ut-ut_utils PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb_topic-ut-ut_utils PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ cpp-testing-unittest
+ cpp-threading-chunk_queue
+ core-testlib-default
+ library-persqueue-topic_parser_public
+ cpp-client-ydb_driver
+ cpp-client-ydb_topic
+ cpp-client-ydb_table
+)
+target_sources(ydb_topic-ut-ut_utils PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
index c82589773f..128e74ad97 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp
@@ -22,11 +22,14 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim
}
}
-void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer)
+void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount)
{
TTopicClient client(MakeDriver());
TCreateTopicSettings topics;
+ TPartitioningSettings partitions(partitionCount, partitionCount);
+
+ topics.PartitioningSettings(partitions);
TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
topics.AppendConsumers(consumers);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
index a9014a3437..c1642f6b46 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h
@@ -16,7 +16,7 @@ class TTopicSdkTestSetup {
public:
TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true);
- void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER);
+ void CreateTopic(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1);
TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TEST_TOPIC) const;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make
new file mode 100644
index 0000000000..9b807dcd9c
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/ya.make
@@ -0,0 +1,23 @@
+LIBRARY()
+
+SRCS(
+ managed_executor.cpp
+ managed_executor.h
+ topic_sdk_test_setup.cpp
+ topic_sdk_test_setup.h
+)
+
+PEERDIR(
+ library/cpp/grpc/server
+ library/cpp/testing/unittest
+ library/cpp/threading/chunk_queue
+ ydb/core/testlib/default
+ ydb/library/persqueue/topic_parser_public
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/sdk/cpp/client/ydb_topic
+ ydb/public/sdk/cpp/client/ydb_table
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
index ba152ee2ae..ec6820dc4d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make
@@ -24,6 +24,7 @@ PEERDIR(
ydb/public/sdk/cpp/client/ydb_topic
ydb/public/sdk/cpp/client/ydb_topic/impl
+ ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils
)
YQL_LAST_ABI_VERSION()
@@ -33,8 +34,6 @@ SRCS(
describe_topic_ut.cpp
local_partition_ut.cpp
topic_to_table_ut.cpp
- ut_utils/managed_executor.cpp
- ut_utils/topic_sdk_test_setup.cpp
)
END()