aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2024-01-23 17:40:54 +0100
committerGitHub <noreply@github.com>2024-01-23 17:40:54 +0100
commit4295d15342518cd2f871859c10116bac7f0f1695 (patch)
tree4ffdf2023528f4ee3e9697034b4fbd623cf438b6
parent9692b5b65c92c840426b78e5c350427e12e5e6d9 (diff)
downloadydb-4295d15342518cd2f871859c10116bac7f0f1695.tar.gz
Revert "Choose partition for topic split/merge" (#1243)
-rw-r--r--ydb/core/persqueue/events/internal.h18
-rw-r--r--ydb/core/persqueue/partition.cpp21
-rw-r--r--ydb/core/persqueue/partition.h5
-rw-r--r--ydb/core/persqueue/partition_init.cpp2
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp12
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h6
-rw-r--r--ydb/core/persqueue/partition_write.cpp23
-rw-r--r--ydb/core/persqueue/pq_impl.cpp35
-rw-r--r--ydb/core/persqueue/pq_impl.h4
-rw-r--r--ydb/core/persqueue/transaction.cpp9
-rw-r--r--ydb/core/persqueue/ut/partition_chooser_ut.cpp486
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/partitiongraph_ut.cpp72
-rw-r--r--ydb/core/persqueue/ut/pqtablet_mock.h2
-rw-r--r--ydb/core/persqueue/utils.cpp83
-rw-r--r--ydb/core/persqueue/utils.h13
-rw-r--r--ydb/core/persqueue/writer/common.h34
-rw-r--r--ydb/core/persqueue/writer/metadata_initializers.cpp14
-rw-r--r--ydb/core/persqueue/writer/partition_chooser.h9
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl.cpp430
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl.h198
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h368
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h163
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h86
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h65
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h272
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h280
-rw-r--r--ydb/core/persqueue/writer/pipe_utils.h76
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.cpp63
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.h2
-rw-r--r--ydb/core/persqueue/writer/writer.cpp26
-rw-r--r--ydb/core/protos/msgbus_pq.proto3
-rw-r--r--ydb/core/protos/pqconfig.proto7
-rw-r--r--ydb/core/testlib/test_pq_client.h1
34 files changed, 760 insertions, 2130 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index c6dbd10747..428f3e9b36 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -169,8 +169,6 @@ struct TEvPQ {
EvCacheProxyForgetRead,
EvGetFullDirectReadData,
EvProvideDirectReadInfo,
- EvCheckPartitionStatusRequest,
- EvCheckPartitionStatusResponse,
EvEnd
};
@@ -493,13 +491,12 @@ struct TEvPQ {
};
struct TEvChangeOwner : public TEventLocal<TEvChangeOwner, EvChangeOwner> {
- explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists)
+ explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force)
: Cookie(cookie)
, Owner(owner)
, PipeClient(pipeClient)
, Sender(sender)
, Force(force)
- , RegisterIfNotExists(registerIfNotExists)
{}
ui64 Cookie;
@@ -507,7 +504,6 @@ struct TEvPQ {
TActorId PipeClient;
TActorId Sender;
bool Force;
- bool RegisterIfNotExists;
};
struct TEvPipeDisconnected : public TEventLocal<TEvPipeDisconnected, EvPipeDisconnected> {
@@ -993,18 +989,6 @@ struct TEvPQ {
struct TEvProvideDirectReadInfo : public TEventLocal<TEvProvideDirectReadInfo, EvProvideDirectReadInfo> {
};
- struct TEvCheckPartitionStatusRequest : public TEventPB<TEvCheckPartitionStatusRequest, NKikimrPQ::TEvCheckPartitionStatusRequest, EvCheckPartitionStatusRequest> {
- TEvCheckPartitionStatusRequest() = default;
-
- TEvCheckPartitionStatusRequest(ui32 partitionId) {
- Record.SetPartition(partitionId);
- }
- };
-
- struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> {
- };
-
-
};
} //NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index b73c10d7ab..b996667d53 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
{
Config = config;
PartitionConfig = GetPartitionConfig(Config, Partition);
- PartitionGraph = MakePartitionGraph(Config);
+ PartitionGraph.Rebuild(Config);
TopicConverter = topicConverter;
NewPartition = false;
@@ -2616,23 +2616,4 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext
Send(ev->Sender, response.Release());
}
-void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
-
- if (Partition != record.GetPartition()) {
- LOG_INFO_S(
- ctx, NKikimrServices::PERSQUEUE,
- "TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." <<
- " Topic: \"" << TopicName() << "\"." <<
- " Partition: " << Partition << "."
- );
- return;
- }
-
- auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
- response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
-
- Send(ev->Sender, response.Release());
-}
-
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 264b78275a..dd0fa7e36e 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -100,7 +100,7 @@ private:
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
void ReplyOk(const TActorContext& ctx, const ui64 dst);
- void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
+ void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);
@@ -344,7 +344,6 @@ private:
// void DestroyReadSession(const TReadSessionKey& key);
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
TString LogPrefix() const;
@@ -482,7 +481,6 @@ private:
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
- HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
@@ -540,7 +538,6 @@ private:
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
- HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index fcab0f62f3..913812a063 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
- Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
+ Partition()->PartitionGraph.Rebuild(Partition()->Config);
break;
case NKikimrProto::ERROR:
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
index ac5eff21be..0e8f6d7812 100644
--- a/ydb/core/persqueue/partition_sourcemanager.cpp
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -8,7 +8,7 @@
namespace NKikimr::NPQ {
IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
-bool IsResearchRequires(const TPartitionGraph::Node* node);
+bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
//
// TPartitionSourceManager
@@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() {
PendingSourceIds = std::move(UnknownSourceIds);
- for(const auto* parent : node->HierarhicalParents) {
+ for(const auto* parent : node.value()->HierarhicalParents) {
PendingCookies.insert(++Cookie);
TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
@@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
}
-const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
+TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}
@@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
bool TPartitionSourceManager::HasParents() const {
auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
- return node && !node->Parents.empty();
+ return node && !node.value()->Parents.empty();
}
TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
@@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p
return new TSourceIdRequester(parent, partition, tabletId);
}
-bool IsResearchRequires(const TPartitionGraph::Node* node) {
- return node && !node->Parents.empty();
+bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
+ return node && !node.value()->Parents.empty();
}
NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
index 6a304e04bf..a05a51aa97 100644
--- a/ydb/core/persqueue/partition_sourcemanager.h
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -12,7 +12,7 @@ class TPartition;
class TPartitionSourceManager {
private:
- using TPartitionNode = TPartitionGraph::Node;
+ using TPartitionNode = std::optional<const TPartitionGraph::Node *>;
public:
using TPartitionId = ui32;
@@ -96,7 +96,7 @@ public:
private:
TPartitionSourceManager& Manager;
- const TPartitionNode* Node;
+ TPartitionNode Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
@@ -125,7 +125,7 @@ private:
void FinishBatch(const TActorContext& ctx);
bool RequireEnqueue(const TString& sourceId);
- const TPartitionNode* GetPartitionNode() const;
+ TPartitionNode GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 4c3a78ea08..164e21643b 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -31,18 +31,14 @@ static const ui32 MAX_INLINE_SIZE = 1000;
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;
-void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
+void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = *response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
- auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
- r->SetOwnerCookie(cookie);
- r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
- r->SetSeqNo(seqNo);
-
+ resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
ctx.Send(Tablet, response.Release());
}
@@ -150,12 +146,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
auto &owner = ev->Owner;
auto it = Owners.find(owner);
if (it == Owners.end()) {
- if (ev->RegisterIfNotExists) {
- Owners[owner];
- it = Owners.find(owner);
- } else {
- return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered");
- }
+ Owners[owner];
+ it = Owners.find(owner);
}
if (it->second.NeedResetOwner || ev->Force) { //change owner
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
@@ -354,13 +346,10 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
- const auto& r = response.GetOwnership();
- const TString& ownerCookie = r.OwnerCookie;
+ const TString& ownerCookie = response.GetOwnership().OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
- auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first));
- auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo;
- ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo);
+ ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
} else {
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index aa4653b99f..a26eef6602 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1230,7 +1230,6 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
}
ProcessSourceIdRequests(partitionId);
- ProcessCheckPartitionStatusRequests(partitionId);
if (allInitialized) {
SourceIdRequests.clear();
}
@@ -2049,8 +2048,7 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct
it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors);
InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP);
- THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender,
- req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists());
+ THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce());
ctx.Send(partActor, event.Release());
}
@@ -3917,37 +3915,6 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
}
}
-void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
- auto it = Partitions.find(record.GetPartition());
- if (it == Partitions.end()) {
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());
-
- auto response = THolder<TEvPQ::TEvCheckPartitionStatusResponse>();
- response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted);
- Send(ev->Sender, response.Release());
-
- return;
- }
-
- if (it->second.InitDone) {
- Forward(ev, it->second.Actor);
- } else {
- CheckPartitionStatusRequests[record.GetPartition()].push_back(ev);
- }
-}
-
-void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) {
- auto sit = CheckPartitionStatusRequests.find(partitionId);
- if (sit != CheckPartitionStatusRequests.end()) {
- auto it = Partitions.find(partitionId);
- for (auto& r : sit->second) {
- Forward(r, it->second.Actor);
- }
- CheckPartitionStatusRequests.erase(partitionId);
- }
-}
-
TString TPersQueue::LogPrefix() const {
return TStringBuilder() << SelfId() << " ";
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 61eabfaa0b..dd78e89add 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -168,9 +168,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void ProcessSourceIdRequests(ui32 partitionId);
- void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
- void ProcessCheckPartitionStatusRequests(ui32 partitionId);
-
TString LogPrefix() const;
static constexpr const char * KeyConfig() { return "_config"; }
@@ -408,7 +405,6 @@ private:
bool UseMediatorTimeCast = true;
THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
- THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index b613822c8e..86a1839e4b 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -147,7 +147,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();
- TPartitionGraph graph = MakePartitionGraph(TabletConfig);
+ TPartitionGraph graph;
+ graph.Rebuild(TabletConfig);
for (const auto& p : TabletConfig.GetPartitions()) {
auto node = graph.GetPartition(p.GetPartitionId());
@@ -155,15 +156,15 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
- if (node->Children.empty()) {
- for (const auto* r : node->Parents) {
+ if (node.value()->Children.empty()) {
+ for (const auto* r : node.value()->Parents) {
if (extractTabletId != r->TabletId) {
Senders.insert(r->TabletId);
}
}
}
- for (const auto* r : node->Children) {
+ for (const auto* r : node.value()->Children) {
if (r->Children.empty()) {
if (extractTabletId != r->TabletId) {
Receivers.insert(r->TabletId);
diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
index a66e345dd6..e43f99ad85 100644
--- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
@@ -5,43 +5,17 @@
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
-#include <ydb/core/persqueue/writer/pipe_utils.h>
using namespace NKikimr::NPQ;
static constexpr bool SMEnabled = true;
static constexpr bool SMDisabled = false;
-using namespace NKikimr;
-using namespace NActors;
-using namespace NKikimrPQ;
-
-void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
- ui32 id,
- const std::optional<TString>&& boundaryFrom,
- const std::optional<TString>&& boundaryTo,
- std::vector<ui32> children = {}) {
- auto* p = conf.AddPartitions();
- p->SetPartitionId(id);
- p->SetTabletId(1000 + id);
- p->SetStatus(children.empty() ? NKikimrPQ::ETopicPartitionStatus::Active : NKikimrPQ::ETopicPartitionStatus::Inactive);
- if (boundaryFrom) {
- p->MutableKeyRange()->SetFromBound(boundaryFrom.value());
- }
- if (boundaryTo) {
- p->MutableKeyRange()->SetToBound(boundaryTo.value());
- }
- for(ui32 c : children) {
- p->AddChildPartitionIds(c);
- }
-}
-
-NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled) {
+NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) {
+ Cerr << ">>>>> SplitMergeEnabled=" << SplitMergeEnabled << Endl;
NKikimrSchemeOp::TPersQueueGroupDescription result;
NKikimrPQ::TPQTabletConfig* config = result.MutablePQTabletConfig();
- result.SetBalancerTabletID(999);
-
auto* partitionStrategy = config->MutablePartitionStrategy();
partitionStrategy->SetMinPartitionCount(3);
partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 3);
@@ -49,17 +23,27 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled
config->SetTopicName("/Root/topic-1");
config->SetTopicPath("/Root");
- return result;
-}
+ auto* p0 = result.AddPartitions();
+ p0->SetPartitionId(0);
+ p0->SetTabletId(1000);
+ p0->MutableKeyRange()->SetToBound("C");
-NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) {
- NKikimrSchemeOp::TPersQueueGroupDescription result = CreateConfig0(SplitMergeEnabled);
+ auto* p1 = result.AddPartitions();
+ p1->SetPartitionId(1);
+ p1->SetTabletId(1001);
+ p1->MutableKeyRange()->SetFromBound("C");
+ p1->MutableKeyRange()->SetToBound("F");
+
+ auto* p2 = result.AddPartitions();
+ p2->SetPartitionId(2);
+ p2->SetTabletId(1002);
+ p2->MutableKeyRange()->SetFromBound("F");
- AddPartition(result, 0, {}, "C");
- AddPartition(result, 1, "C", "F");
- AddPartition(result, 2, "F", "Z");
- AddPartition(result, 3, "Z", {}, {4});
- AddPartition(result, 4, "Z", {});
+ auto* p3 = result.AddPartitions();
+ p3->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive);
+ p3->SetPartitionId(3);
+ p3->SetTabletId(1003);
+ p3->MutableKeyRange()->SetFromBound("D");
return result;
}
@@ -188,17 +172,14 @@ NPersQueue::TTopicConverterPtr CreateTopicConverter() {
return NPersQueue::TTopicNameConverter::ForFirstClass(CreateConfig(SMDisabled).GetPQTabletConfig());
}
-TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server,
- const NKikimrSchemeOp::TPersQueueGroupDescription& config,
- const TString& sourceId,
- std::optional<ui32> preferedPartition = std::nullopt) {
-
+TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional<ui32> preferedPartition = std::nullopt) {
NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter();
+
TWriteSessionMock* mock = new TWriteSessionMock();
NActors::TActorId parentId = server.GetRuntime()->Register(mock);
- server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
- config,
+ server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActor(parentId,
+ CreateConfig(spliMergeEnabled),
fullConverter,
sourceId,
preferedPartition,
@@ -207,55 +188,15 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server,
mock->Promise.GetFuture().GetValueSync();
return mock;
-
-}
-
-TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional<ui32> preferedPartition = std::nullopt) {
- auto config = CreateConfig(spliMergeEnabled);
- return ChoosePartition(server, config, sourceId, preferedPartition);
}
-void InitTable(NPersQueue::TTestServer& server) {
- class Initializer: public TActorBootstrapped<Initializer> {
- public:
- Initializer(NThreading::TPromise<void>& promise)
- : Promise(promise) {}
-
- void Bootstrap(const TActorContext& ctx) {
- Become(&TThis::StateWork);
- ctx.Send(
- NKikimr::NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
- new NKikimr::NMetadata::NProvider::TEvPrepareManager(NKikimr::NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant())
- );
- }
-
- private:
- void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext&) {
- Promise.SetValue();
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
- }
- }
-
- private:
- NThreading::TPromise<void>& Promise;
- };
-
- NThreading::TPromise<void> promise = NThreading::NewPromise<void>();
- server.GetRuntime()->Register(new Initializer(promise));
- promise.GetFuture().GetValueSync();
-}
-
-void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo = 0) {
- InitTable(server);
-
+void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId) {
const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig;
auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
: ESourceIdTableGeneration::SrcIdMeta2;
+ Cerr << ">>>>> pqConfig.GetTopicsAreFirstClassCitizen()=" << pqConfig.GetTopicsAreFirstClassCitizen() << Endl;
+
NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter();
NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId(
fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration
@@ -264,361 +205,64 @@ void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32
TString query;
if (pqConfig.GetTopicsAreFirstClassCitizen()) {
query = TStringBuilder() << "--!syntax_v1\n"
- "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
+ "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES "
"(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \""
<< encoded.EscapedSourceId << "\", "<< TInstant::Now().MilliSeconds() << ", "
- << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << ");";
+ << TInstant::Now().MilliSeconds() << ", " << partitionId << ");";
} else {
query = TStringBuilder() << "--!syntax_v1\n"
- "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES ("
+ "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES ("
<< encoded.Hash << ", \"" << fullConverter->GetClientsideName() << "\", \"" << encoded.EscapedSourceId << "\", "
- << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << "); ";
+ << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << "); ";
}
Cerr << "Run query:\n" << query << Endl;
auto scResult = server.AnnoyingClient->RunYqlDataQuery(query);
}
-TMaybe<NYdb::TResultSet> SelectTable(NPersQueue::TTestServer& server, const TString& sourceId) {
- const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig;
- auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
- : ESourceIdTableGeneration::SrcIdMeta2;
-
- NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter();
- NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId(
- fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration
- );
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
+ NPersQueue::TTestServer server{};
- TString query;
- if (pqConfig.GetTopicsAreFirstClassCitizen()) {
- query = TStringBuilder() << "--!syntax_v1\n"
- "SELECT Partition, SeqNo "
- "FROM `//Root/.metadata/TopicPartitionsMapping` "
- "WHERE Hash = " << encoded.KeysHash <<
- " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
- " AND ProducerId = \"" << encoded.EscapedSourceId << "\"";
- } else {
- query = TStringBuilder() << "--!syntax_v1\n"
- "SELECT Partition, SeqNo "
- "FROM `/Root/PQ/SourceIdMeta2` "
- "WHERE Hash = " << encoded.KeysHash <<
- " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
- " AND SourceId = \"" << encoded.EscapedSourceId << "\"";
+ {
+ auto r = ChoosePartition(server, SMEnabled, "A_Source");
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
+ }
+ {
+ auto r = ChoosePartition(server, SMEnabled, "Y_Source");
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2);
}
- Cerr << "Run query:\n" << query << Endl;
- return server.AnnoyingClient->RunYqlDataQuery(query);
-}
-
-void AssertTableEmpty(NPersQueue::TTestServer& server, const TString& sourceId) {
- auto result = SelectTable(server, sourceId);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 0, "Table must not contains SourceId='" << sourceId << "'");
-}
-
-void AssertTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo) {
- auto result = SelectTable(server, sourceId);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 1, "Table must contains SourceId='" << sourceId << "'");
-
- NYdb::TResultSetParser parser(*result);
- UNIT_ASSERT(parser.TryNextRow());
- NYdb::TValueParser p(parser.GetValue(0));
- NYdb::TValueParser s(parser.GetValue(1));
- UNIT_ASSERT_VALUES_EQUAL(*p.GetOptionalUint32().Get(), partitionId);
- UNIT_ASSERT_VALUES_EQUAL(*s.GetOptionalUint64().Get(), seqNo);
-}
-
-class TPQTabletMock: public TActor<TPQTabletMock> {
-public:
- TPQTabletMock(ETopicPartitionStatus status, std::optional<ui64> seqNo)
- : TActor(&TThis::StateMockWork)
- , Status(status)
- , SeqNo(seqNo) {
+ {
+ // Define partition for sourceId that is not in partition boundary
+ WriteToTable(server, "X_Source_w_0", 0);
+ auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0");
+ UNIT_ASSERT(r->Error);
}
-
-private:
- void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx) {
- auto response = MakeHolder<TEvPersQueue::TEvResponse>();
-
- response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
- response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
-
- if (ev->Get()->Record.GetPartitionRequest().HasCmdGetOwnership()) {
- auto& o = ev->Get()->Record.GetPartitionRequest().GetCmdGetOwnership();
- if (o.GetRegisterIfNotExists() || SeqNo) {
- auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
- cmd->SetOwnerCookie("ower_cookie");
- cmd->SetStatus(Status);
- cmd->SetSeqNo(SeqNo.value_or(0));
- } else {
- response->Record.SetErrorCode(NPersQueue::NErrorCode::SOURCEID_DELETED);
- }
- }
-
- auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo();
- sn->SetSeqNo(SeqNo.value_or(0));
- sn->SetState(SeqNo ? NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED : NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION);
-
- ctx.Send(ev->Sender, response.Release());
+ {
+ // Redefine partition for sourceId. Check that partition changed;
+ WriteToTable(server, "X_Source_w_0", 2);
+ auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0");
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2);
}
-
- void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
- auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
-
- ctx.Send(ev->Sender, response.Release());
+ {
+ // Redefine partition for sourceId to inactive partition. Select new partition use partition boundary.
+ WriteToTable(server, "A_Source_w_0", 3);
+ auto r = ChoosePartition(server, SMEnabled, "A_Source_w_0");
+ UNIT_ASSERT(r->Result);
+ UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}
-
- STFUNC(StateMockWork) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvRequest, Handle);
- HFunc(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
- }
+ {
+ // Use prefered partition, but sourceId not in partition boundary
+ auto r = ChoosePartition(server, SMEnabled, "A_Source_1", 1);
+ UNIT_ASSERT(r->Error);
}
-
-private:
- ETopicPartitionStatus Status;
- std::optional<ui64> SeqNo;
-};
-
-
-TPQTabletMock* CreatePQTabletMock(NPersQueue::TTestServer& server, ui32 partitionId, ETopicPartitionStatus status, std::optional<ui64> seqNo = std::nullopt) {
- TPQTabletMock* mock = new TPQTabletMock(status, seqNo);
- auto actorId = server.GetRuntime()->Register(mock);
- NKikimr::NTabletPipe::NTest::TPipeMock::Register(partitionId + 1000, actorId);
- return mock;
}
-NPersQueue::TTestServer CreateServer() {
+Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) {
NPersQueue::TTestServer server{};
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true);
- server.EnableLogs({NKikimrServices::PQ_PARTITION_CHOOSER}, NActors::NLog::PRI_TRACE);
-
- return server;
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_NewSourceId_Test) {
- // We check the scenario when writing is performed with a new SourceID
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
-
- auto r = ChoosePartition(server, config, "A_Source_0");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
- AssertTable(server, "A_Source_0", 0, 0);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryTrue_Test) {
- // We check the partition selection scenario when we have already written with the
- // specified SourceID, the partition to which we wrote is active, and the partition
- // boundaries coincide with the distribution.
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_1", 0, 11);
- auto r = ChoosePartition(server, config, "A_Source_1");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
- AssertTable(server, "A_Source_1", 0, 11);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryFalse_Test) {
- // We check the partition selection scenario when we have already written with the
- // specified SourceID, the partition to which we wrote is active, and the partition
- // boundaries is not coincide with the distribution.
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_2", 1, 13);
- auto r = ChoosePartition(server, config, "A_Source_2");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- AssertTable(server, "A_Source_2", 1, 13);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_0_Test) {
- // Boundary partition is inactive. It is configuration error - required reload of configuration.
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_3", 0, 13);
- auto r = ChoosePartition(server, config, "A_Source_3");
-
- UNIT_ASSERT(r->Error);
- AssertTable(server, "A_Source_3", 0, 13);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_1_Test) {
- // Boundary partition is inactive. It is configuration error - required reload of configuration.
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); // Active but not written
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Inactive);
-
- WriteToTable(server, "A_Source_4", 1, 13);
- auto r = ChoosePartition(server, config, "A_Source_4");
-
- UNIT_ASSERT(r->Error);
- AssertTable(server, "A_Source_4", 1, 13);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionNotExists_Test) {
- // Old partition alredy deleted. Choose new partition by boundary and save SeqNo
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 1, {}, "F");
- AddPartition(config, 2, "F", {});
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_5", 0, 13);
- auto r = ChoosePartition(server, config, "A_Source_5");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13);
- AssertTable(server, "A_Source_5", 1, 13);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_Test) {
- // Old partition exists. Receive SeqNo from the partition. Choose new partition by boundary and save SeqNo
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, {}, {1, 2});
- AddPartition(config, 1, {}, "F");
- AddPartition(config, 2, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_6", 0, 13);
- auto r = ChoosePartition(server, config, "A_Source_6");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157);
- AssertTable(server, "A_Source_6", 1, 157);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotWritten_Test) {
- // Old partition exists but not written. Choose new partition by boundary and save SeqNo
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, {}, {1, 2});
- AddPartition(config, 1, {}, "F");
- AddPartition(config, 2, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "A_Source_7", 0, 13);
- auto r = ChoosePartition(server, config, "A_Source_7");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13);
- AssertTable(server, "A_Source_7", 1, 13);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotBoundary_Test) {
- // Old partition exists. Receive SeqNo from the partition. Choose new partition from children and save SeqNo
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F", { 2});
- AddPartition(config, 1, "F", {});
- AddPartition(config, 2, {}, "F");
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
-
- WriteToTable(server, "Y_Source_7", 0, 13);
- auto r = ChoosePartition(server, config, "Y_Source_7");
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157);
- AssertTable(server, "Y_Source_7", 2, 157);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_Active_Test) {
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- auto r = ChoosePartition(server, config, "", 0);
-
- UNIT_ASSERT(r->Result);
- UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveConfig_Test) {
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, {}, {1});
- AddPartition(config, 1, {}, {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- auto r = ChoosePartition(server, config, "", 0);
-
- UNIT_ASSERT(r->Error);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveActor_Test) {
- NPersQueue::TTestServer server = CreateServer();
-
- auto config = CreateConfig0(true);
- AddPartition(config, 0, {}, "F");
- AddPartition(config, 1, "F", {});
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
-
- auto r = ChoosePartition(server, config, "", 0);
-
- UNIT_ASSERT(r->Error);
-}
-
-Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) {
- NPersQueue::TTestServer server = CreateServer();
-
- CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
- CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
{
auto r = ChoosePartition(server, SMDisabled, "A_Source");
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index eba55773cd..2ea5da2d6a 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -514,7 +514,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const
void TPartitionFixture::SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force)
{
- auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force, true);
+ auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force);
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}
diff --git a/ydb/core/persqueue/ut/partitiongraph_ut.cpp b/ydb/core/persqueue/ut/partitiongraph_ut.cpp
index 9067d76fec..0003f18cec 100644
--- a/ydb/core/persqueue/ut/partitiongraph_ut.cpp
+++ b/ydb/core/persqueue/ut/partitiongraph_ut.cpp
@@ -44,37 +44,45 @@ Y_UNIT_TEST_SUITE(TPartitionGraphTest) {
p5->AddParentPartitionIds(4);
TPartitionGraph graph;
- graph = std::move(MakePartitionGraph(config));
-
- const auto n0 = graph.GetPartition(0);
- const auto n1 = graph.GetPartition(1);
- const auto n2 = graph.GetPartition(2);
- const auto n3 = graph.GetPartition(3);
- const auto n4 = graph.GetPartition(4);
- const auto n5 = graph.GetPartition(5);
-
- UNIT_ASSERT(n0);
- UNIT_ASSERT(n1);
- UNIT_ASSERT(n2);
- UNIT_ASSERT(n3);
- UNIT_ASSERT(n4);
- UNIT_ASSERT(n5);
-
- UNIT_ASSERT_VALUES_EQUAL(n0->Parents.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(n0->Children.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(n0->HierarhicalParents.size(), 0);
-
- UNIT_ASSERT_VALUES_EQUAL(n1->Parents.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(n1->Children.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(n1->HierarhicalParents.size(), 0);
-
- UNIT_ASSERT_VALUES_EQUAL(n5->Parents.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(n5->Children.size(), 0u);
- UNIT_ASSERT_VALUES_EQUAL(n5->HierarhicalParents.size(), 4);
- UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n0) == n5->HierarhicalParents.end());
- UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n1) != n5->HierarhicalParents.end());
- UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n2) != n5->HierarhicalParents.end());
- UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n3) != n5->HierarhicalParents.end());
- UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n4) != n5->HierarhicalParents.end());
+ graph.Rebuild(config);
+
+ const auto n0o = graph.GetPartition(0);
+ const auto n1o = graph.GetPartition(1);
+ const auto n2o = graph.GetPartition(2);
+ const auto n3o = graph.GetPartition(3);
+ const auto n4o = graph.GetPartition(4);
+ const auto n5o = graph.GetPartition(5);
+
+ UNIT_ASSERT(n0o);
+ UNIT_ASSERT(n1o);
+ UNIT_ASSERT(n2o);
+ UNIT_ASSERT(n3o);
+ UNIT_ASSERT(n4o);
+ UNIT_ASSERT(n5o);
+
+ auto& n0 = *n0o.value();
+ auto& n1 = *n1o.value();
+ auto& n2 = *n2o.value();
+ auto& n3 = *n3o.value();
+ auto& n4 = *n4o.value();
+ auto& n5 = *n5o.value();
+
+
+ UNIT_ASSERT_EQUAL(n0.Parents.size(), 0);
+ UNIT_ASSERT_EQUAL(n0.Children.size(), 0);
+ UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0);
+
+ UNIT_ASSERT_EQUAL(n1.Parents.size(), 0);
+ UNIT_ASSERT_EQUAL(n1.Children.size(), 1);
+ UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0);
+
+ UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2");
+ UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0");
+ UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4");
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end());
+ UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end());
}
}
diff --git a/ydb/core/persqueue/ut/pqtablet_mock.h b/ydb/core/persqueue/ut/pqtablet_mock.h
index a64526da77..0fd2cdc734 100644
--- a/ydb/core/persqueue/ut/pqtablet_mock.h
+++ b/ydb/core/persqueue/ut/pqtablet_mock.h
@@ -4,7 +4,6 @@
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/tx/tx_processing.h>
-#include <ydb/core/persqueue/events/global.h>
#include <ydb/library/actors/core/actor.h>
@@ -60,7 +59,6 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- // TX
HFunc(TEvTxProcessing::TEvReadSet, Handle);
HFunc(TEvTxProcessing::TEvReadSetAck, Handle);
HFunc(TEvPQTablet::TEvSendReadSet, Handle);
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index 0c133e998b..c4a934454a 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -54,69 +54,29 @@ const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ
return nullptr;
}
-TPartitionGraph::TPartitionGraph() {
-}
-
-TPartitionGraph::TPartitionGraph(std::unordered_map<ui32, Node>&& partitions) {
- Partitions = std::move(partitions);
-}
-
-const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const {
- auto it = Partitions.find(id);
- if (it == Partitions.end()) {
- return nullptr;
- }
- return &it->second;
-}
-
-std::set<ui32> TPartitionGraph::GetActiveChildren(ui32 id) const {
- const auto* p = GetPartition(id);
- if (!p) {
- return {};
- }
-
- std::deque<const Node*> queue;
- queue.push_back(p);
-
- std::set<ui32> result;
- while(!queue.empty()) {
- const auto* n = queue.front();
- queue.pop_front();
+void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) {
+ Partitions.clear();
- if (n->Children.empty()) {
- result.emplace(n->Id);
- } else {
- queue.insert(queue.end(), n->Children.begin(), n->Children.end());
- }
- }
-
- return result;
-}
-
-template<typename TPartition>
-std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const ::google::protobuf::RepeatedPtrField<TPartition>& partitions) {
- std::unordered_map<ui32, TPartitionGraph::Node> result;
-
- if (0 == partitions.size()) {
- return result;
+ if (0 == config.AllPartitionsSize()) {
+ return;
}
- for (const auto& p : partitions) {
- result.emplace(p.GetPartitionId(), TPartitionGraph::Node(p.GetPartitionId(), p.GetTabletId()));
+ for (const auto& p : config.GetAllPartitions()) {
+ Partitions.emplace(p.GetPartitionId(), p);
}
- std::deque<TPartitionGraph::Node*> queue;
- for(const auto& p : partitions) {
- auto& node = result[p.GetPartitionId()];
+ std::deque<Node*> queue;
+ for(const auto& p : config.GetAllPartitions()) {
+ auto& node = Partitions[p.GetPartitionId()];
node.Children.reserve(p.ChildPartitionIdsSize());
for (auto id : p.GetChildPartitionIds()) {
- node.Children.push_back(&result[id]);
+ node.Children.push_back(&Partitions[id]);
}
node.Parents.reserve(p.ParentPartitionIdsSize());
for (auto id : p.GetParentPartitionIds()) {
- node.Parents.push_back(&result[id]);
+ node.Parents.push_back(&Partitions[id]);
}
if (p.GetParentPartitionIds().empty()) {
@@ -144,22 +104,19 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const ::google::proto
queue.insert(queue.end(), n->Children.begin(), n->Children.end());
}
}
-
- return result;
-}
-
-
-TPartitionGraph::Node::Node(ui32 id, ui64 tabletId)
- : Id(id)
- , TabletId(tabletId) {
}
-TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config) {
- return TPartitionGraph(BuildGraph<NKikimrPQ::TPQTabletConfig::TPartition>(config.GetAllPartitions()));
+std::optional<const TPartitionGraph::Node*> TPartitionGraph::GetPartition(ui32 id) const {
+ auto it = Partitions.find(id);
+ if (it == Partitions.end()) {
+ return std::nullopt;
+ }
+ return std::optional(&it->second);
}
-TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config) {
- return TPartitionGraph(BuildGraph<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>(config.GetPartitions()));
+TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) {
+ Id = config.GetPartitionId();
+ TabletId = config.GetTabletId();
}
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h
index 65d76ada40..daa7f20d0a 100644
--- a/ydb/core/persqueue/utils.h
+++ b/ydb/core/persqueue/utils.h
@@ -1,6 +1,5 @@
#pragma once
-#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
namespace NKikimr::NPQ {
@@ -23,7 +22,7 @@ public:
Node() = default;
Node(Node&&) = default;
- Node(ui32 id, ui64 tabletId);
+ Node(const NKikimrPQ::TPQTabletConfig::TPartition& config);
ui32 Id;
ui64 TabletId;
@@ -36,17 +35,11 @@ public:
std::set<Node*> HierarhicalParents;
};
- TPartitionGraph();
- TPartitionGraph(std::unordered_map<ui32, Node>&& partitions);
-
- const Node* GetPartition(ui32 id) const;
- std::set<ui32> GetActiveChildren(ui32 id) const;
+ void Rebuild(const NKikimrPQ::TPQTabletConfig& config);
+ std::optional<const Node*> GetPartition(ui32 id) const;
private:
std::unordered_map<ui32, Node> Partitions;
};
-TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config);
-TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config);
-
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/writer/common.h b/ydb/core/persqueue/writer/common.h
deleted file mode 100644
index 2d5a67c71f..0000000000
--- a/ydb/core/persqueue/writer/common.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include <ydb/core/base/defs.h>
-#include <ydb/core/protos/msgbus.pb.h>
-#include <ydb/core/protos/msgbus_pq.pb.h>
-
-#include <ydb/public/lib/base/msgbus_status.h>
-
-namespace NKikimr::NPQ {
-
-inline bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) {
- if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
- error = TStringBuilder() << "Status is not ok"
- << ": status# " << static_cast<ui32>(response.GetStatus());
- return false;
- }
-
- if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) {
- error = TStringBuilder() << "Error code is not ok"
- << ": code# " << static_cast<ui32>(response.GetErrorCode());
- return false;
- }
-
- if (mustHaveResponse && !response.HasPartitionResponse()) {
- error = "Absent partition response";
- return false;
- }
-
- return true;
-}
-
-
-} // namespace NKikimr::NPQ
-
diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp
index dff6ade3ad..178a669d52 100644
--- a/ydb/core/persqueue/writer/metadata_initializers.cpp
+++ b/ydb/core/persqueue/writer/metadata_initializers.cpp
@@ -60,20 +60,6 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont
}
result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create"));
-
- {
- Ydb::Table::AlterTableRequest request;
- request.set_session_id("");
- request.set_path(tablePath);
-
- {
- auto& column = *request.add_add_columns();
- column.set_name("SeqNo");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
- }
-
- result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogAlterTable>(request, "add_column_SeqNo"));
- }
}
result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl"));
controller->OnPreparationFinished(result);
diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h
index a4d19c64ad..9ab9eee1cb 100644
--- a/ydb/core/persqueue/writer/partition_chooser.h
+++ b/ydb/core/persqueue/writer/partition_chooser.h
@@ -24,17 +24,13 @@ struct TEvPartitionChooser {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)");
struct TEvChooseResult: public TEventLocal<TEvChooseResult, EvChooseResult> {
- TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional<ui64> seqNo)
+ TEvChooseResult(ui32 partitionId, ui64 tabletId)
: PartitionId(partitionId)
- , TabletId(tabletId)
- , OwnerCookie(ownerCookie)
- , SeqNo(seqNo) {
+ , TabletId(tabletId) {
}
ui32 PartitionId;
ui64 TabletId;
- TString OwnerCookie;
- std::optional<ui64> SeqNo;
};
struct TEvChooseError: public TEventLocal<TEvChooseError, EvChooseError> {
@@ -67,7 +63,6 @@ public:
virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0;
virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0;
- virtual const TPartitionInfo* GetRandomPartition() const = 0;
};
std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false);
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.cpp b/ydb/core/persqueue/writer/partition_chooser_impl.cpp
index 3ac107bd35..d8102d2f32 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl.cpp
+++ b/ydb/core/persqueue/writer/partition_chooser_impl.cpp
@@ -1,5 +1,5 @@
#include "partition_chooser_impl.h"
-#include "ydb/library/actors/interconnect/types.h"
+#include "ydb/public/sdk/cpp/client/ydb_proto/accessor.h"
#include <library/cpp/digest/md5/md5.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
@@ -32,6 +32,424 @@ TString TMd5Converter::operator()(const TString& sourceId) const {
return AsKeyBound(Hash(sourceId));
}
+
+//
+// TPartitionChooserActor
+//
+
+TPartitionChooserActor::TPartitionChooserActor(TActorId parent,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& config,
+ std::shared_ptr<IPartitionChooser>& chooser,
+ NPersQueue::TTopicConverterPtr& fullConverter,
+ const TString& sourceId,
+ std::optional<ui32> preferedPartition)
+ : Parent(parent)
+ , FullConverter(fullConverter)
+ , SourceId(sourceId)
+ , PreferedPartition(preferedPartition)
+ , Chooser(chooser)
+ , SplitMergeEnabled_(SplitMergeEnabled(config.GetPQTabletConfig()))
+ , Partition(nullptr)
+ , BalancerTabletId(config.GetBalancerTabletID()) {
+}
+
+void TPartitionChooserActor::Bootstrap(const TActorContext& ctx) {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+
+ NeedUpdateTable = (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SplitMergeEnabled_ && SourceId;
+
+ if (!SourceId) {
+ return ChoosePartition(ctx);
+ }
+
+ TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
+ : ESourceIdTableGeneration::SrcIdMeta2;
+ try {
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(
+ FullConverter->GetTopicForSrcIdHash(), SourceId, TableGeneration
+ );
+ } catch (yexception& e) {
+ return ReplyError(ErrorCode::BAD_REQUEST, TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), ctx);
+ }
+
+ SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
+ UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
+
+ DEBUG("SelectQuery: " << SelectQuery);
+
+ if (pqConfig.GetTopicsAreFirstClassCitizen()) {
+ if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
+ TThisActor::Become(&TThis::StateInit);
+ InitTable(ctx);
+ } else {
+ ChoosePartition(ctx);
+ }
+ } else {
+ TThisActor::Become(&TThis::StateInit);
+ StartKqpSession(ctx);
+ }
+}
+
+void TPartitionChooserActor::Stop(const TActorContext& ctx) {
+ CloseKqpSession(ctx);
+ if (PipeToBalancer) {
+ NTabletPipe::CloseClient(ctx, PipeToBalancer);
+ }
+ IActor::Die(ctx);
+}
+
+void TPartitionChooserActor::ScheduleStop() {
+ TThisActor::Become(&TThis::StateDestroy);
+}
+
+TString TPartitionChooserActor::GetDatabaseName(const NActors::TActorContext& ctx) {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+ switch (TableGeneration) {
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig);
+ case ESourceIdTableGeneration::PartitionMapping:
+ return AppData(ctx)->TenantName;
+ }
+}
+
+void TPartitionChooserActor::InitTable(const NActors::TActorContext& ctx) {
+ ctx.Send(
+ NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
+ new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant())
+ );
+}
+
+void TPartitionChooserActor::StartKqpSession(const NActors::TActorContext& ctx) {
+ auto ev = MakeCreateSessionRequest(ctx);
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+}
+
+void TPartitionChooserActor::CloseKqpSession(const TActorContext& ctx) {
+ if (KqpSessionId) {
+ auto ev = MakeCloseSessionRequest();
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+
+ KqpSessionId = "";
+ }
+}
+
+void TPartitionChooserActor::SendUpdateRequests(const TActorContext& ctx) {
+ TThisActor::Become(&TThis::StateUpdate);
+
+ auto ev = MakeUpdateQueryRequest(ctx);
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+
+ UpdatesInflight++;
+}
+
+void TPartitionChooserActor::SendSelectRequest(const NActors::TActorContext& ctx) {
+ TThisActor::Become(&TThis::StateSelect);
+
+ auto ev = MakeSelectQueryRequest(ctx);
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+
+ SelectInflight++;
+}
+
+THolder<NKqp::TEvKqp::TEvCreateSessionRequest> TPartitionChooserActor::MakeCreateSessionRequest(const NActors::TActorContext& ctx) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
+ return ev;
+}
+
+THolder<NKqp::TEvKqp::TEvCloseSessionRequest> TPartitionChooserActor::MakeCloseSessionRequest() {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
+ return ev;
+}
+
+THolder<NKqp::TEvKqp::TEvQueryRequest> TPartitionChooserActor::MakeSelectQueryRequest(const NActors::TActorContext& ctx) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ ev->Record.MutableRequest()->SetQuery(SelectQuery);
+
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
+ // fill tx settings: set commit tx flag& begin new serializable tx.
+ ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
+ ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
+ ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ // keep compiled query in cache.
+ ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+
+ NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
+
+ SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
+
+ paramsBuilder
+ .AddParam("$Topic")
+ .Utf8(FullConverter->GetClientsideName())
+ .Build()
+ .AddParam("$SourceId")
+ .Utf8(EncodedSourceId.EscapedSourceId)
+ .Build();
+
+ NYdb::TParams params = paramsBuilder.Build();
+
+ ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
+
+ return ev;
+}
+
+THolder<NKqp::TEvKqp::TEvQueryRequest> TPartitionChooserActor::MakeUpdateQueryRequest(const NActors::TActorContext& ctx) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ ev->Record.MutableRequest()->SetQuery(UpdateQuery);
+ ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
+ // fill tx settings: set commit tx flag& begin new serializable tx.
+ ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+ if (KqpSessionId) {
+ ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
+ }
+ if (TxId) {
+ ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
+ TxId = "";
+ } else {
+ ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ }
+ // keep compiled query in cache.
+ ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+
+ NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
+
+ SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
+
+ paramsBuilder
+ .AddParam("$Topic")
+ .Utf8(FullConverter->GetClientsideName())
+ .Build()
+ .AddParam("$SourceId")
+ .Utf8(EncodedSourceId.EscapedSourceId)
+ .Build()
+ .AddParam("$CreateTime")
+ .Uint64(CreateTime)
+ .Build()
+ .AddParam("$AccessTime")
+ .Uint64(TInstant::Now().MilliSeconds())
+ .Build()
+ .AddParam("$Partition")
+ .Uint32(Partition->PartitionId)
+ .Build();
+
+ NYdb::TParams params = paramsBuilder.Build();
+
+ ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
+
+ return ev;
+}
+
+void TPartitionChooserActor::RequestPQRB(const NActors::TActorContext& ctx) {
+ Y_ABORT_UNLESS(BalancerTabletId);
+
+ if (!PipeToBalancer) {
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.RetryPolicy = {
+ .RetryLimitCount = 6,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::MilliSeconds(100),
+ .BackoffMultiplier = 2,
+ .DoFirstRetryInstantly = true
+ };
+ PipeToBalancer = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig));
+ }
+
+ TThisActor::Become(&TThis::StateSelect);
+ NTabletPipe::SendData(ctx, PipeToBalancer, new TEvPersQueue::TEvGetPartitionIdForWrite());
+}
+
+void TPartitionChooserActor::ReplyResult(const NActors::TActorContext& ctx) {
+ ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId));
+}
+
+void TPartitionChooserActor::ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) {
+ ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage)));
+
+ Stop(ctx);
+}
+
+void TPartitionChooserActor::HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) {
+ StartKqpSession(ctx);
+}
+
+void TPartitionChooserActor::HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+
+ if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << record, ctx);
+ return;
+ }
+
+ KqpSessionId = record.GetResponse().GetSessionId();
+ Y_ABORT_UNLESS(!KqpSessionId.empty());
+
+ SendSelectRequest(ctx);
+}
+
+void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ KqpSessionId = record.GetResponse().GetSessionId();
+
+ Stop(ctx);
+}
+
+void TPartitionChooserActor::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record.GetRef();
+
+ if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << record, ctx);
+ }
+
+ auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
+
+ TxId = record.GetResponse().GetTxMeta().id();
+ Y_ABORT_UNLESS(!TxId.empty());
+
+ if (t.ListSize() != 0) {
+ auto& tt = t.GetList(0).GetStruct(0);
+ if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
+ auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64();
+ if (accessTime > AccessTime) { // AccessTime
+ PartitionId = tt.GetOptional().GetUint32();
+ DEBUG("Received partition " << PartitionId << " from table for SourceId=" << SourceId);
+ Partition = Chooser->GetPartition(PartitionId.value());
+ CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
+ AccessTime = accessTime;
+ }
+ }
+ }
+
+ if (CreateTime == 0) {
+ CreateTime = TInstant::Now().MilliSeconds();
+ }
+
+ if (!Partition) {
+ ChoosePartition(ctx);
+ } else {
+ OnPartitionChosen(ctx);
+ }
+}
+
+void TPartitionChooserActor::HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) {
+ PartitionId = ev->Get()->Record.GetPartitionId();
+ DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << SourceId);
+ Partition = Chooser->GetPartition(PartitionId.value());
+
+ OnPartitionChosen(ctx);
+}
+
+void TPartitionChooserActor::HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ev);
+
+ ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx);
+}
+
+void TPartitionChooserActor::HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record.GetRef();
+
+ if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
+ if (!PartitionPersisted) {
+ CloseKqpSession(ctx);
+ StartKqpSession(ctx);
+ }
+ return;
+ }
+
+ if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ if (!PartitionPersisted) {
+ ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx);
+ }
+ return;
+ }
+
+ if (!PartitionPersisted) {
+ ReplyResult(ctx);
+ PartitionPersisted = true;
+ // Use tx only for query after select. Updating AccessTime without transaction.
+ CloseKqpSession(ctx);
+ }
+
+ TThisActor::Become(&TThis::StateIdle);
+}
+
+void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr&, const TActorContext& ctx) {
+ Stop(ctx);
+}
+
+void TPartitionChooserActor::HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) {
+ if (PartitionPersisted) {
+ // we do not update AccessTime for Split/Merge partitions because don't use table.
+ SendUpdateRequests(ctx);
+ }
+}
+
+void TPartitionChooserActor::ChoosePartition(const TActorContext& ctx) {
+ auto [roundRobin, p] = ChoosePartitionSync(ctx);
+ if (roundRobin) {
+ RequestPQRB(ctx);
+ } else {
+ Partition = p;
+ OnPartitionChosen(ctx);
+ }
+}
+
+void TPartitionChooserActor::OnPartitionChosen(const TActorContext& ctx) {
+ if (!Partition && PreferedPartition) {
+ return ReplyError(ErrorCode::BAD_REQUEST,
+ TStringBuilder() << "Prefered partition " << (PreferedPartition.value() + 1) << " is not exists or inactive.",
+ ctx);
+ }
+
+ if (!Partition) {
+ return ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx);
+ }
+
+ if (PreferedPartition && Partition->PartitionId != PreferedPartition.value()) {
+ return ReplyError(ErrorCode::BAD_REQUEST,
+ TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId "
+ << (Partition->PartitionId + 1) << ", but client provided " << (PreferedPartition.value() + 1)
+ << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use "
+ "another MessageGroupId, specify PartitionGroupId " << (Partition->PartitionId + 1)
+ << ", or do not specify PartitionGroupId at all.",
+ ctx);
+ }
+
+ if (SplitMergeEnabled_ && SourceId && PartitionId) {
+ if (Partition != Chooser->GetPartition(SourceId)) {
+ return ReplyError(ErrorCode::BAD_REQUEST,
+ TStringBuilder() << "Message group " << SourceId << " not in a partition boundary", ctx);
+ }
+ }
+
+ if (NeedUpdateTable) {
+ SendUpdateRequests(ctx);
+ } else {
+ TThisActor::Become(&TThis::StateIdle);
+
+ ReplyResult(ctx);
+ }
+}
+
+std::pair<bool, const typename TPartitionChooserActor::TPartitionInfo*> TPartitionChooserActor::ChoosePartitionSync(const TActorContext& ctx) const {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+ if (SourceId && SplitMergeEnabled_) {
+ return {false, Chooser->GetPartition(SourceId)};
+ } else if (PreferedPartition) {
+ return {false, Chooser->GetPartition(PreferedPartition.value())};
+ } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) {
+ return {false, Chooser->GetPartition(SourceId)};
+ } else {
+ return {true, nullptr};
+ }
+}
+
} // namespace NPartitionChooser
@@ -51,6 +469,7 @@ std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp:
}
}
+
IActor* CreatePartitionChooserActor(TActorId parentId,
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
NPersQueue::TTopicConverterPtr& fullConverter,
@@ -58,13 +477,8 @@ IActor* CreatePartitionChooserActor(TActorId parentId,
std::optional<ui32> preferedPartition,
bool withoutHash) {
auto chooser = CreatePartitionChooser(config, withoutHash);
- if (SplitMergeEnabled(config.GetPQTabletConfig())) {
- return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
- } else {
- return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
- }
+ return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
}
-} // namespace NKikimr::NPQ
-std::unordered_map<ui64, TActorId> NKikimr::NTabletPipe::NTest::TPipeMock::Tablets;
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h
index 9fdacf20df..2fede5ae12 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl.h
@@ -1,15 +1,29 @@
#pragma once
+#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <util/random/random.h>
+#include <ydb/core/base/appdata_fwd.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/simple/services.h>
+#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/pq_database.h>
#include <ydb/core/persqueue/utils.h>
+#include <ydb/core/persqueue/writer/metadata_initializers.h>
+#include <ydb/library/yql/public/decimal/yql_decimal.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-#include "partition_chooser_impl__old_chooser_actor.h"
-#include "partition_chooser_impl__sm_chooser_actor.h"
-
+#include "partition_chooser.h"
namespace NKikimr::NPQ {
namespace NPartitionChooser {
+#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, message);
+
+using namespace NActors;
+using namespace NSourceIdEncoding;
+using namespace Ydb::PersQueue::ErrorCode;
+
// For testing purposes
struct TAsIsSharder {
ui32 operator()(const TString& sourceId, ui32 totalShards) const;
@@ -46,7 +60,6 @@ public:
const TPartitionInfo* GetPartition(const TString& sourceId) const override;
const TPartitionInfo* GetPartition(ui32 partitionId) const override;
- const TPartitionInfo* GetRandomPartition() const override;
private:
const TString TopicName;
@@ -55,14 +68,13 @@ private:
};
// It is old alghoritm of choosing partition by SourceId
-template<typename THasher = TMd5Sharder>
+template<class THasher = TMd5Sharder>
class THashChooser: public IPartitionChooser {
public:
THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config);
const TPartitionInfo* GetPartition(const TString& sourceId) const override;
const TPartitionInfo* GetPartition(ui32 partitionId) const override;
- const TPartitionInfo* GetRandomPartition() const override;
private:
std::vector<TPartitionInfo> Partitions;
@@ -70,6 +82,139 @@ private:
};
+class TPartitionChooserActor: public TActorBootstrapped<TPartitionChooserActor> {
+ using TThis = TPartitionChooserActor;
+ using TThisActor = TActor<TThis>;
+
+ friend class TActorBootstrapped<TThis>;
+public:
+ using TPartitionInfo = typename IPartitionChooser::TPartitionInfo;
+
+ TPartitionChooserActor(TActorId parentId,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& config,
+ std::shared_ptr<IPartitionChooser>& chooser,
+ NPersQueue::TTopicConverterPtr& fullConverter,
+ const TString& sourceId,
+ std::optional<ui32> preferedPartition);
+
+ void Bootstrap(const TActorContext& ctx);
+
+private:
+ void HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx);
+ void HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx);
+
+ STATEFN(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NMetadata::NProvider::TEvManagerPrepared, HandleInit);
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleInit);
+ sFunc(TEvents::TEvPoison, ScheduleStop);
+ }
+ }
+
+private:
+ void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx);
+
+ STATEFN(StateSelect) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect);
+ HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, HandleSelect);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, HandleSelect);
+ sFunc(TEvents::TEvPoison, ScheduleStop);
+ }
+ }
+
+private:
+ void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr& ev, const TActorContext& ctx);
+
+ STATEFN(StateIdle) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle);
+ SFunc(TEvents::TEvPoison, Stop);
+ }
+ }
+
+private:
+ void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
+
+ STATEFN(StateUpdate) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate);
+ sFunc(TEvents::TEvPoison, ScheduleStop);
+ }
+ }
+
+private:
+ void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx);
+ void HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
+
+ STATEFN(StateDestroy) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy);
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDestroy);
+ }
+ }
+
+private:
+ void ScheduleStop();
+ void Stop(const TActorContext& ctx);
+
+ void ChoosePartition(const TActorContext& ctx);
+ void OnPartitionChosen(const TActorContext& ctx);
+ std::pair<bool, const TPartitionInfo*> ChoosePartitionSync(const TActorContext& ctx) const;
+
+ TString GetDatabaseName(const NActors::TActorContext& ctx);
+
+ void InitTable(const NActors::TActorContext& ctx);
+
+ void StartKqpSession(const NActors::TActorContext& ctx);
+ void CloseKqpSession(const TActorContext& ctx);
+ void SendUpdateRequests(const TActorContext& ctx);
+ void SendSelectRequest(const NActors::TActorContext& ctx);
+
+ void RequestPQRB(const NActors::TActorContext& ctx);
+
+ THolder<NKqp::TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest(const NActors::TActorContext& ctx);
+ THolder<NKqp::TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest();
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSelectQueryRequest(const NActors::TActorContext& ctx);
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateQueryRequest(const NActors::TActorContext& ctx);
+
+ void ReplyResult(const NActors::TActorContext& ctx);
+ void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx);
+
+private:
+ const TActorId Parent;
+ const NPersQueue::TTopicConverterPtr FullConverter;
+ const TString SourceId;
+ const std::optional<ui32> PreferedPartition;
+ const std::shared_ptr<IPartitionChooser> Chooser;
+ const bool SplitMergeEnabled_;
+
+ std::optional<ui32> PartitionId;
+ const TPartitionInfo* Partition;
+ bool PartitionPersisted = false;
+ ui64 CreateTime = 0;
+ ui64 AccessTime = 0;
+
+ bool NeedUpdateTable = false;
+
+ NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId;
+ TString KqpSessionId;
+ TString TxId;
+
+ NPQ::ESourceIdTableGeneration TableGeneration;
+ TString SelectQuery;
+ TString UpdateQuery;
+
+ size_t UpdatesInflight = 0;
+ size_t SelectInflight = 0;
+
+ ui64 BalancerTabletId;
+ TActorId PipeToBalancer;
+};
+
+
//
// TBoundaryChooser
//
@@ -88,7 +233,7 @@ TBoundaryChooser<THasher>::TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGro
}
std::sort(Partitions.begin(), Partitions.end(),
- [](const TPartitionInfo& a, const TPartitionInfo& b) { return !b.ToBound || (a.ToBound && a.ToBound < b.ToBound); });
+ [](const TPartitionInfo& a, const TPartitionInfo& b) { return a.ToBound && a.ToBound < b.ToBound; });
}
template<class THasher>
@@ -107,15 +252,6 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash
return it == Partitions.end() ? nullptr : it;
}
-template<class THasher>
-const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THasher>::GetRandomPartition() const {
- if (Partitions.empty()) {
- return nullptr;
- }
- size_t p = RandomNumber<size_t>(Partitions.size());
- return &Partitions[p];
-}
-
//
@@ -136,9 +272,6 @@ THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescri
template<class THasher>
const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetPartition(const TString& sourceId) const {
- if (Partitions.empty()) {
- return nullptr;
- }
return &Partitions[Hasher(sourceId, Partitions.size())];
}
@@ -152,32 +285,5 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get
return it->PartitionId == partitionId ? it : nullptr;
}
-template<class THasher>
-const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetRandomPartition() const {
- if (Partitions.empty()) {
- return nullptr;
- }
- size_t p = RandomNumber<size_t>(Partitions.size());
- return &Partitions[p];
-}
-
-
} // namespace NPartitionChooser
-
-
-inline IActor* CreatePartitionChooserActorM(TActorId parentId,
- const NKikimrSchemeOp::TPersQueueGroupDescription& config,
- NPersQueue::TTopicConverterPtr& fullConverter,
- const TString& sourceId,
- std::optional<ui32> preferedPartition,
- bool withoutHash) {
- auto chooser = CreatePartitionChooser(config, withoutHash);
- if (SplitMergeEnabled(config.GetPQTabletConfig())) {
- return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
- } else {
- return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
- }
-}
-
-
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h
deleted file mode 100644
index 8d718d8d45..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h
+++ /dev/null
@@ -1,368 +0,0 @@
-#pragma once
-
-#include "partition_chooser.h"
-#include "partition_chooser_impl__partition_helper.h"
-#include "partition_chooser_impl__table_helper.h"
-
-#include <ydb/core/persqueue/pq_database.h>
-#include <ydb/core/persqueue/writer/metadata_initializers.h>
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
-#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
-#endif
-
-
-#define LOG_PREFIX "TPartitionChooser " << SelfId() \
- << " (SourceId=" << SourceId \
- << ", PreferedPartition=" << PreferedPartition \
- << ") "
-#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-
-using TPartitionInfo = typename IPartitionChooser::TPartitionInfo;
-
-using namespace NActors;
-using namespace NSourceIdEncoding;
-using namespace Ydb::PersQueue::ErrorCode;
-
-template<typename TDerived, typename TPipeCreator>
-class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
-public:
- using TThis = TAbstractPartitionChooserActor<TDerived, TPipeCreator>;
- using TThisActor = TActor<TThis>;
-
-
- TAbstractPartitionChooserActor(TActorId parentId,
- std::shared_ptr<IPartitionChooser>& chooser,
- NPersQueue::TTopicConverterPtr& fullConverter,
- const TString& sourceId,
- std::optional<ui32> preferedPartition)
- : Parent(parentId)
- , SourceId(sourceId)
- , PreferedPartition(preferedPartition)
- , Chooser(chooser)
- , TableHelper(fullConverter->GetClientsideName(), fullConverter->GetTopicForSrcIdHash())
- , PartitionHelper() {
- }
-
- TActorIdentity SelfId() const {
- return TActor<TDerived>::SelfId();
- }
-
- void Initialize(const NActors::TActorContext& ctx) {
- TableHelper.Initialize(ctx, SourceId);
- }
-
- void PassAway() {
- auto ctx = TActivationContext::ActorContextFor(SelfId());
- TableHelper.CloseKqpSession(ctx);
- PartitionHelper.Close(ctx);
- }
-
- bool NeedTable(const NActors::TActorContext& ctx) {
- const auto& pqConfig = AppData(ctx)->PQConfig;
- return SourceId && (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass());
- }
-
-protected:
- void InitTable(const NActors::TActorContext& ctx) {
- TThis::Become(&TThis::StateInitTable);
- const auto& pqConfig = AppData(ctx)->PQConfig;
- if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
- DEBUG("InitTable");
- TableHelper.SendInitTableRequest(ctx);
- } else {
- StartKqpSession(ctx);
- }
- }
-
- void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) {
- StartKqpSession(ctx);
- }
-
- STATEFN(StateInitTable) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
- SFunc(TEvents::TEvPoison, TThis::Die);
- }
- }
-
-protected:
- void StartKqpSession(const NActors::TActorContext& ctx) {
- if (NeedTable(ctx)) {
- DEBUG("StartKqpSession")
- TThis::Become(&TThis::StateCreateKqpSession);
- TableHelper.SendCreateSessionRequest(ctx);
- } else {
- OnSelected(ctx);
- }
- }
-
- void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- if (!TableHelper.Handle(ev, ctx)) {
- return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << ev->Get()->Record.DebugString(), ctx);
- }
-
- SendSelectRequest(ctx);
- }
-
- void ScheduleStop() {
- TThis::Become(&TThis::StateDestroying);
- }
-
- STATEFN(StateCreateKqpSession) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
- sFunc(TEvents::TEvPoison, ScheduleStop);
- }
- }
-
-protected:
- void SendSelectRequest(const NActors::TActorContext& ctx) {
- TThis::Become(&TThis::StateSelect);
- DEBUG("Select from the table");
- TableHelper.SendSelectRequest(ctx);
- }
-
- void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- if (!TableHelper.HandleSelect(ev, ctx)) {
- return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << ev->Get()->Record.DebugString(), ctx);
- }
-
- TRACE("Selected from table PartitionId=" << TableHelper.PartitionId() << " SeqNo=" << TableHelper.SeqNo());
- if (TableHelper.PartitionId()) {
- Partition = Chooser->GetPartition(TableHelper.PartitionId().value());
- }
- SeqNo = TableHelper.SeqNo();
-
- OnSelected(ctx);
- }
-
- virtual void OnSelected(const NActors::TActorContext& ctx) = 0;
-
- STATEFN(StateSelect) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect);
- SFunc(TEvents::TEvPoison, TThis::Die);
- }
- }
-
-protected:
- void SendUpdateRequests(const TActorContext& ctx) {
- if (NeedTable(ctx)) {
- TThis::Become(&TThis::StateUpdate);
- DEBUG("Update the table");
- TableHelper.SendUpdateRequest(Partition->PartitionId, SeqNo, ctx);
- } else {
- StartGetOwnership(ctx);
- }
- }
-
- void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record.GetRef();
- DEBUG("HandleUpdate PartitionPersisted=" << PartitionPersisted << " Status=" << record.GetYdbStatus());
-
- if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
- if (!PartitionPersisted) {
- TableHelper.CloseKqpSession(ctx);
- StartKqpSession(ctx);
- }
- return;
- }
-
- if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- if (!PartitionPersisted) {
- ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx);
- }
- return;
- }
-
- if (!PartitionPersisted) {
- PartitionPersisted = true;
- // Use tx only for query after select. Updating AccessTime without transaction.
- TableHelper.CloseKqpSession(ctx);
-
- return StartGetOwnership(ctx);
- }
-
- StartIdle();
- }
-
- STATEFN(StateUpdate) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate);
- sFunc(TEvents::TEvPoison, ScheduleStop);
- }
- }
-
-protected:
- void StartCheckPartitionRequest(const TActorContext &ctx) {
- TThis::Become(&TThis::StateCheckPartition);
- PartitionHelper.Open(Partition->TabletId, ctx);
- PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, ctx);
- }
-
- void Handle(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()) {
- PartitionHelper.Close(ctx);
- return SendUpdateRequests(ctx);
- }
- ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "Partition isn`t active", ctx);
- }
-
- STATEFN(StateCheckPartition) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse, Handle);
- HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership);
- sFunc(TEvents::TEvPoison, ScheduleStop);
- }
- }
-
-protected:
- void StartGetOwnership(const TActorContext &ctx) {
- TThis::Become(&TThis::StateOwnership);
- if (!Partition) {
- return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx);
- }
-
- DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId);
-
- PartitionHelper.Open(Partition->TabletId, ctx);
- PartitionHelper.SendGetOwnershipRequest(Partition->PartitionId, SourceId, true, ctx);
- }
-
- void HandleOwnership(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- DEBUG("HandleOwnership");
- auto& record = ev->Get()->Record;
-
- TString error;
- if (!BasicCheck(record, error)) {
- return ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx);
- }
-
- const auto& response = record.GetPartitionResponse();
- if (!response.HasCmdGetOwnershipResult()) {
- return ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx);
- }
-
- if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) {
- return ReplyError(ErrorCode::INITIALIZING, "Partition is not active", ctx);
- }
-
- OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();
-
- PartitionHelper.Close(ctx);
-
- OnOwnership(ctx);
- }
-
- void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
- auto msg = ev->Get();
- if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) {
- TableHelper.CloseKqpSession(ctx);
- ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
- }
- }
-
- void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) {
- if (PartitionHelper.IsPipe(ev->Sender)) {
- TableHelper.CloseKqpSession(ctx);
- ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
- }
- }
-
- virtual void OnOwnership(const TActorContext &ctx) = 0;
-
- STATEFN(StateOwnership) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvResponse, HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership);
- sFunc(TEvents::TEvPoison, ScheduleStop);
- }
- }
-
-
-protected:
- void StartIdle() {
- TThis::Become(&TThis::StateIdle);
- DEBUG("Start idle");
- }
-
- void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) {
- if (PartitionPersisted) {
- SendUpdateRequests(ctx);
- }
- }
-
- STATEFN(StateIdle) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle);
- SFunc(TEvents::TEvPoison, TThis::Die);
- }
- }
-
-
-protected:
- void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- TableHelper.Handle(ev, ctx);
- TThis::Die(ctx);
- }
-
- STATEFN(StateDestroying) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy);
- }
- }
-
-protected:
- void ReplyResult(const NActors::TActorContext& ctx) {
- ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo));
- }
-
- void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) {
- INFO("ReplyError: " << errorMessage);
- ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage)));
-
- TThis::Die(ctx);
- }
-
-
-protected:
- const TActorId Parent;
- const TString SourceId;
- const std::optional<ui32> PreferedPartition;
- const std::shared_ptr<IPartitionChooser> Chooser;
-
- const TPartitionInfo* Partition = nullptr;
-
- TTableHelper TableHelper;
- TPartitionHelper<TPipeCreator> PartitionHelper;
-
- bool PartitionPersisted = false;
-
- TString OwnerCookie;
- std::optional<ui64> SeqNo = 0;
-};
-
-#undef LOG_PREFIX
-#undef TRACE
-#undef DEBUG
-#undef INFO
-#undef ERROR
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h
deleted file mode 100644
index 97a9750952..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h
+++ /dev/null
@@ -1,163 +0,0 @@
-#pragma once
-
-#include "partition_chooser_impl__abstract_chooser_actor.h"
-#include "partition_chooser_impl__pqrb_helper.h"
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
-#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
-#endif
-
-
-#define LOG_PREFIX "TPartitionChooser " << SelfId() \
- << " (SourceId=" << TThis::SourceId \
- << ", PreferedPartition=" << TThis::PreferedPartition \
- << ") "
-#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-
-template<typename TPipeCreator>
-class TPartitionChooserActor: public TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator> {
-public:
- using TThis = TPartitionChooserActor<TPipeCreator>;
- using TThisActor = TActor<TThis>;
- using TParentActor = TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator>;
-
- TPartitionChooserActor(TActorId parentId,
- const NKikimrSchemeOp::TPersQueueGroupDescription& config,
- std::shared_ptr<IPartitionChooser>& chooser,
- NPersQueue::TTopicConverterPtr& fullConverter,
- const TString& sourceId,
- std::optional<ui32> preferedPartition)
- : TAbstractPartitionChooserActor<TPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition)
- , PQRBHelper(config.GetBalancerTabletID()) {
- }
-
- void Bootstrap(const TActorContext& ctx) {
- TThis::Initialize(ctx);
- TThis::InitTable(ctx);
- }
-
- TActorIdentity SelfId() const {
- return TActor<TPartitionChooserActor<TPipeCreator>>::SelfId();
- }
-
- void OnSelected(const TActorContext &ctx) override {
- if (TThis::Partition) {
- return OnPartitionChosen(ctx);
- }
-
- auto [roundRobin, p] = ChoosePartitionSync(ctx);
- if (roundRobin) {
- RequestPQRB(ctx);
- } else {
- TThis::Partition = p;
- OnPartitionChosen(ctx);
- }
- }
-
- void OnOwnership(const TActorContext &ctx) override {
- DEBUG("OnOwnership");
- TThis::ReplyResult(ctx);
- }
-
-private:
- void RequestPQRB(const NActors::TActorContext& ctx) {
- DEBUG("RequestPQRB")
- TThis::Become(&TThis::StatePQRB);
-
- if (PQRBHelper.PartitionId()) {
- PartitionId = PQRBHelper.PartitionId();
- OnPartitionChosen(ctx);
- } else {
- PQRBHelper.SendRequest(ctx);
- }
- }
-
- void Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) {
- PartitionId = PQRBHelper.Handle(ev, ctx);
- DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << TThis::SourceId);
- TThis::Partition = TThis::Chooser->GetPartition(PQRBHelper.PartitionId().value());
-
- PQRBHelper.Close(ctx);
-
- OnPartitionChosen(ctx);
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
- if (PQRBHelper.IsPipe(ev->Sender) && ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe connection fail", ctx);
- }
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) {
- if(PQRBHelper.IsPipe(ev->Sender)) {
- TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx);
- }
- }
-
- STATEFN(StatePQRB) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, Handle);
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- SFunc(TEvents::TEvPoison, TThis::Die);
- }
- }
-
-
-private:
- void OnPartitionChosen(const TActorContext& ctx) {
- TRACE("OnPartitionChosen");
-
- if (!TThis::Partition && TThis::PreferedPartition) {
- return TThis::ReplyError(ErrorCode::BAD_REQUEST,
- TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.",
- ctx);
- }
-
- if (!TThis::Partition) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx);
- }
-
- if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) {
- return TThis::ReplyError(ErrorCode::BAD_REQUEST,
- TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId "
- << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1)
- << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use "
- "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1)
- << ", or do not specify PartitionGroupId at all.",
- ctx);
- }
-
- TThis::SendUpdateRequests(ctx);
- }
-
- std::pair<bool, const TPartitionInfo*> ChoosePartitionSync(const TActorContext& ctx) const {
- const auto& pqConfig = AppData(ctx)->PQConfig;
- if (TThis::PreferedPartition) {
- return {false, TThis::Chooser->GetPartition(TThis::PreferedPartition.value())};
- } else if (pqConfig.GetTopicsAreFirstClassCitizen() && TThis::SourceId) {
- return {false, TThis::Chooser->GetPartition(TThis::SourceId)};
- } else {
- return {true, nullptr};
- }
- };
-
-
-private:
- std::optional<ui32> PartitionId;
-
- TPQRBHelper<TPipeCreator> PQRBHelper;
-};
-
-#undef LOG_PREFIX
-#undef TRACE
-#undef DEBUG
-#undef INFO
-#undef ERROR
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h
deleted file mode 100644
index 463217557c..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h
+++ /dev/null
@@ -1,86 +0,0 @@
-#pragma once
-
-#include "common.h"
-#include "pipe_utils.h"
-#include "source_id_encoding.h"
-
-#include <ydb/core/persqueue/events/global.h>
-#include <ydb/core/persqueue/events/internal.h>
-#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-template<typename TPipeCreator>
-class TPartitionHelper {
-public:
- void Open(ui64 tabletId, const TActorContext& ctx) {
- Close(ctx);
-
- NTabletPipe::TClientConfig clientConfig;
- clientConfig.RetryPolicy = {
- .RetryLimitCount = 6,
- .MinRetryTime = TDuration::MilliSeconds(10),
- .MaxRetryTime = TDuration::MilliSeconds(100),
- .BackoffMultiplier = 2,
- .DoFirstRetryInstantly = true
- };
- Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, tabletId, clientConfig));
- }
-
- void SendGetOwnershipRequest(ui32 partitionId, const TString& sourceId, bool registerIfNotExists, const TActorContext& ctx) {
- auto ev = MakeRequest(partitionId, Pipe);
-
- auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
- cmd.SetOwner(sourceId ? sourceId : CreateGuidAsString());
- cmd.SetForce(true);
- cmd.SetRegisterIfNotExists(registerIfNotExists);
-
- NTabletPipe::SendData(ctx, Pipe, ev.Release());
- }
-
- void SendMaxSeqNoRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) {
- auto ev = MakeRequest(partitionId, Pipe);
-
- auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetMaxSeqNo();
- cmd.AddSourceId(NSourceIdEncoding::EncodeSimple(sourceId));
-
- NTabletPipe::SendData(ctx, Pipe, ev.Release());
- }
-
- void SendCheckPartitionStatusRequest(ui32 partitionId, const TActorContext& ctx) {
- auto ev = MakeHolder<NKikimr::TEvPQ::TEvCheckPartitionStatusRequest>(partitionId);
-
- NTabletPipe::SendData(ctx, Pipe, ev.Release());
- }
-
- void Close(const TActorContext& ctx) {
- if (Pipe) {
- NTabletPipe::CloseClient(ctx, Pipe);
- Pipe = TActorId();
- }
- }
-
- const TString& OwnerCookie() const {
- return OwnerCookie_;
- }
-
- bool IsPipe(const TActorId& actorId) const {
- return actorId == Pipe;
- }
-
-private:
- THolder<TEvPersQueue::TEvRequest> MakeRequest(ui32 partitionId, TActorId pipe) {
- auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
-
- ev->Record.MutablePartitionRequest()->SetPartition(partitionId);
- ActorIdToProto(pipe, ev->Record.MutablePartitionRequest()->MutablePipeClient());
-
- return ev;
- }
-
-private:
- TActorId Pipe;
- TString OwnerCookie_;
-};
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h
deleted file mode 100644
index c1a235901b..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#pragma once
-
-#include "pipe_utils.h"
-
-#include <ydb/core/persqueue/events/global.h>
-
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-template<typename TPipeCreator>
-class TPQRBHelper {
-public:
- TPQRBHelper(ui64 balancerTabletId)
- : BalancerTabletId(balancerTabletId) {
- }
-
- std::optional<ui32> PartitionId() const {
- return PartitionId_;
- }
-
- void SendRequest(const NActors::TActorContext& ctx) {
- Y_ABORT_UNLESS(BalancerTabletId);
-
- if (!Pipe) {
- NTabletPipe::TClientConfig clientConfig;
- clientConfig.RetryPolicy = {
- .RetryLimitCount = 6,
- .MinRetryTime = TDuration::MilliSeconds(10),
- .MaxRetryTime = TDuration::MilliSeconds(100),
- .BackoffMultiplier = 2,
- .DoFirstRetryInstantly = true
- };
- Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig));
- }
-
- NTabletPipe::SendData(ctx, Pipe, new TEvPersQueue::TEvGetPartitionIdForWrite());
- }
-
- ui32 Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) {
- Close(ctx);
-
- PartitionId_ = ev->Get()->Record.GetPartitionId();
- return PartitionId_.value();
- }
-
- void Close(const TActorContext& ctx) {
- if (Pipe) {
- NTabletPipe::CloseClient(ctx, Pipe);
- Pipe = TActorId();
- }
- }
-
- bool IsPipe(const TActorId& actorId) const {
- return actorId == Pipe;
- }
-
-private:
- const ui64 BalancerTabletId;
-
- TActorId Pipe;
- std::optional<ui32> PartitionId_;
-};
-
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h
deleted file mode 100644
index 695af84d59..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h
+++ /dev/null
@@ -1,272 +0,0 @@
-#pragma once
-
-#include "partition_chooser_impl__abstract_chooser_actor.h"
-
-#include <ydb/core/persqueue/utils.h>
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
-#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
-#endif
-
-
-#define LOG_PREFIX "TPartitionChooser " << SelfId() \
- << " (SourceId=" << TThis::SourceId \
- << ", PreferedPartition=" << TThis::PreferedPartition \
- << ") "
-#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define INFO(message) LOG_INFO_S (*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-
-template<typename TPipeCreator>
-class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator> {
-public:
- using TThis = TSMPartitionChooserActor<TPipeCreator>;
- using TThisActor = TActor<TThis>;
- using TParentActor = TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>;
-
- TSMPartitionChooserActor(TActorId parentId,
- const NKikimrSchemeOp::TPersQueueGroupDescription& config,
- std::shared_ptr<IPartitionChooser>& chooser,
- NPersQueue::TTopicConverterPtr& fullConverter,
- const TString& sourceId,
- std::optional<ui32> preferedPartition)
- : TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition)
- , Graph(MakePartitionGraph(config)) {
-
- }
-
- void Bootstrap(const TActorContext& ctx) {
- BoundaryPartition = ChoosePartitionSync();
-
- if (TThis::SourceId) {
- TThis::Initialize(ctx);
- GetOwnershipFast(ctx);
- } else {
- TThis::Partition = BoundaryPartition;
- TThis::StartGetOwnership(ctx);
- }
- }
-
- TActorIdentity SelfId() const {
- return TActor<TSMPartitionChooserActor<TPipeCreator>>::SelfId();
- }
-
- void OnSelected(const TActorContext &ctx) override {
- if (TThis::Partition) {
- // If we have found a partition, it means that the partition is active, which means
- // that we need to continue writing to it, and it does not matter whether it falls
- // within the boundaries of the distribution.
- return OnPartitionChosen(ctx);
- }
-
- if (!TThis::TableHelper.PartitionId()) {
- // They didn't write with this SourceId earlier, or the SourceID has already been deleted.
- TThis::Partition = BoundaryPartition;
- return OnPartitionChosen(ctx);
- }
-
- const auto* node = Graph.GetPartition(TThis::TableHelper.PartitionId().value());
- if (!node) {
- // The partition where the writting was performed earlier has already been deleted.
- // We can write without taking into account the hierarchy of the partition.
- TThis::Partition = BoundaryPartition;
- return OnPartitionChosen(ctx);
- }
-
- // Choosing a partition based on the split and merge hierarchy.
- auto activeChildren = Graph.GetActiveChildren(TThis::TableHelper.PartitionId().value());
- if (activeChildren.empty()) {
- return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "has't active partition Marker# PC01", ctx);
- }
-
- if (activeChildren.contains(BoundaryPartition->PartitionId)) {
- // First of all, we are trying to write to the partition taking into account the
- // distribution boundaries.
- TThis::Partition = BoundaryPartition;
- } else {
- // It is important to save the write into account the hierarchy of partitions, because
- // this will preserve the guarantees of the order of reading.
- auto n = RandomNumber<size_t>(activeChildren.size());
- std::vector<ui32> ac;
- ac.reserve(activeChildren.size());
- ac.insert(ac.end(), activeChildren.begin(), activeChildren.end());
- auto id = ac[n];
- TThis::Partition = TThis::Chooser->GetPartition(id);
- }
-
- if (!TThis::Partition) {
- return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "can't choose partition Marker# PC02", ctx);
- }
-
- GetOldSeqNo(ctx);
- }
-
- void OnOwnership(const TActorContext &ctx) override {
- DEBUG("OnOwnership");
- TThis::ReplyResult(ctx);
- }
-
-private:
- void GetOwnershipFast(const TActorContext &ctx) {
- TThis::Become(&TThis::StateOwnershipFast);
- if (!BoundaryPartition) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "A partition not choosed", ctx);
- }
-
- DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId);
-
- TThis::PartitionHelper.Open(BoundaryPartition->TabletId, ctx);
- TThis::PartitionHelper.SendGetOwnershipRequest(BoundaryPartition->PartitionId, TThis::SourceId, false, ctx);
- }
-
- void HandleOwnershipFast(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) {
- DEBUG("HandleOwnershipFast");
- auto& record = ev->Get()->Record;
-
- TString error;
- if (!BasicCheck(record, error)) {
- return TThis::InitTable(ctx);
- }
-
- const auto& response = record.GetPartitionResponse();
- if (!response.HasCmdGetOwnershipResult()) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx);
- }
-
- if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Configuration changed", ctx);
- }
-
- TThis::OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();
-
- if (response.GetCmdGetOwnershipResult().GetSeqNo() > 0) {
- // Fast path: the partition ative and already written
- TThis::SendUpdateRequests(ctx);
- return TThis::ReplyResult(ctx);
- }
-
- TThis::InitTable(ctx);
- }
-
- STATEFN(StateOwnershipFast) {
- TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER);
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvResponse, HandleOwnershipFast);
- HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership);
- SFunc(TEvents::TEvPoison, TThis::Die);
- }
- }
-
-
-private:
- void GetOldSeqNo(const TActorContext &ctx) {
- DEBUG("GetOldSeqNo");
- TThis::Become(&TThis::StateGetMaxSeqNo);
-
- const auto* oldNode = Graph.GetPartition(TThis::TableHelper.PartitionId().value());
-
- if (!oldNode) {
- return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "Inconsistent status Marker# PC03", ctx);
- }
-
- TThis::PartitionHelper.Open(oldNode->TabletId, ctx);
- TThis::PartitionHelper.SendMaxSeqNoRequest(oldNode->Id, TThis::SourceId, ctx);
- }
-
- void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
-
- TString error;
- if (!BasicCheck(record, error)) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx);
- }
-
- const auto& response = record.GetPartitionResponse();
- if (!response.HasCmdGetMaxSeqNoResult()) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent MaxSeqNo result", ctx);
- }
-
- const auto& result = response.GetCmdGetMaxSeqNoResult();
- if (result.SourceIdInfoSize() < 1) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Empty source id info", ctx);
- }
-
- const auto& sourceIdInfo = result.GetSourceIdInfo(0);
- switch (sourceIdInfo.GetState()) {
- case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED:
- TThis::SeqNo = sourceIdInfo.GetSeqNo();
- break;
- case NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION:
- case NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN:
- TThis::SeqNo = TThis::TableHelper.SeqNo();
- break;
- }
-
- TThis::PartitionHelper.Close(ctx);
- TThis::StartCheckPartitionRequest(ctx);
- }
-
- STATEFN(StateGetMaxSeqNo) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo);
- SFunc(TEvents::TEvPoison, TThis::Die);
- HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership);
- HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership);
- }
- }
-
-
-private:
- void OnPartitionChosen(const TActorContext& ctx) {
- TRACE("OnPartitionChosen");
-
- if (!TThis::Partition && TThis::PreferedPartition) {
- return TThis::ReplyError(ErrorCode::BAD_REQUEST,
- TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.",
- ctx);
- }
-
- if (!TThis::Partition) {
- return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx);
- }
-
- if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) {
- return TThis::ReplyError(ErrorCode::BAD_REQUEST,
- TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId "
- << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1)
- << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use "
- "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1)
- << ", or do not specify PartitionGroupId at all.",
- ctx);
- }
-
- TThis::StartCheckPartitionRequest(ctx);
- }
-
- const TPartitionInfo* ChoosePartitionSync() const {
- if (TThis::PreferedPartition) {
- return TThis::Chooser->GetPartition(TThis::PreferedPartition.value());
- } else if (TThis::SourceId) {
- return TThis::Chooser->GetPartition(TThis::SourceId);
- } else {
- return TThis::Chooser->GetRandomPartition();
- }
- };
-
-
-private:
- const TPartitionInfo* BoundaryPartition = nullptr;
- const TPartitionGraph Graph;
-};
-
-#undef LOG_PREFIX
-#undef TRACE
-#undef DEBUG
-#undef INFO
-#undef ERROR
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h
deleted file mode 100644
index 210ab86ff4..0000000000
--- a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h
+++ /dev/null
@@ -1,280 +0,0 @@
-#pragma once
-
-#include "metadata_initializers.h"
-#include "source_id_encoding.h"
-
-#include <ydb/core/base/appdata_fwd.h>
-#include <ydb/core/kqp/common/events/events.h>
-#include <ydb/core/kqp/common/simple/services.h>
-#include <ydb/core/persqueue/pq_database.h>
-#include <ydb/library/yql/public/decimal/yql_decimal.h>
-#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-#include <ydb/services/metadata/service.h>
-
-
-namespace NKikimr::NPQ::NPartitionChooser {
-
-#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
-#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
-#endif
-
-
-#define LOG_PREFIX "TTableHelper "
-#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
-
-
-class TTableHelper {
-public:
- TTableHelper(const TString& topicName, const TString& topicHashName)
- : TopicName(topicName)
- , TopicHashName(topicHashName) {
- };
-
- std::optional<ui32> PartitionId() const {
- return PartitionId_;
- }
-
- std::optional<ui64> SeqNo() const {
- return SeqNo_;
- }
-
- bool Initialize(const TActorContext& ctx, const TString& sourceId) {
- const auto& pqConfig = AppData(ctx)->PQConfig;
-
- TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
- : ESourceIdTableGeneration::SrcIdMeta2;
- try {
- EncodedSourceId = NSourceIdEncoding::EncodeSrcId(
- TopicHashName, sourceId, TableGeneration
- );
- } catch (yexception& e) {
- return false;
- }
-
- SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
- UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
- UpdateAccessTimeQuery = GetUpdateAccessTimeQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
-
- DEBUG("SelectQuery: " << SelectQuery);
- DEBUG("UpdateQuery: " << UpdateQuery);
- DEBUG("UpdateAccessTimeQuery: " << UpdateAccessTimeQuery);
-
- return true;
- }
-
- TString GetDatabaseName(const TActorContext& ctx) {
- const auto& pqConfig = AppData(ctx)->PQConfig;
- switch (TableGeneration) {
- case ESourceIdTableGeneration::SrcIdMeta2:
- return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig);
- case ESourceIdTableGeneration::PartitionMapping:
- return AppData(ctx)->TenantName;
- }
- }
-
- void SendInitTableRequest(const TActorContext& ctx) {
- ctx.Send(
- NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
- new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant())
- );
- }
-
- void SendCreateSessionRequest(const TActorContext& ctx) {
- auto ev = MakeCreateSessionRequest(ctx);
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
-
- THolder<NKqp::TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest(const TActorContext& ctx) {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
- ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
- return ev;
- }
-
- bool Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& /*ctx*/) {
- const auto& record = ev->Get()->Record;
-
- if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return false;
- }
-
- KqpSessionId = record.GetResponse().GetSessionId();
- Y_ABORT_UNLESS(!KqpSessionId.empty());
-
- return true;
- }
-
- void CloseKqpSession(const TActorContext& ctx) {
- if (KqpSessionId) {
- auto ev = MakeCloseSessionRequest();
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
-
- KqpSessionId = "";
- }
- }
-
- THolder<NKqp::TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest() {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
- ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
- return ev;
- }
-
- void SendSelectRequest(const TActorContext& ctx) {
- auto ev = MakeSelectQueryRequest(ctx);
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
-
- THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSelectQueryRequest(const NActors::TActorContext& ctx) {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
-
- ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- ev->Record.MutableRequest()->SetQuery(SelectQuery);
-
- ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
- // fill tx settings: set commit tx flag& begin new serializable tx.
- ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
- ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
- ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- // keep compiled query in cache.
- ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
-
- NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
-
- SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
-
- paramsBuilder
- .AddParam("$Topic")
- .Utf8(TopicName)
- .Build()
- .AddParam("$SourceId")
- .Utf8(EncodedSourceId.EscapedSourceId)
- .Build();
-
- NYdb::TParams params = paramsBuilder.Build();
-
- ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
-
- return ev;
- }
-
- bool HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& /*ctx*/) {
- auto& record = ev->Get()->Record.GetRef();
-
- if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return false;
- }
-
- auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
-
- TxId = record.GetResponse().GetTxMeta().id();
- Y_ABORT_UNLESS(!TxId.empty());
-
- if (t.ListSize() != 0) {
- auto& list = t.GetList(0);
- auto& tt = list.GetStruct(0);
- if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
- auto accessTime = list.GetStruct(2).GetOptional().GetUint64();
- if (accessTime > AccessTime) { // AccessTime
- PartitionId_ = tt.GetOptional().GetUint32();
- CreateTime = list.GetStruct(1).GetOptional().GetUint64();
- AccessTime = accessTime;
- SeqNo_ = list.GetStruct(3).GetOptional().GetUint64();
- }
- }
- }
-
- if (CreateTime == 0) {
- CreateTime = TInstant::Now().MilliSeconds();
- }
-
- return true;
- }
-
- void SendUpdateRequest(ui32 partitionId, std::optional<ui64> seqNo, const TActorContext& ctx) {
- auto ev = MakeUpdateQueryRequest(partitionId, seqNo, ctx);
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
-
- THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateQueryRequest(ui32 partitionId, std::optional<ui64> seqNo, const NActors::TActorContext& ctx) {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
-
- ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- ev->Record.MutableRequest()->SetQuery(TxId ? UpdateQuery : UpdateAccessTimeQuery);
- ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
- // fill tx settings: set commit tx flag& begin new serializable tx.
- ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- if (KqpSessionId) {
- ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
- }
- if (TxId) {
- ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
- TxId = "";
- } else {
- ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- }
- // keep compiled query in cache.
- ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
-
- NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
-
- SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
-
- paramsBuilder
- .AddParam("$Topic")
- .Utf8(TopicName)
- .Build()
- .AddParam("$SourceId")
- .Utf8(EncodedSourceId.EscapedSourceId)
- .Build()
- .AddParam("$CreateTime")
- .Uint64(CreateTime)
- .Build()
- .AddParam("$AccessTime")
- .Uint64(TInstant::Now().MilliSeconds())
- .Build()
- .AddParam("$SeqNo")
- .Uint64(seqNo.value_or(0))
- .Build()
- .AddParam("$Partition")
- .Uint32(partitionId)
- .Build();
-
- NYdb::TParams params = paramsBuilder.Build();
-
- ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
-
- return ev;
- }
-
-private:
- const TString TopicName;
- const TString TopicHashName;
-
- NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId;
-
- NPQ::ESourceIdTableGeneration TableGeneration;
- TString SelectQuery;
- TString UpdateQuery;
- TString UpdateAccessTimeQuery;
-
- TString KqpSessionId;
- TString TxId;
-
- ui64 CreateTime = 0;
- ui64 AccessTime = 0;
-
- std::optional<ui32> PartitionId_;
- std::optional<ui64> SeqNo_;
-};
-
-#undef LOG_PREFIX
-#undef TRACE
-#undef DEBUG
-#undef INFO
-#undef ERROR
-
-} // namespace NKikimr::NPQ::NPartitionChooser
diff --git a/ydb/core/persqueue/writer/pipe_utils.h b/ydb/core/persqueue/writer/pipe_utils.h
deleted file mode 100644
index db78a3ce12..0000000000
--- a/ydb/core/persqueue/writer/pipe_utils.h
+++ /dev/null
@@ -1,76 +0,0 @@
-#pragma once
-
-#include <ydb/core/base/tablet_pipe.h>
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-
-#include <unordered_map>
-
-namespace NKikimr::NTabletPipe {
-
-class TPipeHelper {
-public:
- static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) {
- return NKikimr::NTabletPipe::CreateClient(owner, tabletId, config);
- }
-
- static void SendData(const TActorContext& ctx, const TActorId& clientId, IEventBase* payload, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
- return NKikimr::NTabletPipe::SendData(ctx, clientId, payload, cookie, std::move(traceId));
- }
-};
-
-namespace NTest {
-
- class TPipeMock {
- private:
- class TPipeActorMock: public TActorBootstrapped<TPipeActorMock> {
- public:
- TPipeActorMock(const TActorId& clientId, ui64 tabletId, const TActorId& forwardTo)
- : ClientId(clientId)
- , TabletId(tabletId)
- , ForwardTo(forwardTo) {}
-
- void Bootstrap(const TActorContext& ctx) {
- if (ForwardTo) {
- ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::OK, ClientId, ClientId, true, false, 1));
- Become(&TPipeActorMock::StateForward);
- } else {
- ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::ERROR, ClientId, ClientId, true, false, 0));
- Die(ctx);
- }
- }
-
- private:
- STFUNC(StateForward) {
- auto ctx = TActivationContext::ActorContextFor(TActor<TPipeActorMock>::SelfId());
- ctx.Forward(ev, ForwardTo);
- }
-
- private:
- TActorId ClientId;
- ui64 TabletId;
- TActorId ForwardTo;
- };
-
- public:
- static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) {
- Y_UNUSED(config);
-
- auto it = Tablets.find(tabletId);
- auto actorId = it == Tablets.end() ? TActorId() : it->second;
- return new TPipeActorMock(owner, tabletId, actorId);
- }
-
- static void Clear() {
- Tablets.clear();
- }
-
- static void Register(ui64 tabletId, const TActorId& actorId) {
- Tablets[tabletId] = actorId;
- }
- private:
- static std::unordered_map<ui64, TActorId> Tablets;
- };
-
-} // namespace NTest
-
-} // namespace NKikimr::NTabletPipe
diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp
index 6773cb5893..b4f4af39d5 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.cpp
+++ b/ydb/core/persqueue/writer/source_id_encoding.cpp
@@ -20,15 +20,15 @@ TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera
return TStringBuilder() << "--!syntax_v1\n"
"DECLARE $Hash AS Uint32; "
"DECLARE $Topic AS Utf8; "
- "DECLARE $SourceId AS Utf8;\n"
- "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << path << "` "
+ "DECLARE $SourceId AS Utf8; "
+ "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` "
"WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;";
case ESourceIdTableGeneration::PartitionMapping:
return TStringBuilder() << "--!syntax_v1\n"
"DECLARE $Hash AS Uint64; "
"DECLARE $Topic AS Utf8; "
- "DECLARE $SourceId AS Utf8;\n"
- "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `"
+ "DECLARE $SourceId AS Utf8; "
+ "SELECT Partition, CreateTime, AccessTime FROM `"
<< NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
<< "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;";
default:
@@ -59,40 +59,9 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera
"DECLARE $Hash AS Uint32; "
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
- "DECLARE $AccessTime AS Uint64;"
- "DECLARE $SeqNo AS Uint64;\n"
- "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
- "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
- case ESourceIdTableGeneration::PartitionMapping:
- return TStringBuilder() << "--!syntax_v1\n"
- "DECLARE $SourceId AS Utf8; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $Hash AS Uint64; "
- "DECLARE $Partition AS Uint32; "
- "DECLARE $CreateTime AS Uint64; "
- "DECLARE $AccessTime AS Uint64; "
- "DECLARE $SeqNo AS Uint64;\n"
- "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
- << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
- "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
- default:
- Y_ABORT();
- }
-}
-
-TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration generation) {
- switch (generation) {
- case ESourceIdTableGeneration::SrcIdMeta2:
- return TStringBuilder() << "--!syntax_v1\n"
- "DECLARE $SourceId AS Utf8; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $Hash AS Uint32; "
- "DECLARE $Partition AS Uint32; "
- "DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64;\n"
- "UPDATE `" << path << "` "
- "SET AccessTime = $AccessTime "
- "WHERE Hash = $Hash AND Topic = $Topic AND SourceId = $SourceId AND Partition = $Partition;";
+ "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
+ "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
case ESourceIdTableGeneration::PartitionMapping:
return TStringBuilder() << "--!syntax_v1\n"
"DECLARE $SourceId AS Utf8; "
@@ -101,9 +70,9 @@ TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGene
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64;\n"
- "UPDATE `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` "
- "SET AccessTime = $AccessTime "
- "WHERE Hash = $Hash AND Topic = $Topic AND ProducerId = $SourceId AND Partition = $Partition;";
+ "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
+ << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES "
+ "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
default:
Y_ABORT();
}
@@ -123,20 +92,6 @@ TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration gen
}
}
-TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration generation) {
- switch (generation) {
- case ESourceIdTableGeneration::SrcIdMeta2:
- return GetUpdateAccessTimeQueryFromPath(root + "/SourceIdMeta2", generation);
- case ESourceIdTableGeneration::PartitionMapping:
- return GetUpdateAccessTimeQueryFromPath(
- NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(),
- generation
- );
- default:
- Y_ABORT();
- }
-}
-
namespace NSourceIdEncoding {
static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c;
diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h
index 39b6487864..99cdef19f6 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.h
+++ b/ydb/core/persqueue/writer/source_id_encoding.h
@@ -15,11 +15,9 @@ enum class ESourceIdTableGeneration {
TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
-TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2);
namespace NSourceIdEncoding {
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index dc5bddf72e..1cc4ab2e9b 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -1,4 +1,3 @@
-#include "common.h"
#include "source_id_encoding.h"
#include "util/generic/fwd.h"
#include "writer.h"
@@ -130,6 +129,27 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return ev;
}
+ static bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) {
+ if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
+ error = TStringBuilder() << "Status is not ok"
+ << ": status# " << static_cast<ui32>(response.GetStatus());
+ return false;
+ }
+
+ if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) {
+ error = TStringBuilder() << "Error code is not ok"
+ << ": code# " << static_cast<ui32>(response.GetErrorCode());
+ return false;
+ }
+
+ if (mustHaveResponse && !response.HasPartitionResponse()) {
+ error = "Absent partition response";
+ return false;
+ }
+
+ return true;
+ }
+
static NKikimrClient::TResponse MakeResponse(ui64 cookie) {
NKikimrClient::TResponse response;
response.MutablePartitionResponse()->SetCookie(cookie);
@@ -306,10 +326,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return InitResult("Absent Ownership result", std::move(record));
}
- if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) {
- return InitResult("Partition is inactive", std::move(record));
- }
-
OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();
GetMaxSeqNo();
}
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index e92ea1d7b8..8ab13eff77 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -110,7 +110,6 @@ message TPersQueuePartitionRequest {
message TCmdGetOwnership { // get write ownership for partition
optional string Owner = 1 [default = "default"];
optional bool Force = 2 [ default = true];
- optional bool RegisterIfNotExists = 3 [default = true];
}
message TCmdReserveBytes {
@@ -469,8 +468,6 @@ message TPersQueuePartitionResponse {
message TCmdGetOwnershipResult {
optional string OwnerCookie = 1;
- optional NKikimrPQ.ETopicPartitionStatus Status = 2;
- optional int64 SeqNo = 3;
}
message TCmdPrepareDirectReadResult {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index b47d3ff20a..6abc3855e4 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -962,13 +962,6 @@ message TEvSourceIdResponse {
repeated TSource Source = 3;
};
-message TEvCheckPartitionStatusRequest {
- optional uint32 Partition = 1;
-};
-
-message TEvCheckPartitionStatusResponse {
- optional ETopicPartitionStatus Status = 1;
-};
message TTransaction {
enum EKind {
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index 798795f331..717d653260 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -599,7 +599,6 @@ public:
"Columns { Name: \"Partition\" Type: \"Uint32\"}"
"Columns { Name: \"CreateTime\" Type: \"Uint64\"}"
"Columns { Name: \"AccessTime\" Type: \"Uint64\"}"
- "Columns { Name: \"SeqNo\" Type: \"Uint64\"}"
"KeyColumnNames: [\"Hash\", \"SourceId\", \"Topic\"]"
);
}