aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-11-20 11:25:08 +0300
committertesseract <tesseract@yandex-team.com>2023-11-20 11:48:32 +0300
commit64d1b59148b5cebbd00dadb728a5a26f14e82775 (patch)
treeea9451f69ec73fdd6e26f8217fd415189faa063c
parentc7ada3215c527277233a9c3066008e623788617e (diff)
downloadydb-64d1b59148b5cebbd00dadb728a5a26f14e82775.tar.gz
Sinchronization of tx
-rw-r--r--ydb/core/persqueue/cache_eviction.h3
-rw-r--r--ydb/core/persqueue/events/internal.h6
-rw-r--r--ydb/core/persqueue/partition.cpp21
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp50
-rw-r--r--ydb/core/persqueue/pq_impl.h2
-rw-r--r--ydb/core/persqueue/read.h10
-rw-r--r--ydb/core/persqueue/transaction.cpp58
-rw-r--r--ydb/core/persqueue/transaction.h5
-rw-r--r--ydb/core/persqueue/ut/splitmerge_ut.cpp275
10 files changed, 230 insertions, 202 deletions
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h
index 59ca7269ba..4707fc4fff 100644
--- a/ydb/core/persqueue/cache_eviction.h
+++ b/ydb/core/persqueue/cache_eviction.h
@@ -230,11 +230,10 @@ namespace NPQ {
}
};
- TIntabletCache(ui64 tabletId, ui32 l1Size)
+ explicit TIntabletCache(ui64 tabletId)
: TabletId(tabletId)
, L1Strategy(nullptr)
{
- Y_UNUSED(l1Size);
}
const TMapType& CachedMap() const { return Cache; }
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index b6335edacf..8e49ba1480 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -500,6 +500,12 @@ struct TEvPQ {
: MaxSize(maxSize)
{}
+ TEvChangeCacheConfig(const TString& topicName, ui32 maxSize)
+ : TopicName(topicName)
+ , MaxSize(maxSize)
+ {}
+
+ TString TopicName;
ui32 MaxSize;
};
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 2c2c85a304..5d3d8d4c97 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -69,8 +69,20 @@ TString TPartition::LogPrefix() const {
}
bool TPartition::CanWrite() const {
- return (PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active)
- && (!PendingPartitionConfig || PendingPartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active);
+ if (PartitionConfig == nullptr) {
+ // Old format without AllPartitions configuration field.
+ // It is not split/merge partition.
+ return true;
+ }
+ if (NewPartition && PartitionConfig->ParentPartitionIdsSize() > 0) {
+ // A tx of create partition configuration is not commited.
+ return false;
+ }
+ if (PendingPartitionConfig && PendingPartitionConfig->GetStatus() != NKikimrPQ::ETopicPartitionStatus::Active) {
+ // Pending configuration tx inactivate this partition.
+ return false;
+ }
+ return PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
}
bool TPartition::CanEnqueue() const {
@@ -1742,6 +1754,10 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
ProcessTxsAndUserActs(ctx);
+
+ if (ChangeConfig && CurrentStateFunc() == &TThis::StateIdle) {
+ HandleWrites(ctx);
+ }
}
void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
@@ -1752,6 +1768,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
PartitionConfig = GetPartitionConfig(Config, Partition);
PartitionGraph.Rebuild(Config);
TopicConverter = topicConverter;
+ NewPartition = false;
Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index c8e510ad58..3baead0f6f 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -658,7 +658,7 @@ private:
TInstant CreationTime;
TDuration InitDuration;
bool InitDone;
- const bool NewPartition;
+ bool NewPartition;
THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners;
THashSet<TActorId> OwnerPipes;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 4a08925350..3f6ac3c01f 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1,6 +1,7 @@
#include "pq_impl.h"
#include "event_helpers.h"
+#include "partition_log.h"
#include "partition.h"
#include "read.h"
#include <ydb/core/base/tx_processing.h>
@@ -686,8 +687,7 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
}
Y_ABORT_UNLESS(TopicName.size(), "Need topic name here");
- CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TopicName, TabletID(),
- cacheSize));
+ ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(TopicName, cacheSize));
} else {
//Y_ABORT_UNLESS(TopicName == Config.GetTopicName(), "Changing topic name is not supported");
TopicPath = Config.GetTopicPath();
@@ -843,7 +843,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
cacheSize = Config.GetCacheSize();
Y_ABORT_UNLESS(TopicName.size(), "Need topic name here");
- CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TopicName, TabletID(), cacheSize));
+ ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(TopicName, cacheSize));
} else if (read.GetStatus() == NKikimrProto::NODATA) {
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " no config, start with empty partitions and default config");
} else {
@@ -2390,6 +2390,7 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info)
void TPersQueue::CreatedHook(const TActorContext& ctx)
{
IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
+ CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TabletID()));
ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId()));
InitProcessingParams(ctx);
@@ -3031,6 +3032,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
TString body;
Y_ABORT_UNLESS(data.SerializeToString(&body));
+ PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.Receivers.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.Senders.size() << " senders.");
for (ui64 receiverId : tx.Receivers) {
if (receiverId != TabletID()) {
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step,
@@ -3083,6 +3085,8 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx)
{
+ PQ_LOG_T("Commit tx " << tx.TxId);
+
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId);
@@ -3099,6 +3103,8 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx,
void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx)
{
+ PQ_LOG_T("Rollback tx " << tx.TxId);
+
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxRollback>(tx.Step, tx.TxId);
@@ -3183,6 +3189,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::UNKNOWN:
Y_ABORT_UNLESS(tx.TxId != Max<ui64>());
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=UNKNOWN");
+
WriteTx(tx, NKikimrPQ::TTransaction::PREPARED);
ScheduleProposeTransactionResult(tx);
@@ -3193,6 +3201,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::PREPARING:
Y_ABORT_UNLESS(tx.WriteInProgress);
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARING");
+
tx.WriteInProgress = false;
//
@@ -3206,6 +3216,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::PREPARED:
Y_ABORT_UNLESS(tx.Step != Max<ui64>());
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARED");
+
WriteTx(tx, NKikimrPQ::TTransaction::PLANNED);
tx.State = NKikimrPQ::TTransaction::PLANNING;
@@ -3215,6 +3227,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::PLANNING:
Y_ABORT_UNLESS(tx.WriteInProgress);
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNING");
+
tx.WriteInProgress = false;
//
@@ -3226,6 +3240,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
[[fallthrough]];
case NKikimrPQ::TTransaction::PLANNED:
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNED" <<
+ ", (!TxQueue.empty())=" << !TxQueue.empty());
+
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
@@ -3255,22 +3272,24 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::CALCULATING:
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATING" <<
+ ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
+ ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected);
+
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
- SendEvReadSetToReceivers(ctx, tx);
- WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS);
+ [[fallthrough]];
- tx.State = NKikimrPQ::TTransaction::CALCULATED;
- break;
case NKikimrPQ::TTransaction::KIND_CONFIG:
SendEvReadSetToReceivers(ctx, tx);
- tx.State = NKikimrPQ::TTransaction::WAIT_RS;
+ WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS);
- CheckTxState(ctx, tx);
+ tx.State = NKikimrPQ::TTransaction::CALCULATED;
break;
+
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_ABORT_UNLESS(false);
}
@@ -3281,6 +3300,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::CALCULATED:
Y_ABORT_UNLESS(tx.WriteInProgress);
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATED");
+
tx.WriteInProgress = false;
tx.State = NKikimrPQ::TTransaction::WAIT_RS;
@@ -3294,6 +3315,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
//
Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.Senders.size());
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=WAIT_RS" <<
+ ", tx.HaveParticipantsDecision()=" << tx.HaveParticipantsDecision());
+
if (tx.HaveParticipantsDecision()) {
SendEvProposeTransactionResult(ctx, tx);
@@ -3313,6 +3337,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::EXECUTING:
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" <<
+ ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
+ ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected);
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
Y_ABORT_UNLESS(!TxQueue.empty());
Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId);
@@ -3322,6 +3349,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
SendEvReadSetAckToSenders(ctx, tx);
break;
case NKikimrPQ::TTransaction::KIND_CONFIG:
+ SendEvReadSetAckToSenders(ctx, tx);
ApplyNewConfig(tx.TabletConfig, ctx);
TabletConfigTx = tx.TabletConfig;
BootstrapConfigTx = tx.BootstrapConfig;
@@ -3341,6 +3369,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
[[fallthrough]];
case NKikimrPQ::TTransaction::EXECUTED:
+ PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTED, tx.HaveAllRecipientsReceive()=" << tx.HaveAllRecipientsReceive());
if (tx.HaveAllRecipientsReceive()) {
DeleteTx(tx);
}
@@ -3652,6 +3681,9 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
}
}
+TString TPersQueue::LogPrefix() const {
+ return TStringBuilder() << SelfId() << " ";
+}
bool TPersQueue::HandleHook(STFUNC_SIG)
{
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 6b31eb64fc..e0db6d8c9e 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -160,6 +160,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void ProcessSourceIdRequests(ui32 partitionId);
+ TString LogPrefix() const;
+
static constexpr const char * KeyConfig() { return "_config"; }
static constexpr const char * KeyState() { return "_state"; }
static constexpr const char * KeyTxInfo() { return "_txinfo"; }
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index 164bcc0965..838562c7ac 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -21,15 +21,13 @@ namespace NPQ {
return NKikimrServices::TActivity::PERSQUEUE_CACHE_ACTOR;
}
- TPQCacheProxy(const TActorId& tablet, TString topicName, ui64 tabletId, ui32 size)
+ TPQCacheProxy(const TActorId& tablet, ui64 tabletId)
: Tablet(tablet)
- , TopicName(topicName)
, TabletId(tabletId)
, Cookie(0)
- , Cache(tabletId, size)
+ , Cache(tabletId)
, CountersUpdateTime(TAppData::TimeProvider->Now())
{
- Y_ABORT_UNLESS(topicName.size(), "CacheProxy with empty topic name");
}
void Bootstrap(const TActorContext& ctx)
@@ -92,7 +90,9 @@ namespace NPQ {
void Handle(TEvPQ::TEvChangeCacheConfig::TPtr& ev, const TActorContext& ctx)
{
- Y_UNUSED(ev);
+ if (ev->Get()->TopicName) {
+ TopicName = ev->Get()->TopicName;
+ }
Cache.Touch(ctx);
}
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 87fdbba5ca..86a1839e4b 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -1,4 +1,5 @@
#include "transaction.h"
+#include "utils.h"
namespace NKikimr::NPQ {
@@ -14,6 +15,13 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
MinStep = tx.GetMinStep();
MaxStep = tx.GetMaxStep();
+ for (ui64 tabletId : tx.GetSenders()) {
+ Senders.insert(tabletId);
+ }
+ for (ui64 tabletId : tx.GetReceivers()) {
+ Receivers.insert(tabletId);
+ }
+
switch (Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
InitDataTransaction(tx);
@@ -40,13 +48,6 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx)
{
- for (ui64 tabletId : tx.GetSenders()) {
- Senders.insert(tabletId);
- }
- for (ui64 tabletId : tx.GetReceivers()) {
- Receivers.insert(tabletId);
- }
-
InitPartitions(tx.GetOperations());
}
@@ -65,19 +66,19 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();
- InitPartitions(TabletConfig);
+ InitPartitions();
}
-void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& config)
+void TDistributedTransaction::InitPartitions()
{
Partitions.clear();
- if (config.PartitionsSize()) {
- for (const auto& partition : config.GetPartitions()) {
+ if (TabletConfig.PartitionsSize()) {
+ for (const auto& partition : TabletConfig.GetPartitions()) {
Partitions.insert(partition.GetPartitionId());
}
} else {
- for (auto partitionId : config.GetPartitionIds()) {
+ for (auto partitionId : TabletConfig.GetPartitionIds()) {
Partitions.insert(partitionId);
}
}
@@ -103,7 +104,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
case NKikimrPQ::TEvProposeTransaction::kConfig:
Y_ABORT_UNLESS(event.HasConfig());
MaxStep = Max<ui64>();
- OnProposeTransaction(event.GetConfig());
+ OnProposeTransaction(event.GetConfig(), extractTabletId);
break;
default:
Y_FAIL_S("unknown TxBody case");
@@ -138,14 +139,41 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
ReadSetCount = 0;
}
-void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody)
+void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody,
+ ui64 extractTabletId)
{
Kind = NKikimrPQ::TTransaction::KIND_CONFIG;
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();
- InitPartitions(TabletConfig);
+ TPartitionGraph graph;
+ graph.Rebuild(TabletConfig);
+
+ for (const auto& p : TabletConfig.GetPartitions()) {
+ auto node = graph.GetPartition(p.GetPartitionId());
+ if (!node) {
+ // Old configuration format without AllPartitions. Split/Merge is not supported.
+ continue;
+ }
+ if (node.value()->Children.empty()) {
+ for (const auto* r : node.value()->Parents) {
+ if (extractTabletId != r->TabletId) {
+ Senders.insert(r->TabletId);
+ }
+ }
+ }
+
+ for (const auto* r : node.value()->Children) {
+ if (r->Children.empty()) {
+ if (extractTabletId != r->TabletId) {
+ Receivers.insert(r->TabletId);
+ }
+ }
+ }
+ }
+
+ InitPartitions();
PartitionRepliesCount = 0;
PartitionRepliesExpected = 0;
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index 3c79b81867..fffed8d546 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -24,7 +24,8 @@ struct TDistributedTransaction {
ui64 extractTabletId);
void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody,
ui64 extractTabletId);
- void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody);
+ void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody,
+ ui64 extractTabletId);
void OnPlanStep(ui64 step);
void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event);
void OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event);
@@ -85,7 +86,7 @@ struct TDistributedTransaction {
void InitConfigTransaction(const NKikimrPQ::TTransaction& tx);
void InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& tx);
- void InitPartitions(const NKikimrPQ::TPQTabletConfig& config);
+ void InitPartitions();
template<class E>
void OnPartitionResult(const E& event, EDecision decision);
diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp
index fee6ed80eb..b47dd546a2 100644
--- a/ydb/core/persqueue/ut/splitmerge_ut.cpp
+++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp
@@ -29,6 +29,8 @@ TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) {
}
void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) {
+ Sleep(TDuration::Seconds(1));
+
Cerr << "ALTER_SCHEME: " << scheme << Endl;
const auto sender = setup.GetRuntime().AllocateEdgeActor();
@@ -54,6 +56,8 @@ void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueu
auto e = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
UNIT_ASSERT_EQUAL_C(e->Record.GetStatus(), TEvSchemeShard::EStatus::StatusAccepted,
"Unexpected status " << NKikimrScheme::EStatus_Name(e->Record.GetStatus()) << " " << e->Record.GetReason());
+
+ Sleep(TDuration::Seconds(1));
}
void SplitPartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partition, TString boundary) {
@@ -82,65 +86,60 @@ auto Msg(const TString& data, ui64 seqNo) {
return msg;
}
+TTopicSdkTestSetup CreateSetup() {
+ NKikimrConfig::TFeatureFlags ff;
+ ff.SetEnableTopicSplitMerge(true);
+ ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
+ auto settings = TTopicSdkTestSetup::MakeServerSettings();
+ settings.SetFeatureFlags(ff);
-Y_UNIT_TEST_SUITE(TopicSplitMerge) {
- Y_UNIT_TEST(PartitionSplit) {
- TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false);
-
- auto& ff = setup.GetRuntime().GetAppData().FeatureFlags;
- ff.SetEnableTopicSplitMerge(true);
- ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
-
- setup.CreateTopic();
-
- TTopicClient client = setup.MakeClient();
+ auto setup = TTopicSdkTestSetup("TopicSplitMerge", settings, false);
- setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
- setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
+ setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+ setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
- TString producer1 = "producer-1";
- TString producer2 = "producer-2";
- TString producer3 = "producer-3";
- TString producer4 = "producer-4";
+ return setup;
+}
- auto writeSettings1 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer1)
- .MessageGroupId(producer1);
- auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1);
+std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt) {
+ auto writeSettings = TWriteSessionSettings()
+ .Path(TEST_TOPIC)
+ .ProducerId(producer);
+ if (partition) {
+ writeSettings.PartitionId(*partition);
+ } else {
+ writeSettings.MessageGroupId(producer);
+ }
- auto writeSettings2 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer2)
- .MessageGroupId(producer2);
- auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2);
+ writeSettings.EventHandlers_.AcksHandler([&](TWriteSessionEvent::TAcksEvent& ev) {
+ Cerr << ">>>>> Received TWriteSessionEvent::TAcksEvent " << ev.DebugString() << Endl << Flush;
+ });
- auto writeSettings3 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer3)
- .PartitionId(0);
- auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3);
+ return client.CreateSimpleBlockingWriteSession(writeSettings);
+}
- Cerr << ">>>>> 1 " << Endl;
+struct TTestReadSession {
+ struct MsgInfo {
+ ui64 PartitionId;
+ ui64 SeqNo;
+ ui64 Offset;
+ TString Data;
+ };
- struct MsgInfo {
- ui64 PartitionId;
- ui64 SeqNo;
- ui64 Offset;
- TString Data;
- };
+ std::shared_ptr<IReadSession> Session;
- NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
- std::vector<MsgInfo> receivedMessages;
- std::set<size_t> partitions;
+ NThreading::TPromise<void> Promise = NThreading::NewPromise<void>();
+ std::vector<MsgInfo> ReceivedMessages;
+ std::set<size_t> Partitions;
+ TTestReadSession(TTopicClient& client, size_t expectedMessagesCount) {
auto readSettings = TReadSessionSettings()
.ConsumerName(TEST_CONSUMER)
.AppendTopics(TEST_TOPIC);
readSettings.EventHandlers_.SimpleDataHandlers(
- [&]
+ [&, expectedMessagesCount]
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
auto& messages = ev.GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
@@ -151,14 +150,14 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
<< ", seqNo=" << message.GetSeqNo()
<< ", offset=" << message.GetOffset()
<< Endl;
- receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
+ ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
message.GetSeqNo(),
message.GetOffset(),
message.GetData()});
}
- if (receivedMessages.size() == 6) {
- checkedPromise.SetValue();
+ if (ReceivedMessages.size() == expectedMessagesCount) {
+ Promise.SetValue();
}
});
@@ -166,24 +165,70 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
[&]
(TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
Cerr << ">>>>> Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl;
- partitions.insert(ev.GetPartitionSession()->GetPartitionId());
+ Partitions.insert(ev.GetPartitionSession()->GetPartitionId());
ev.Confirm();
});
- auto readSession = client.CreateReadSession(readSettings);
+ Session = client.CreateReadSession(readSettings);
+ }
- Cerr << ">>>>> 2 " << Endl;
+ void WaitAllMessages() {
+ Promise.GetFuture().GetValueSync();
+ }
+};
+
+Y_UNIT_TEST_SUITE(TopicSplitMerge) {
+
+ Y_UNIT_TEST(Simple) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopic();
+
+ TTopicClient client = setup.MakeClient();
+
+ auto writeSession1 = CreateWriteSession(client, "producer-1");
+ auto writeSession2 = CreateWriteSession(client, "producer-2");
+
+ TTestReadSession ReadSession(client, 2);
UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2)));
UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3)));
- UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1)));
- Cerr << ">>>>> 3 " << Endl;
+ ReadSession.WaitAllMessages();
- ui64 txId = 0;
- SplitPartition(setup, ++txId, 0, "a");
+ for(const auto& info : ReadSession.ReceivedMessages) {
+ if (info.Data == "message_1.1") {
+ UNIT_ASSERT_EQUAL(0, info.PartitionId);
+ UNIT_ASSERT_EQUAL(2, info.SeqNo);
+ } else if (info.Data == "message_2.1") {
+ UNIT_ASSERT_EQUAL(0, info.PartitionId);
+ UNIT_ASSERT_EQUAL(3, info.SeqNo);
+ } else {
+ UNIT_ASSERT_C(false, "Unexpected message: " << info.Data);
+ }
+ }
- Cerr << ">>>>> 4 " << Endl;
+ writeSession1->Close(TDuration::Seconds(1));
+ writeSession2->Close(TDuration::Seconds(1));
+ }
+
+ Y_UNIT_TEST(PartitionSplit) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopic();
+
+ TTopicClient client = setup.MakeClient();
+
+ auto writeSession1 = CreateWriteSession(client, "producer-1");
+ auto writeSession2 = CreateWriteSession(client, "producer-2");
+ auto writeSession3 = CreateWriteSession(client, "producer-3", 0);
+
+ TTestReadSession ReadSession(client, 6);
+
+ UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2)));
+ UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3)));
+ UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1)));
+
+ ui64 txId = 1006;
+ SplitPartition(setup, ++txId, 0, "a");
writeSession1->Write(Msg("message_1.2_2", 2)); // Will be ignored because duplicated SeqNo
writeSession3->Write(Msg("message_3.2", 11)); // Will be fail because partition is not writable after split
@@ -197,14 +242,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
auto writeSession4 = client.CreateSimpleBlockingWriteSession(writeSettings4);
writeSession4->Write(TWriteMessage("message_4.1"));
- Cerr << ">>>>> 5 " << Endl;
+ ReadSession.WaitAllMessages();
- checkedPromise.GetFuture().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(6, receivedMessages.size());
-
- Cerr << ">>>>> 6 " << Endl;
-
- for(const auto& info : receivedMessages) {
+ for(const auto& info : ReadSession.ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
@@ -228,131 +268,39 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
}
}
- Cerr << ">>>>> 7 " << Endl;
-
writeSession1->Close(TDuration::Seconds(1));
writeSession2->Close(TDuration::Seconds(1));
writeSession3->Close(TDuration::Seconds(1));
- readSession->Close(TDuration::Seconds(1));
-
- Cerr << ">>>>> 8 " << Endl;
}
Y_UNIT_TEST(PartitionMerge) {
- TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false);
-
- auto& ff = setup.GetRuntime().GetAppData().FeatureFlags;
- ff.SetEnableTopicSplitMerge(true);
- ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
-
+ TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2);
TTopicClient client = setup.MakeClient();
- setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
- setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE);
-
- TString producer1 = "producer-1";
- TString producer2 = "producer-2";
- TString producer3 = "producer-3";
-
- auto writeSettings1 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer1)
- .MessageGroupId(producer1)
- .PartitionId(0);
- auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1);
-
- auto writeSettings2 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer2)
- .MessageGroupId(producer2)
- .PartitionId(1);
- auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2);
-
- Cerr << ">>>>> 1 " << Endl;
-
- struct MsgInfo {
- ui64 PartitionId;
- ui64 SeqNo;
- ui64 Offset;
- TString Data;
- };
+ auto writeSession1 = CreateWriteSession(client, "producer-1", 0);
+ auto writeSession2 = CreateWriteSession(client, "producer-2", 1);
- NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
- std::vector<MsgInfo> receivedMessages;
- std::set<size_t> partitions;
-
- auto readSettings = TReadSessionSettings()
- .ConsumerName(TEST_CONSUMER)
- .AppendTopics(TEST_TOPIC);
-
- readSettings.EventHandlers_.SimpleDataHandlers(
- [&]
- (TReadSessionEvent::TDataReceivedEvent& ev) mutable {
- auto& messages = ev.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- auto& message = messages[i];
-
- Cerr << ">>>>> Received message partitionId=" << message.GetPartitionSession()->GetPartitionId()
- << ", message=" << message.GetData()
- << ", seqNo=" << message.GetSeqNo()
- << ", offset=" << message.GetOffset()
- << Endl;
- receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
- message.GetSeqNo(),
- message.GetOffset(),
- message.GetData()});
- }
-
- if (receivedMessages.size() == 3) {
- checkedPromise.SetValue();
- }
- });
-
- readSettings.EventHandlers_.StartPartitionSessionHandler(
- [&]
- (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
- Cerr << ">>>>> Received TStartPartitionSessionEvent for partition " << ev.GetPartitionSession()->GetPartitionId() << Endl;
- partitions.insert(ev.GetPartitionSession()->GetPartitionId());
- ev.Confirm();
- });
-
- auto readSession = client.CreateReadSession(readSettings);
-
- Cerr << ">>>>> 2 " << Endl;
+ TTestReadSession ReadSession(client, 3);
UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2)));
UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3)));
- Cerr << ">>>>> 3 " << Endl;
-
- ui64 txId = 0;
+ ui64 txId = 1012;
MergePartition(setup, ++txId, 0, 1);
- Cerr << ">>>>> 4 " << Endl;
-
UNIT_ASSERT(writeSession1->Write(Msg("message_1.2", 5))); // Will be fail because partition is not writable after merge
UNIT_ASSERT(writeSession2->Write(Msg("message_2.2", 7))); // Will be fail because partition is not writable after merge
- auto writeSettings3 = TWriteSessionSettings()
- .Path(TEST_TOPIC)
- .ProducerId(producer1)
- .MessageGroupId(producer1);
- auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3);
+ auto writeSession3 = CreateWriteSession(client, "producer-2", 2);
UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 2))); // Will be ignored because duplicated SeqNo
UNIT_ASSERT(writeSession3->Write(Msg("message_3.2", 11)));
+ ReadSession.WaitAllMessages();
- Cerr << ">>>>> 5 " << Endl;
-
- checkedPromise.GetFuture().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(3, receivedMessages.size());
-
- Cerr << ">>>>> 6 " << Endl;
-
- for(const auto& info : receivedMessages) {
+ for(const auto& info : ReadSession.ReceivedMessages) {
if (info.Data == TString("message_1.1")) {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
@@ -367,14 +315,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
}
}
- Cerr << ">>>>> 7 " << Endl;
-
writeSession1->Close(TDuration::Seconds(1));
writeSession2->Close(TDuration::Seconds(1));
writeSession3->Close(TDuration::Seconds(1));
- readSession->Close(TDuration::Seconds(1));
-
- Cerr << ">>>>> 8 " << Endl;
}
}