diff options
author | tesseract <tesseract@yandex-team.com> | 2023-11-20 11:25:08 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-11-20 11:48:32 +0300 |
commit | 64d1b59148b5cebbd00dadb728a5a26f14e82775 (patch) | |
tree | ea9451f69ec73fdd6e26f8217fd415189faa063c | |
parent | c7ada3215c527277233a9c3066008e623788617e (diff) | |
download | ydb-64d1b59148b5cebbd00dadb728a5a26f14e82775.tar.gz |
Sinchronization of tx
-rw-r--r-- | ydb/core/persqueue/cache_eviction.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 21 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 50 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/read.h | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 58 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/splitmerge_ut.cpp | 275 |
10 files changed, 230 insertions, 202 deletions
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h index 59ca7269ba..4707fc4fff 100644 --- a/ydb/core/persqueue/cache_eviction.h +++ b/ydb/core/persqueue/cache_eviction.h @@ -230,11 +230,10 @@ namespace NPQ { } }; - TIntabletCache(ui64 tabletId, ui32 l1Size) + explicit TIntabletCache(ui64 tabletId) : TabletId(tabletId) , L1Strategy(nullptr) { - Y_UNUSED(l1Size); } const TMapType& CachedMap() const { return Cache; } diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index b6335edacf..8e49ba1480 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -500,6 +500,12 @@ struct TEvPQ { : MaxSize(maxSize) {} + TEvChangeCacheConfig(const TString& topicName, ui32 maxSize) + : TopicName(topicName) + , MaxSize(maxSize) + {} + + TString TopicName; ui32 MaxSize; }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 2c2c85a304..5d3d8d4c97 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -69,8 +69,20 @@ TString TPartition::LogPrefix() const { } bool TPartition::CanWrite() const { - return (PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active) - && (!PendingPartitionConfig || PendingPartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active); + if (PartitionConfig == nullptr) { + // Old format without AllPartitions configuration field. + // It is not split/merge partition. + return true; + } + if (NewPartition && PartitionConfig->ParentPartitionIdsSize() > 0) { + // A tx of create partition configuration is not commited. + return false; + } + if (PendingPartitionConfig && PendingPartitionConfig->GetStatus() != NKikimrPQ::ETopicPartitionStatus::Active) { + // Pending configuration tx inactivate this partition. + return false; + } + return PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active; } bool TPartition::CanEnqueue() const { @@ -1742,6 +1754,10 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC ProcessTxsAndUserActs(ctx); + + if (ChangeConfig && CurrentStateFunc() == &TThis::StateIdle) { + HandleWrites(ctx); + } } void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, @@ -1752,6 +1768,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf PartitionConfig = GetPartitionConfig(Config, Partition); PartitionGraph.Rebuild(Config); TopicConverter = topicConverter; + NewPartition = false; Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index c8e510ad58..3baead0f6f 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -658,7 +658,7 @@ private: TInstant CreationTime; TDuration InitDuration; bool InitDone; - const bool NewPartition; + bool NewPartition; THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners; THashSet<TActorId> OwnerPipes; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 4a08925350..3f6ac3c01f 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1,6 +1,7 @@ #include "pq_impl.h" #include "event_helpers.h" +#include "partition_log.h" #include "partition.h" #include "read.h" #include <ydb/core/base/tx_processing.h> @@ -686,8 +687,7 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, } Y_ABORT_UNLESS(TopicName.size(), "Need topic name here"); - CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TopicName, TabletID(), - cacheSize)); + ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(TopicName, cacheSize)); } else { //Y_ABORT_UNLESS(TopicName == Config.GetTopicName(), "Changing topic name is not supported"); TopicPath = Config.GetTopicPath(); @@ -843,7 +843,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& cacheSize = Config.GetCacheSize(); Y_ABORT_UNLESS(TopicName.size(), "Need topic name here"); - CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TopicName, TabletID(), cacheSize)); + ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(TopicName, cacheSize)); } else if (read.GetStatus() == NKikimrProto::NODATA) { LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " no config, start with empty partitions and default config"); } else { @@ -2390,6 +2390,7 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info) void TPersQueue::CreatedHook(const TActorContext& ctx) { IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe + CacheActor = ctx.Register(new TPQCacheProxy(ctx.SelfID, TabletID())); ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId())); InitProcessingParams(ctx); @@ -3031,6 +3032,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TString body; Y_ABORT_UNLESS(data.SerializeToString(&body)); + PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.Receivers.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.Senders.size() << " senders."); for (ui64 receiverId : tx.Receivers) { if (receiverId != TabletID()) { auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step, @@ -3083,6 +3085,8 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx, TDistributedTransaction& tx) { + PQ_LOG_T("Commit tx " << tx.TxId); + for (ui32 partitionId : tx.Partitions) { auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId); @@ -3099,6 +3103,8 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx, void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx, TDistributedTransaction& tx) { + PQ_LOG_T("Rollback tx " << tx.TxId); + for (ui32 partitionId : tx.Partitions) { auto event = std::make_unique<TEvPQ::TEvTxRollback>(tx.Step, tx.TxId); @@ -3183,6 +3189,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::UNKNOWN: Y_ABORT_UNLESS(tx.TxId != Max<ui64>()); + PQ_LOG_T("TxId="<< tx.TxId << ", State=UNKNOWN"); + WriteTx(tx, NKikimrPQ::TTransaction::PREPARED); ScheduleProposeTransactionResult(tx); @@ -3193,6 +3201,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::PREPARING: Y_ABORT_UNLESS(tx.WriteInProgress); + PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARING"); + tx.WriteInProgress = false; // @@ -3206,6 +3216,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::PREPARED: Y_ABORT_UNLESS(tx.Step != Max<ui64>()); + PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARED"); + WriteTx(tx, NKikimrPQ::TTransaction::PLANNED); tx.State = NKikimrPQ::TTransaction::PLANNING; @@ -3215,6 +3227,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::PLANNING: Y_ABORT_UNLESS(tx.WriteInProgress); + PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNING"); + tx.WriteInProgress = false; // @@ -3226,6 +3240,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, [[fallthrough]]; case NKikimrPQ::TTransaction::PLANNED: + PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNED" << + ", (!TxQueue.empty())=" << !TxQueue.empty()); + if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -3255,22 +3272,24 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::CALCULATING: Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); + PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATING" << + ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << + ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected); + if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: - SendEvReadSetToReceivers(ctx, tx); - WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS); + [[fallthrough]]; - tx.State = NKikimrPQ::TTransaction::CALCULATED; - break; case NKikimrPQ::TTransaction::KIND_CONFIG: SendEvReadSetToReceivers(ctx, tx); - tx.State = NKikimrPQ::TTransaction::WAIT_RS; + WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS); - CheckTxState(ctx, tx); + tx.State = NKikimrPQ::TTransaction::CALCULATED; break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: Y_ABORT_UNLESS(false); } @@ -3281,6 +3300,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::CALCULATED: Y_ABORT_UNLESS(tx.WriteInProgress); + PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATED"); + tx.WriteInProgress = false; tx.State = NKikimrPQ::TTransaction::WAIT_RS; @@ -3294,6 +3315,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.Senders.size()); + PQ_LOG_T("TxId="<< tx.TxId << ", State=WAIT_RS" << + ", tx.HaveParticipantsDecision()=" << tx.HaveParticipantsDecision()); + if (tx.HaveParticipantsDecision()) { SendEvProposeTransactionResult(ctx, tx); @@ -3313,6 +3337,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::EXECUTING: Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); + PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" << + ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << + ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected); if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { Y_ABORT_UNLESS(!TxQueue.empty()); Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId); @@ -3322,6 +3349,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, SendEvReadSetAckToSenders(ctx, tx); break; case NKikimrPQ::TTransaction::KIND_CONFIG: + SendEvReadSetAckToSenders(ctx, tx); ApplyNewConfig(tx.TabletConfig, ctx); TabletConfigTx = tx.TabletConfig; BootstrapConfigTx = tx.BootstrapConfig; @@ -3341,6 +3369,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, [[fallthrough]]; case NKikimrPQ::TTransaction::EXECUTED: + PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTED, tx.HaveAllRecipientsReceive()=" << tx.HaveAllRecipientsReceive()); if (tx.HaveAllRecipientsReceive()) { DeleteTx(tx); } @@ -3652,6 +3681,9 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) { } } +TString TPersQueue::LogPrefix() const { + return TStringBuilder() << SelfId() << " "; +} bool TPersQueue::HandleHook(STFUNC_SIG) { diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 6b31eb64fc..e0db6d8c9e 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -160,6 +160,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); void ProcessSourceIdRequests(ui32 partitionId); + TString LogPrefix() const; + static constexpr const char * KeyConfig() { return "_config"; } static constexpr const char * KeyState() { return "_state"; } static constexpr const char * KeyTxInfo() { return "_txinfo"; } diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index 164bcc0965..838562c7ac 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -21,15 +21,13 @@ namespace NPQ { return NKikimrServices::TActivity::PERSQUEUE_CACHE_ACTOR; } - TPQCacheProxy(const TActorId& tablet, TString topicName, ui64 tabletId, ui32 size) + TPQCacheProxy(const TActorId& tablet, ui64 tabletId) : Tablet(tablet) - , TopicName(topicName) , TabletId(tabletId) , Cookie(0) - , Cache(tabletId, size) + , Cache(tabletId) , CountersUpdateTime(TAppData::TimeProvider->Now()) { - Y_ABORT_UNLESS(topicName.size(), "CacheProxy with empty topic name"); } void Bootstrap(const TActorContext& ctx) @@ -92,7 +90,9 @@ namespace NPQ { void Handle(TEvPQ::TEvChangeCacheConfig::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); + if (ev->Get()->TopicName) { + TopicName = ev->Get()->TopicName; + } Cache.Touch(ctx); } diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 87fdbba5ca..86a1839e4b 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -1,4 +1,5 @@ #include "transaction.h" +#include "utils.h" namespace NKikimr::NPQ { @@ -14,6 +15,13 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& MinStep = tx.GetMinStep(); MaxStep = tx.GetMaxStep(); + for (ui64 tabletId : tx.GetSenders()) { + Senders.insert(tabletId); + } + for (ui64 tabletId : tx.GetReceivers()) { + Receivers.insert(tabletId); + } + switch (Kind) { case NKikimrPQ::TTransaction::KIND_DATA: InitDataTransaction(tx); @@ -40,13 +48,6 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx) { - for (ui64 tabletId : tx.GetSenders()) { - Senders.insert(tabletId); - } - for (ui64 tabletId : tx.GetReceivers()) { - Receivers.insert(tabletId); - } - InitPartitions(tx.GetOperations()); } @@ -65,19 +66,19 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio TabletConfig = tx.GetTabletConfig(); BootstrapConfig = tx.GetBootstrapConfig(); - InitPartitions(TabletConfig); + InitPartitions(); } -void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& config) +void TDistributedTransaction::InitPartitions() { Partitions.clear(); - if (config.PartitionsSize()) { - for (const auto& partition : config.GetPartitions()) { + if (TabletConfig.PartitionsSize()) { + for (const auto& partition : TabletConfig.GetPartitions()) { Partitions.insert(partition.GetPartitionId()); } } else { - for (auto partitionId : config.GetPartitionIds()) { + for (auto partitionId : TabletConfig.GetPartitionIds()) { Partitions.insert(partitionId); } } @@ -103,7 +104,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr case NKikimrPQ::TEvProposeTransaction::kConfig: Y_ABORT_UNLESS(event.HasConfig()); MaxStep = Max<ui64>(); - OnProposeTransaction(event.GetConfig()); + OnProposeTransaction(event.GetConfig(), extractTabletId); break; default: Y_FAIL_S("unknown TxBody case"); @@ -138,14 +139,41 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac ReadSetCount = 0; } -void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody) +void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody, + ui64 extractTabletId) { Kind = NKikimrPQ::TTransaction::KIND_CONFIG; TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); - InitPartitions(TabletConfig); + TPartitionGraph graph; + graph.Rebuild(TabletConfig); + + for (const auto& p : TabletConfig.GetPartitions()) { + auto node = graph.GetPartition(p.GetPartitionId()); + if (!node) { + // Old configuration format without AllPartitions. Split/Merge is not supported. + continue; + } + if (node.value()->Children.empty()) { + for (const auto* r : node.value()->Parents) { + if (extractTabletId != r->TabletId) { + Senders.insert(r->TabletId); + } + } + } + + for (const auto* r : node.value()->Children) { + if (r->Children.empty()) { + if (extractTabletId != r->TabletId) { + Receivers.insert(r->TabletId); + } + } + } + } + + InitPartitions(); PartitionRepliesCount = 0; PartitionRepliesExpected = 0; diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 3c79b81867..fffed8d546 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -24,7 +24,8 @@ struct TDistributedTransaction { ui64 extractTabletId); void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody, ui64 extractTabletId); - void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody); + void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody, + ui64 extractTabletId); void OnPlanStep(ui64 step); void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event); void OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event); @@ -85,7 +86,7 @@ struct TDistributedTransaction { void InitConfigTransaction(const NKikimrPQ::TTransaction& tx); void InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& tx); - void InitPartitions(const NKikimrPQ::TPQTabletConfig& config); + void InitPartitions(); template<class E> void OnPartitionResult(const E& event, EDecision decision); diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp index fee6ed80eb..b47dd546a2 100644 --- a/ydb/core/persqueue/ut/splitmerge_ut.cpp +++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp @@ -29,6 +29,8 @@ TEvTx* CreateRequest(ui64 txId, NKikimrSchemeOp::TModifyScheme&& tx) { } void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueueGroupDescription& scheme) { + Sleep(TDuration::Seconds(1)); + Cerr << "ALTER_SCHEME: " << scheme << Endl; const auto sender = setup.GetRuntime().AllocateEdgeActor(); @@ -54,6 +56,8 @@ void DoRequest(TTopicSdkTestSetup& setup, ui64& txId, NKikimrSchemeOp::TPersQueu auto e = setup.GetRuntime().GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); UNIT_ASSERT_EQUAL_C(e->Record.GetStatus(), TEvSchemeShard::EStatus::StatusAccepted, "Unexpected status " << NKikimrScheme::EStatus_Name(e->Record.GetStatus()) << " " << e->Record.GetReason()); + + Sleep(TDuration::Seconds(1)); } void SplitPartition(TTopicSdkTestSetup& setup, ui64& txId, const ui32 partition, TString boundary) { @@ -82,65 +86,60 @@ auto Msg(const TString& data, ui64 seqNo) { return msg; } +TTopicSdkTestSetup CreateSetup() { + NKikimrConfig::TFeatureFlags ff; + ff.SetEnableTopicSplitMerge(true); + ff.SetEnablePQConfigTransactionsAtSchemeShard(true); + auto settings = TTopicSdkTestSetup::MakeServerSettings(); + settings.SetFeatureFlags(ff); -Y_UNIT_TEST_SUITE(TopicSplitMerge) { - Y_UNIT_TEST(PartitionSplit) { - TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false); - - auto& ff = setup.GetRuntime().GetAppData().FeatureFlags; - ff.SetEnableTopicSplitMerge(true); - ff.SetEnablePQConfigTransactionsAtSchemeShard(true); - - setup.CreateTopic(); - - TTopicClient client = setup.MakeClient(); + auto setup = TTopicSdkTestSetup("TopicSplitMerge", settings, false); - setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); - setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); + setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); - TString producer1 = "producer-1"; - TString producer2 = "producer-2"; - TString producer3 = "producer-3"; - TString producer4 = "producer-4"; + return setup; +} - auto writeSettings1 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer1) - .MessageGroupId(producer1); - auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1); +std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt) { + auto writeSettings = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId(producer); + if (partition) { + writeSettings.PartitionId(*partition); + } else { + writeSettings.MessageGroupId(producer); + } - auto writeSettings2 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer2) - .MessageGroupId(producer2); - auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2); + writeSettings.EventHandlers_.AcksHandler([&](TWriteSessionEvent::TAcksEvent& ev) { + Cerr << ">>>>> Received TWriteSessionEvent::TAcksEvent " << ev.DebugString() << Endl << Flush; + }); - auto writeSettings3 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer3) - .PartitionId(0); - auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3); + return client.CreateSimpleBlockingWriteSession(writeSettings); +} - Cerr << ">>>>> 1 " << Endl; +struct TTestReadSession { + struct MsgInfo { + ui64 PartitionId; + ui64 SeqNo; + ui64 Offset; + TString Data; + }; - struct MsgInfo { - ui64 PartitionId; - ui64 SeqNo; - ui64 Offset; - TString Data; - }; + std::shared_ptr<IReadSession> Session; - NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); - std::vector<MsgInfo> receivedMessages; - std::set<size_t> partitions; + NThreading::TPromise<void> Promise = NThreading::NewPromise<void>(); + std::vector<MsgInfo> ReceivedMessages; + std::set<size_t> Partitions; + TTestReadSession(TTopicClient& client, size_t expectedMessagesCount) { auto readSettings = TReadSessionSettings() .ConsumerName(TEST_CONSUMER) .AppendTopics(TEST_TOPIC); readSettings.EventHandlers_.SimpleDataHandlers( - [&] + [&, expectedMessagesCount] (TReadSessionEvent::TDataReceivedEvent& ev) mutable { auto& messages = ev.GetMessages(); for (size_t i = 0u; i < messages.size(); ++i) { @@ -151,14 +150,14 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { << ", seqNo=" << message.GetSeqNo() << ", offset=" << message.GetOffset() << Endl; - receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), + ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), message.GetSeqNo(), message.GetOffset(), message.GetData()}); } - if (receivedMessages.size() == 6) { - checkedPromise.SetValue(); + if (ReceivedMessages.size() == expectedMessagesCount) { + Promise.SetValue(); } }); @@ -166,24 +165,70 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { [&] (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { Cerr << ">>>>> Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl; - partitions.insert(ev.GetPartitionSession()->GetPartitionId()); + Partitions.insert(ev.GetPartitionSession()->GetPartitionId()); ev.Confirm(); }); - auto readSession = client.CreateReadSession(readSettings); + Session = client.CreateReadSession(readSettings); + } - Cerr << ">>>>> 2 " << Endl; + void WaitAllMessages() { + Promise.GetFuture().GetValueSync(); + } +}; + +Y_UNIT_TEST_SUITE(TopicSplitMerge) { + + Y_UNIT_TEST(Simple) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(); + + TTopicClient client = setup.MakeClient(); + + auto writeSession1 = CreateWriteSession(client, "producer-1"); + auto writeSession2 = CreateWriteSession(client, "producer-2"); + + TTestReadSession ReadSession(client, 2); UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2))); UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3))); - UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1))); - Cerr << ">>>>> 3 " << Endl; + ReadSession.WaitAllMessages(); - ui64 txId = 0; - SplitPartition(setup, ++txId, 0, "a"); + for(const auto& info : ReadSession.ReceivedMessages) { + if (info.Data == "message_1.1") { + UNIT_ASSERT_EQUAL(0, info.PartitionId); + UNIT_ASSERT_EQUAL(2, info.SeqNo); + } else if (info.Data == "message_2.1") { + UNIT_ASSERT_EQUAL(0, info.PartitionId); + UNIT_ASSERT_EQUAL(3, info.SeqNo); + } else { + UNIT_ASSERT_C(false, "Unexpected message: " << info.Data); + } + } - Cerr << ">>>>> 4 " << Endl; + writeSession1->Close(TDuration::Seconds(1)); + writeSession2->Close(TDuration::Seconds(1)); + } + + Y_UNIT_TEST(PartitionSplit) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(); + + TTopicClient client = setup.MakeClient(); + + auto writeSession1 = CreateWriteSession(client, "producer-1"); + auto writeSession2 = CreateWriteSession(client, "producer-2"); + auto writeSession3 = CreateWriteSession(client, "producer-3", 0); + + TTestReadSession ReadSession(client, 6); + + UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2))); + UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3))); + UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 1))); + + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); writeSession1->Write(Msg("message_1.2_2", 2)); // Will be ignored because duplicated SeqNo writeSession3->Write(Msg("message_3.2", 11)); // Will be fail because partition is not writable after split @@ -197,14 +242,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { auto writeSession4 = client.CreateSimpleBlockingWriteSession(writeSettings4); writeSession4->Write(TWriteMessage("message_4.1")); - Cerr << ">>>>> 5 " << Endl; + ReadSession.WaitAllMessages(); - checkedPromise.GetFuture().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(6, receivedMessages.size()); - - Cerr << ">>>>> 6 " << Endl; - - for(const auto& info : receivedMessages) { + for(const auto& info : ReadSession.ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -228,131 +268,39 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { } } - Cerr << ">>>>> 7 " << Endl; - writeSession1->Close(TDuration::Seconds(1)); writeSession2->Close(TDuration::Seconds(1)); writeSession3->Close(TDuration::Seconds(1)); - readSession->Close(TDuration::Seconds(1)); - - Cerr << ">>>>> 8 " << Endl; } Y_UNIT_TEST(PartitionMerge) { - TTopicSdkTestSetup setup("TopicSplitMerge", TTopicSdkTestSetup::MakeServerSettings(), false); - - auto& ff = setup.GetRuntime().GetAppData().FeatureFlags; - ff.SetEnableTopicSplitMerge(true); - ff.SetEnablePQConfigTransactionsAtSchemeShard(true); - + TTopicSdkTestSetup setup = CreateSetup(); setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); TTopicClient client = setup.MakeClient(); - setup.GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); - setup.GetRuntime().SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_TRACE); - - TString producer1 = "producer-1"; - TString producer2 = "producer-2"; - TString producer3 = "producer-3"; - - auto writeSettings1 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer1) - .MessageGroupId(producer1) - .PartitionId(0); - auto writeSession1 = client.CreateSimpleBlockingWriteSession(writeSettings1); - - auto writeSettings2 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer2) - .MessageGroupId(producer2) - .PartitionId(1); - auto writeSession2 = client.CreateSimpleBlockingWriteSession(writeSettings2); - - Cerr << ">>>>> 1 " << Endl; - - struct MsgInfo { - ui64 PartitionId; - ui64 SeqNo; - ui64 Offset; - TString Data; - }; + auto writeSession1 = CreateWriteSession(client, "producer-1", 0); + auto writeSession2 = CreateWriteSession(client, "producer-2", 1); - NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); - std::vector<MsgInfo> receivedMessages; - std::set<size_t> partitions; - - auto readSettings = TReadSessionSettings() - .ConsumerName(TEST_CONSUMER) - .AppendTopics(TEST_TOPIC); - - readSettings.EventHandlers_.SimpleDataHandlers( - [&] - (TReadSessionEvent::TDataReceivedEvent& ev) mutable { - auto& messages = ev.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - - Cerr << ">>>>> Received message partitionId=" << message.GetPartitionSession()->GetPartitionId() - << ", message=" << message.GetData() - << ", seqNo=" << message.GetSeqNo() - << ", offset=" << message.GetOffset() - << Endl; - receivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), - message.GetSeqNo(), - message.GetOffset(), - message.GetData()}); - } - - if (receivedMessages.size() == 3) { - checkedPromise.SetValue(); - } - }); - - readSettings.EventHandlers_.StartPartitionSessionHandler( - [&] - (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> Received TStartPartitionSessionEvent for partition " << ev.GetPartitionSession()->GetPartitionId() << Endl; - partitions.insert(ev.GetPartitionSession()->GetPartitionId()); - ev.Confirm(); - }); - - auto readSession = client.CreateReadSession(readSettings); - - Cerr << ">>>>> 2 " << Endl; + TTestReadSession ReadSession(client, 3); UNIT_ASSERT(writeSession1->Write(Msg("message_1.1", 2))); UNIT_ASSERT(writeSession2->Write(Msg("message_2.1", 3))); - Cerr << ">>>>> 3 " << Endl; - - ui64 txId = 0; + ui64 txId = 1012; MergePartition(setup, ++txId, 0, 1); - Cerr << ">>>>> 4 " << Endl; - UNIT_ASSERT(writeSession1->Write(Msg("message_1.2", 5))); // Will be fail because partition is not writable after merge UNIT_ASSERT(writeSession2->Write(Msg("message_2.2", 7))); // Will be fail because partition is not writable after merge - auto writeSettings3 = TWriteSessionSettings() - .Path(TEST_TOPIC) - .ProducerId(producer1) - .MessageGroupId(producer1); - auto writeSession3 = client.CreateSimpleBlockingWriteSession(writeSettings3); + auto writeSession3 = CreateWriteSession(client, "producer-2", 2); UNIT_ASSERT(writeSession3->Write(Msg("message_3.1", 2))); // Will be ignored because duplicated SeqNo UNIT_ASSERT(writeSession3->Write(Msg("message_3.2", 11))); + ReadSession.WaitAllMessages(); - Cerr << ">>>>> 5 " << Endl; - - checkedPromise.GetFuture().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(3, receivedMessages.size()); - - Cerr << ">>>>> 6 " << Endl; - - for(const auto& info : receivedMessages) { + for(const auto& info : ReadSession.ReceivedMessages) { if (info.Data == TString("message_1.1")) { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -367,14 +315,9 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { } } - Cerr << ">>>>> 7 " << Endl; - writeSession1->Close(TDuration::Seconds(1)); writeSession2->Close(TDuration::Seconds(1)); writeSession3->Close(TDuration::Seconds(1)); - readSession->Close(TDuration::Seconds(1)); - - Cerr << ">>>>> 8 " << Endl; } } |