diff options
author | abcdef <akotov@ydb.tech> | 2023-05-03 14:14:14 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-05-03 14:14:14 +0300 |
commit | 38cdbddff4e7f44b5d262f430456b4bfd27009c2 (patch) | |
tree | 523b99bed12243d353a5818ac81b55a8cbaaf090 | |
parent | 80284603b494e5f2fd78bd0ca005f3de80adc304 (diff) | |
download | ydb-38cdbddff4e7f44b5d262f430456b4bfd27009c2.tar.gz |
tests for transactions with topics and database tables
Добавлены тесты для транзакций с топиками и таблицами. В тестах показаны ситуации когда фиксируются изменения в транзакции и когда возникает ошибка во время исполнения
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 110 | ||||
-rw-r--r-- | ydb/core/persqueue/events/global.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 27 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 52 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 16 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 6 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp | 292 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/demo_tx.cpp | 571 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/topic_service_ut.cpp | 2 |
15 files changed, 706 insertions, 386 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 784988357bb..63afc38d289 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -154,13 +154,6 @@ public: << ", datashard " << x.first << " not finished yet: " << ToString(x.second.State)); } } - for (const auto& x : TopicTabletStates) { - if (x.second.State != TShardState::EState::Finished) { - ++notFinished; - LOG_D("ActorState: " << CurrentStateFuncName() - << ", topicTablet " << x.first << " not finished yet: " << ToString(x.second.State)); - } - } if (notFinished == 0 && TBase::CheckExecutionComplete()) { return; } @@ -177,11 +170,6 @@ public: sb << "DS " << shardId << " (" << ToString(shardState.State) << "), "; } } - for (const auto& [tabletId, tabletState] : TopicTabletStates) { - if (tabletState.State != TShardState::EState::Finished) { - sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), "; - } - } LOG_D(sb); } } @@ -330,18 +318,28 @@ private: } void HandlePrepare(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { - NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record; + auto& event = ev->Get()->Record; + const ui64 tabletId = event.GetOrigin(); - LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: " - << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + LOG_D("Got propose result" << + ", PQ tablet: " << tabletId << + ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); - TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin()); - YQL_ENSURE(state); - - YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR); + TShardState* state = ShardStates.FindPtr(tabletId); + YQL_ENSURE(state, "Unexpected propose result from unknown PQ tablet " << tabletId); - auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED); - ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue); + switch (event.GetStatus()) { + case NKikimrPQ::TEvProposeTransactionResult::PREPARED: + if (!ShardPrepared(*state, event)) { + return CancelProposal(tabletId); + } + return CheckPrepareCompleted(); + case NKikimrPQ::TEvProposeTransactionResult::COMPLETE: + YQL_ENSURE(false); + default: + CancelProposal(tabletId); + return PQTabletError(event); + } } void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { @@ -531,6 +529,7 @@ private: state.State = TShardState::EState::Finished; + YQL_ENSURE(state.DatashardState.Defined()); YQL_ENSURE(!state.DatashardState->Follower); Send(MakePipePeNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward( @@ -539,13 +538,13 @@ private: } } - bool ShardPrepared(TShardState& state, const NKikimrTxDataShard::TEvProposeTransactionResult& result) { + template<class E> + bool ShardPreparedImpl(TShardState& state, const E& result) { YQL_ENSURE(state.State == TShardState::EState::Preparing); state.State = TShardState::EState::Prepared; state.DatashardState->ShardMinStep = result.GetMinStep(); state.DatashardState->ShardMaxStep = result.GetMaxStep(); - state.DatashardState->ReadSize += result.GetReadSize(); ui64 coordinator = 0; if (result.DomainCoordinatorsSize()) { @@ -577,42 +576,20 @@ private: return true; } - bool ShardPrepared(TShardState& state, const NKikimrTxColumnShard::TEvProposeTransactionResult& result) { - YQL_ENSURE(state.State == TShardState::EState::Preparing); - state.State = TShardState::EState::Prepared; - - state.DatashardState->ShardMinStep = result.GetMinStep(); - state.DatashardState->ShardMaxStep = result.GetMaxStep(); -// state.DatashardState->ReadSize += result.GetReadSize(); - - ui64 coordinator = 0; - if (result.DomainCoordinatorsSize()) { - auto domainCoordinators = TCoordinators(TVector<ui64>(result.GetDomainCoordinators().begin(), - result.GetDomainCoordinators().end())); - coordinator = domainCoordinators.Select(TxId); - } - - if (coordinator && !TxCoordinator) { - TxCoordinator = coordinator; - } - - if (!TxCoordinator || TxCoordinator != coordinator) { - LOG_E("Handle TEvProposeTransactionResult: unable to select coordinator. Tx canceled, actorId: " << SelfId() - << ", previously selected coordinator: " << TxCoordinator - << ", coordinator selected at propose result: " << coordinator); - - Counters->TxProxyMon->TxResultAborted->Inc(); - ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, MakeIssue( - NKikimrIssues::TIssuesIds::TX_DECLINED_IMPLICIT_COORDINATOR, "Unable to choose coordinator.")); - return false; - } - - LastPrepareReply = TInstant::Now(); - if (!FirstPrepareReply) { - FirstPrepareReply = LastPrepareReply; + bool ShardPrepared(TShardState& state, const NKikimrTxDataShard::TEvProposeTransactionResult& result) { + bool success = ShardPreparedImpl(state, result); + if (success) { + state.DatashardState->ReadSize += result.GetReadSize(); } + return success; + } - return true; + bool ShardPrepared(TShardState& state, const NKikimrTxColumnShard::TEvProposeTransactionResult& result) { + return ShardPreparedImpl(state, result); + } + + bool ShardPrepared(TShardState& state, const NKikimrPQ::TEvProposeTransactionResult& result) { + return ShardPreparedImpl(state, result); } void ShardError(const NKikimrTxDataShard::TEvProposeTransactionResult& result) { @@ -819,10 +796,6 @@ private: auto& affectedSet = *transaction.MutableAffectedSet(); affectedSet.Reserve(static_cast<int>(ShardStates.size())); - // - // TODO(abcdef): учесть таблетки топиков - // - ui64 aggrMinStep = 0; ui64 aggrMaxStep = Max<ui64>(); ui64 totalReadSize = 0; @@ -858,10 +831,6 @@ private: } } - // - // TODO(abcdef): учесть таблетки топиков - // - item.SetFlags(affectedFlags); } @@ -920,10 +889,11 @@ private: void HandleExecute(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record; - LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: " - << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + LOG_D("Got propose result" << + ", topic tablet: " << event.GetOrigin() << + ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); - TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin()); + TShardState *state = ShardStates.FindPtr(event.GetOrigin()); YQL_ENSURE(state); switch (event.GetStatus()) { @@ -2028,6 +1998,7 @@ private: if (auto tabletIds = Request.TopicOperations.GetSendingTabletIds()) { sendingShardsSet.insert(tabletIds.begin(), tabletIds.end()); + receivingShardsSet.insert(tabletIds.begin(), tabletIds.end()); } if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) { @@ -2276,7 +2247,7 @@ private: state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId); - auto result = TopicTabletStates.emplace(tabletId, std::move(state)); + auto result = ShardStates.emplace(tabletId, std::move(state)); YQL_ENSURE(result.second); } } @@ -2352,7 +2323,6 @@ private: ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; - THashMap<ui64, TShardState> TopicTabletStates; TVector<NKikimrTxDataShard::TLock> Locks; bool ReadOnlyTx = true; bool VolatileTx = false; diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 2c39ed2375b..6caf7858267 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -234,6 +234,9 @@ struct TEvPersQueue { }; struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> { + explicit TEvCancelTransactionProposal(ui64 txId) { + Record.SetTxId(txId); + } }; struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 5aa9dc3275d..8e4b8affb6f 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1406,11 +1406,17 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, const TString& consumer = operation.GetConsumer(); if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << + " Consumer '" << consumer << "' has been removed"); predicate = false; break; } if (!UsersInfoStorage->GetIfExists(consumer)) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << + " Unknown consumer '" << consumer << "'"); predicate = false; break; } @@ -1419,13 +1425,28 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); if (operation.GetBegin() > operation.GetEnd()) { - // BAD_REQUEST + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (invalid range) " << + " Begin " << operation.GetBegin() << + " End " << operation.GetEnd()); predicate = false; } else if (userInfo.Offset != (i64)operation.GetBegin()) { - // ABORTED + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (gap) " << + " Offset " << userInfo.Offset << + " Begin " << operation.GetBegin()); predicate = false; } else if (operation.GetEnd() > EndOffset) { - // BAD_REQUEST + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (behind the last offset) " << + " EndOffset " << EndOffset << + " End " << operation.GetEnd()); predicate = false; } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 228d5077d9c..b285e24565f 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3,6 +3,7 @@ #include "event_helpers.h" #include "partition.h" #include "read.h" +#include <ydb/core/base/tx_processing.h> #include <ydb/core/persqueue/config/config.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/protos/pqconfig.pb.h> @@ -881,6 +882,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& ConfigInited = true; + InitProcessingParams(ctx); InitializeMeteringSink(ctx); Y_VERIFY(!NewConfigShouldBeApplied); @@ -2485,6 +2487,8 @@ void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransa void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvPlanStep"); + NKikimrTx::TEvMediatorPlanStep& event = ev->Get()->Record; LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, @@ -2499,6 +2503,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSet"); + NKikimrTx::TEvReadSet& event = ev->Get()->Record; Y_VERIFY(event.HasTxId()); @@ -2525,6 +2531,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSetAck"); + NKikimrTx::TEvReadSetAck& event = ev->Get()->Record; Y_VERIFY(event.HasTxId()); @@ -2547,6 +2555,14 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC { const TEvPQ::TEvTxCalcPredicateResult& event = *ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << + " Handle TEvPQ::TEvTxCalcPredicateResult" << + " Step " << event.Step << + " TxId " << event.TxId << + " Partition " << event.Partition << + " Predicate " << (event.Predicate ? "true" : "false")); + auto tx = GetTransaction(ctx, event.TxId); if (!tx) { return; @@ -2579,6 +2595,8 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPQ::TEvTxCommitDone"); + const TEvPQ::TEvTxCommitDone& event = *ev->Get(); auto tx = GetTransaction(ctx, event.TxId); @@ -2669,7 +2687,8 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) TDistributedTransaction& tx = Txs[event.GetTxId()]; switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: { - tx.OnProposeTransaction(event, GetAllowedStep()); + tx.OnProposeTransaction(event, GetAllowedStep(), + TabletID()); CheckTxState(ctx, tx); break; } @@ -2838,6 +2857,10 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction& event->Record.SetMinStep(tx.MinStep); event->Record.SetMaxStep(tx.MaxStep); + if (ProcessingParams) { + event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators()); + } + if (tx.SourceTablet != InvalidTabletId) { RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event)); } else { @@ -2875,14 +2898,16 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, Y_VERIFY(data.SerializeToString(&body)); for (ui64 receiverId : tx.Receivers) { - auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step, - tx.TxId, - TabletID(), - receiverId, - TabletID(), - body, - 0); - SendToPipe(receiverId, tx, std::move(event), ctx); + if (receiverId != TabletID()) { + auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step, + tx.TxId, + TabletID(), + receiverId, + TabletID(), + body, + 0); + SendToPipe(receiverId, tx, std::move(event), ctx); + } } tx.ReadSetAcks.clear(); @@ -3239,6 +3264,15 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) } ChangedTxs.clear(); } + +void TPersQueue::InitProcessingParams(const TActorContext& ctx) +{ + auto appdata = AppData(ctx); + const ui32 domainId = appdata->DomainsInfo->GetDomainUidByTabletId(TabletID()); + Y_VERIFY(domainId != appdata->DomainsInfo->BadDomainId); + const auto& domain = appdata->DomainsInfo->GetDomain(domainId); + ProcessingParams = ExtractProcessingParams(domain); +} bool TPersQueue::AllTransactionsHaveBeenProcessed() const { diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index ac21b3834e9..912a9c4f680 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -351,6 +351,10 @@ private: const THashSet<ui64>& GetBindedTxs(ui64 tabletId); THashMap<ui64, THashSet<ui64>> BindedTxs; + + void InitProcessingParams(const TActorContext& ctx); + + TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 80a428f650a..1454c66de13 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -92,7 +92,8 @@ void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& c } void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, - ui64 minStep) + ui64 minStep, + ui64 extractTabletId) { Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET); @@ -106,7 +107,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr case NKikimrPQ::TEvProposeTransaction::kData: Y_VERIFY(event.HasData()); MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds(); - OnProposeTransaction(event.GetData()); + OnProposeTransaction(event.GetData(), extractTabletId); break; case NKikimrPQ::TEvProposeTransaction::kConfig: Y_VERIFY(event.HasConfig()); @@ -131,16 +132,21 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr } } -void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody) +void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody, + ui64 extractTabletId) { Kind = NKikimrPQ::TTransaction::KIND_DATA; for (ui64 tablet : txBody.GetSendingShards()) { - Senders.insert(tablet); + if (tablet != extractTabletId) { + Senders.insert(tablet); + } } for (ui64 tablet : txBody.GetReceivingShards()) { - Receivers.insert(tablet); + if (tablet != extractTabletId) { + Receivers.insert(tablet); + } } InitPartitions(txBody.GetOperations()); diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index ab58a4b7a5f..8e08f29ad46 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -20,8 +20,10 @@ struct TDistributedTransaction { explicit TDistributedTransaction(const NKikimrPQ::TTransaction& tx); void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, - ui64 minStep); - void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody); + ui64 minStep, + ui64 extractTabletId); + void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody, + ui64 extractTabletId); void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody); void OnPlanStep(ui64 step); void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event); diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 3dc3dc5bc27..b9b438290f3 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -856,6 +856,7 @@ message TEvProposeTransactionResult { optional uint64 MaxStep = 6; optional string StatusMessage = 7; optional uint64 Step = 8; + repeated fixed64 DomainCoordinators = 9; }; message TEvCancelTransactionProposal { diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt index e309e18a0a9..872b8eb58f9 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt @@ -50,7 +50,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt index bfe8fcae8a5..ca936d0636a 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt @@ -53,7 +53,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt index c105b4ea1a7..7a009a2e332 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt @@ -54,7 +54,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt index ce19fe5c450..7f2067278e5 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt @@ -43,7 +43,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp deleted file mode 100644 index 88b3943cd14..00000000000 --- a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp +++ /dev/null @@ -1,292 +0,0 @@ -#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> - -#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> -#include <ydb/public/sdk/cpp/client/ydb_table/table.h> -#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> - -#include <ydb/core/protos/services.pb.h> - -#include <util/stream/output.h> -#include <util/string/builder.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include "pq_data_writer.h" - -namespace NKikimr::NPersQueueTests { - -Y_UNIT_TEST_SUITE(ImmediateTx) { - -class TImmediateTxFixture : public NUnitTest::TBaseFixture { -protected: - void SetUp(NUnitTest::TTestContext&) override; - - void CreateTestServer(); - void CreateTopic(); - void CreateTopicTxStub(); - - NYdb::NTable::TSession CreateSession(); - NYdb::NTable::TTransaction BeginTx(NYdb::NTable::TSession& session); - void CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status); - - using TTopicReadSession = NYdb::NPersQueue::IReadSession; - using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>; - - TTopicReadSessionPtr CreateTopicReadSession(const TString& topic, - const TString& consumer); - void Wait_CreatePartitionStreamEvent(TTopicReadSession& reader, - ui64 committedOffset); - void Wait_DataReceivedEvent(TTopicReadSession& reader, - ui64 offset); - - void Call_UpdateOffsetsInTransaction(const TString& sessionId, - const TString& txId, - const TString& consumer, - ui64 rangeBegin, - ui64 rangeEnd); - - const TString CONSUMER = "user"; - const TString SHORT_TOPIC_NAME = "demo"; - const TString DC = "dc1"; - const TString FULL_TOPIC_NAME = "rt3." + DC + "--" + SHORT_TOPIC_NAME; - const TString AUTH_TOKEN = "x-user-x@builtin"; - const TString DATABASE = "/Root"; - const TString TOPIC_PARENT = DATABASE + "/PQ"; - const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME; - - TMaybe<NPersQueue::TTestServer> Server; - std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub; -}; - -void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&) -{ - CreateTestServer(); - CreateTopic(); - CreateTopicTxStub(); -} - -void TImmediateTxFixture::CreateTestServer() -{ - Server.ConstructInPlace(PQSettings(0).SetDomainName("Root").SetEnableTopicServiceTx(true)); - - Server->EnableLogs({NKikimrServices::FLAT_TX_SCHEMESHARD - , NKikimrServices::PERSQUEUE}); -} - -void TImmediateTxFixture::CreateTopic() -{ - // - // создать топик... - // - Server->AnnoyingClient->CreateTopicNoLegacy(TOPIC_PATH, 1); - - NACLib::TDiffACL acl; - acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN); - Server->AnnoyingClient->ModifyACL(TOPIC_PARENT, FULL_TOPIC_NAME, acl.SerializeAsString()); - - // - // ...с несколькими сообщениями - // - TPQDataWriter writer("source-id", *Server); - - for (ui32 offset = 0; offset < 4; ++offset) { - writer.Write(TOPIC_PATH, {"data"}, false, AUTH_TOKEN); - } -} - -void TImmediateTxFixture::CreateTopicTxStub() -{ - auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials()); - TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel); -} - -NYdb::NTable::TSession TImmediateTxFixture::CreateSession() -{ - NYdb::TDriverConfig config; - config.SetEndpoint(TStringBuilder() << "localhost:" << Server->GrpcPort); - config.SetDatabase(DATABASE); - config.SetAuthToken(AUTH_TOKEN); - - NYdb::TDriver driver(config); - NYdb::NTable::TClientSettings settings; - NYdb::NTable::TTableClient client(driver, settings); - - auto result = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - - return result.GetSession(); -} - -NYdb::NTable::TTransaction TImmediateTxFixture::BeginTx(NYdb::NTable::TSession& session) -{ - auto result = session.BeginTransaction().ExtractValueSync(); - UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - - return result.GetTransaction(); -} - -void TImmediateTxFixture::CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status) -{ - auto result = tx.Commit().ExtractValueSync(); - UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status); -} - -auto TImmediateTxFixture::CreateTopicReadSession(const TString& topic, - const TString& consumer) -> TTopicReadSessionPtr -{ - NYdb::NPersQueue::TReadSessionSettings settings; - settings.AppendTopics(topic); - settings.ConsumerName(consumer); - settings.ReadOriginal({DC}); - - return CreateReader(*Server->AnnoyingClient->GetDriver(), settings); -} - -template<class E> -E ReadEvent(NYdb::NPersQueue::IReadSession& reader, bool block, size_t count) -{ - auto msg = reader.GetEvent(block, count); - UNIT_ASSERT(msg); - - auto ev = std::get_if<E>(&*msg); - UNIT_ASSERT(ev); - - return *ev; -} - -void TImmediateTxFixture::Wait_CreatePartitionStreamEvent(TTopicReadSession& reader, - ui64 committedOffset) -{ - auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(reader, true, 1); - Cerr << "TCreatePartitionStreamEvent: " << event.DebugString() << Endl; - - UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), committedOffset); - - event.Confirm(); -} - -void TImmediateTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader, - ui64 offset) -{ - auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(reader, true, 1); - Cerr << "TDataReceivedEvent: " << event.DebugString() << Endl; - - UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); -} - -void TImmediateTxFixture::Call_UpdateOffsetsInTransaction(const TString& sessionId, - const TString& txId, - const TString& consumer, - ui64 rangeBegin, - ui64 rangeEnd) -{ - grpc::ClientContext rcontext; - rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); - rcontext.AddMetadata("x-ydb-database", DATABASE); - - Ydb::Topic::UpdateOffsetsInTransactionRequest request; - Ydb::Topic::UpdateOffsetsInTransactionResponse response; - - request.mutable_tx()->set_id(txId); - request.mutable_tx()->set_session(sessionId); - request.set_consumer(consumer); - - auto *topic = request.mutable_topics()->Add(); - topic->set_path(TOPIC_PATH); - - auto *partition = topic->mutable_partitions()->Add(); - partition->set_partition_id(0); - - auto *range = partition->mutable_partition_offsets()->Add(); - range->set_start(rangeBegin); - range->set_end(rangeEnd); - - grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext, - request, - &response); - UNIT_ASSERT(status.ok()); - - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); -} - -Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture) -{ - { - auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); - - Wait_CreatePartitionStreamEvent(*reader, 0); - - NYdb::NTable::TSession session = CreateSession(); - NYdb::NTable::TTransaction tx = BeginTx(session); - - Wait_DataReceivedEvent(*reader, 0); - Wait_DataReceivedEvent(*reader, 1); - - Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2); - - CommitTx(tx, NYdb::EStatus::SUCCESS); - } - - { - auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); - - Wait_CreatePartitionStreamEvent(*reader, 2); - - NYdb::NTable::TSession session = CreateSession(); - NYdb::NTable::TTransaction tx = BeginTx(session); - - Wait_DataReceivedEvent(*reader, 2); - Wait_DataReceivedEvent(*reader, 3); - - Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4); - } - - { - auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); - - Wait_CreatePartitionStreamEvent(*reader, 2); - } -} - -Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture) -{ - NYdb::NTable::TSession s1 = CreateSession(); - NYdb::NTable::TTransaction t1 = BeginTx(s1); - - { - auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); - - Wait_CreatePartitionStreamEvent(*reader, 0); - - Wait_DataReceivedEvent(*reader, 0); - Wait_DataReceivedEvent(*reader, 1); - Wait_DataReceivedEvent(*reader, 2); - - Call_UpdateOffsetsInTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3); - } - - NYdb::NTable::TSession s2 = CreateSession(); - NYdb::NTable::TTransaction t2 = BeginTx(s2); - - { - auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); - - Wait_CreatePartitionStreamEvent(*reader, 0); - - Wait_DataReceivedEvent(*reader, 0); - Wait_DataReceivedEvent(*reader, 1); - - Call_UpdateOffsetsInTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2); - } - - CommitTx(t2, NYdb::EStatus::SUCCESS); - CommitTx(t1, NYdb::EStatus::ABORTED); -} - -} - -} diff --git a/ydb/services/persqueue_v1/ut/demo_tx.cpp b/ydb/services/persqueue_v1/ut/demo_tx.cpp new file mode 100644 index 00000000000..18ca69792af --- /dev/null +++ b/ydb/services/persqueue_v1/ut/demo_tx.cpp @@ -0,0 +1,571 @@ +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> + +#include <ydb/core/protos/services.pb.h> + +#include <util/stream/output.h> +#include <util/string/builder.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include "pq_data_writer.h" + +namespace NKikimr::NPersQueueTests { + +Y_UNIT_TEST_SUITE(DemoTx) { + +class TTopicNameConstructor { +public: + TTopicNameConstructor(const TString& dc, + const TString& database); + + TString GetTopicParent() const; + TString GetTopicPath(const TString& topicName) const; + TString GetFullTopicName(const TString& topicName) const; + +private: + TString Dc; + TString Database; +}; + +TTopicNameConstructor::TTopicNameConstructor(const TString& dc, + const TString& database) : + Dc(dc), + Database(database) +{ +} + +TString TTopicNameConstructor::GetTopicParent() const +{ + return Database + "/PQ"; +} + +TString TTopicNameConstructor::GetTopicPath(const TString& topicName) const +{ + return GetTopicParent() + "/" + GetFullTopicName(topicName); +} + +TString TTopicNameConstructor::GetFullTopicName(const TString& topicName) const +{ + return "rt3." + Dc + "--" + topicName; +} + +class TTxFixture : public NUnitTest::TBaseFixture { +protected: + TTxFixture(); + + void SetUp(NUnitTest::TTestContext&) override; + + void CreateTestServer(); + void CreateTopic(const TString& topicPath); + void CreateTopicStub(); + void CreateTable(const TString& parent, const TString& name); + + NYdb::NTable::TSession CreateSession(const TString& authToken); + NYdb::NTable::TTransaction BeginTx(NYdb::NTable::TSession& session); + void CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status); + + using TTopicReadSession = NYdb::NPersQueue::IReadSession; + using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>; + + TTopicReadSessionPtr CreateTopicReadSession(const TString& topic, + const TString& consumer); + void Wait_CreatePartitionStreamEvent(TTopicReadSession& reader, + ui64 committedOffset); + void Wait_DataReceivedEvent(TTopicReadSession& reader, + ui64 offset); + + void Call_UpdateOffsetsInTransaction(const NYdb::NTable::TTransaction& tx, + const TString& consumer, + const TString& topicPath, + ui64 rangeBegin, + ui64 rangeEnd); + + void ExecSQL(const NYdb::NTable::TTransaction& tx, + const TString& query, + NYdb::EStatus status); + + void Ensure_In_Table(const TString& tablePath, + const THashMap<i64, i64>& values); + + const TString CONSUMER = "user"; + const TString SHORT_TOPIC_NAME = "demo"; + const TString DC = "dc1"; + const TString FULL_TOPIC_NAME = "rt3." + DC + "--" + SHORT_TOPIC_NAME; + const TString AUTH_TOKEN = "x-user-x@builtin"; + const TString DATABASE = "/Root"; + const TString TOPIC_PARENT = DATABASE + "/PQ"; + const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME; + + TMaybe<NPersQueue::TTestServer> Server; + std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub; + TTopicNameConstructor NameCtor; +}; + +TTxFixture::TTxFixture() : + NameCtor(DC, DATABASE) +{ +} + +void TTxFixture::SetUp(NUnitTest::TTestContext&) +{ + CreateTestServer(); + + CreateTopic(SHORT_TOPIC_NAME); + CreateTopicStub(); +} + +void TTxFixture::CreateTestServer() +{ + auto settings = PQSettings(0) + .SetDomainName("Root") + .SetEnableTopicServiceTx(true); + + Server.ConstructInPlace(settings); +} + +void TTxFixture::CreateTopic(const TString& topicName) +{ + const TString topicPath = NameCtor.GetTopicPath(topicName); + const TString fullTopicName = NameCtor.GetFullTopicName(topicName); + + // + // создать топик... + // + Server->AnnoyingClient->CreateTopicNoLegacy(topicPath, 1); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN); + Server->AnnoyingClient->ModifyACL(NameCtor.GetTopicParent(), + fullTopicName, + acl.SerializeAsString()); + + // + // ...с несколькими сообщениями + // + TPQDataWriter writer("source-id", *Server); + + for (ui32 offset = 0; offset < 4; ++offset) { + writer.Write(topicPath, {"data"}, false, AUTH_TOKEN); + } +} + +void TTxFixture::CreateTopicStub() +{ + auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials()); + TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel); +} + +void TTxFixture::CreateTable(const TString& parent, const TString& name) +{ + auto session = CreateSession(""); + + auto result = session.ExecuteSchemeQuery(Sprintf(R"___( + CREATE TABLE `%s/%s` ( + key Int64, + value Int64, + PRIMARY KEY (key) + ); + )___", parent.data(), name.data())).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN); + Server->AnnoyingClient->ModifyACL(parent, + name, + acl.SerializeAsString()); + + Server->AnnoyingClient->ModifyACL("/", + "Root", + acl.SerializeAsString()); +} + +NYdb::NTable::TSession TTxFixture::CreateSession(const TString& authToken) +{ + NYdb::TDriverConfig config; + config.SetEndpoint(TStringBuilder() << "localhost:" << Server->GrpcPort); + config.SetDatabase(DATABASE); + if (!authToken.empty()) { + config.SetAuthToken(authToken); + } + + NYdb::TDriver driver(config); + NYdb::NTable::TClientSettings settings; + NYdb::NTable::TTableClient client(driver, settings); + + auto result = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + return result.GetSession(); +} + +NYdb::NTable::TTransaction TTxFixture::BeginTx(NYdb::NTable::TSession& session) +{ + auto result = session.BeginTransaction().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + return result.GetTransaction(); +} + +void TTxFixture::CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status) +{ + auto result = tx.Commit().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status); +} + +auto TTxFixture::CreateTopicReadSession(const TString& topic, + const TString& consumer) -> TTopicReadSessionPtr +{ + NYdb::NPersQueue::TReadSessionSettings settings; + settings.AppendTopics(topic); + settings.ConsumerName(consumer); + settings.ReadOriginal({DC}); + + return CreateReader(*Server->AnnoyingClient->GetDriver(), settings); +} + +template<class E> +E ReadEvent(NYdb::NPersQueue::IReadSession& reader, bool block, size_t count) +{ + auto msg = reader.GetEvent(block, count); + UNIT_ASSERT(msg); + + auto ev = std::get_if<E>(&*msg); + UNIT_ASSERT(ev); + + return *ev; +} + +void TTxFixture::Wait_CreatePartitionStreamEvent(TTopicReadSession& reader, + ui64 committedOffset) +{ + auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(reader, true, 1); + Cerr << "TCreatePartitionStreamEvent: " << event.DebugString() << Endl; + + UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), committedOffset); + + event.Confirm(); +} + +void TTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader, + ui64 offset) +{ + auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(reader, true, 1); + Cerr << "TDataReceivedEvent: " << event.DebugString() << Endl; + + UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); +} + +void TTxFixture::Call_UpdateOffsetsInTransaction(const NYdb::NTable::TTransaction& tx, + const TString& consumer, + const TString& topicPath, + ui64 rangeBegin, + ui64 rangeEnd) +{ + grpc::ClientContext rcontext; + rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); + rcontext.AddMetadata("x-ydb-database", DATABASE); + + Ydb::Topic::UpdateOffsetsInTransactionRequest request; + Ydb::Topic::UpdateOffsetsInTransactionResponse response; + + request.mutable_tx()->set_id(tx.GetId()); + request.mutable_tx()->set_session(tx.GetSession().GetId()); + request.set_consumer(consumer); + + auto *topic = request.mutable_topics()->Add(); + topic->set_path(topicPath); + + auto *partition = topic->mutable_partitions()->Add(); + partition->set_partition_id(0); + + auto *range = partition->mutable_partition_offsets()->Add(); + range->set_start(rangeBegin); + range->set_end(rangeEnd); + + grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext, + request, + &response); + UNIT_ASSERT(status.ok()); + + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); +} + +void TTxFixture::ExecSQL(const NYdb::NTable::TTransaction& tx, + const TString& query, + NYdb::EStatus status) +{ + auto result = + tx.GetSession().ExecuteDataQuery(query, + NYdb::NTable::TTxControl::Tx(tx)).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status); +} + +void TTxFixture::Ensure_In_Table(const TString& tablePath, + const THashMap<i64, i64>& values) +{ + auto query = Sprintf(R"___( + SELECT key, value FROM `%s` + )___", tablePath.data()); + + auto session = CreateSession(""); + + auto result = session.ExecuteDataQuery(query, + NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS); + + auto rs = result.GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), values.size()); + + for (size_t i = 0; i < values.size(); ++i) { + UNIT_ASSERT(rs.TryNextRow()); + + auto key = rs.ColumnParser("key").GetOptionalInt64(); + UNIT_ASSERT(key.Defined()); + + auto value = rs.ColumnParser("value").GetOptionalInt64(); + UNIT_ASSERT(value.Defined()); + + auto p = values.find(*key); + UNIT_ASSERT(p != values.end()); + + UNIT_ASSERT_VALUES_EQUAL(*value, p->second); + } +} + +Y_UNIT_TEST_F(Scenario_1, TTxFixture) +{ + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, TOPIC_PATH, 0, 2); + + CommitTx(tx, NYdb::EStatus::SUCCESS); + } + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 2); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + Wait_DataReceivedEvent(*reader, 2); + Wait_DataReceivedEvent(*reader, 3); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, TOPIC_PATH, 2, 4); + } + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 2); + } +} + +Y_UNIT_TEST_F(Scenario_2, TTxFixture) +{ + NYdb::NTable::TSession s1 = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction t1 = BeginTx(s1); + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + Wait_DataReceivedEvent(*reader, 2); + + Call_UpdateOffsetsInTransaction(t1, CONSUMER, TOPIC_PATH, 0, 3); + } + + NYdb::NTable::TSession s2 = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction t2 = BeginTx(s2); + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + + Call_UpdateOffsetsInTransaction(t2, CONSUMER, TOPIC_PATH, 0, 2); + } + + CommitTx(t2, NYdb::EStatus::SUCCESS); + CommitTx(t1, NYdb::EStatus::ABORTED); +} + +Y_UNIT_TEST_F(Scenario_3, TTxFixture) +{ + CreateTopic("demo_1"); + CreateTopic("demo_2"); + + { + auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER); + auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader_1, 0); + Wait_CreatePartitionStreamEvent(*reader_2, 0); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + Wait_DataReceivedEvent(*reader_1, 0); + Wait_DataReceivedEvent(*reader_1, 1); + + Wait_DataReceivedEvent(*reader_2, 0); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_1"), 0, 2); + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_2"), 0, 1); + + CommitTx(tx, NYdb::EStatus::SUCCESS); + } + + { + auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER); + auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader_1, 2); + Wait_CreatePartitionStreamEvent(*reader_2, 1); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + Wait_DataReceivedEvent(*reader_1, 2); + + Wait_DataReceivedEvent(*reader_2, 1); + Wait_DataReceivedEvent(*reader_2, 2); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_1"), 2, 3); + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_2"), 1, 3); + } + + { + auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER); + auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader_1, 2); + Wait_CreatePartitionStreamEvent(*reader_2, 1); + } +} + +Y_UNIT_TEST_F(Scenario_4, TTxFixture) +{ + CreateTable("/Root", "table"); + + { + auto reader = CreateTopicReadSession("demo", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + ExecSQL(tx, "SELECT key, value FROM `/Root/table` WHERE key = 1;", + NYdb::EStatus::SUCCESS); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + + ExecSQL(tx, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 1);", + NYdb::EStatus::SUCCESS); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo"), 0, 2); + + CommitTx(tx, NYdb::EStatus::SUCCESS); + } + + { + Ensure_In_Table("/Root/table", {{1, 1}}); + + auto reader = CreateTopicReadSession("demo", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 2); + + NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction tx = BeginTx(session); + + Wait_DataReceivedEvent(*reader, 2); + + ExecSQL(tx, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 2);", + NYdb::EStatus::SUCCESS); + + Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo"), 2, 3); + } + + { + Ensure_In_Table("/Root/table", {{1, 1}}); + + auto reader = CreateTopicReadSession("demo", CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 2); + } +} + +Y_UNIT_TEST_F(Scenario_5, TTxFixture) +{ + CreateTable("/Root", "table"); + + NYdb::NTable::TSession s1 = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction t1 = BeginTx(s1); + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + Wait_DataReceivedEvent(*reader, 2); + + ExecSQL(t1, "SELECT key, value FROM `/Root/table` WHERE key = 1;", + NYdb::EStatus::SUCCESS); + + Call_UpdateOffsetsInTransaction(t1, CONSUMER, TOPIC_PATH, 0, 3); + } + + NYdb::NTable::TSession s2 = CreateSession(AUTH_TOKEN); + NYdb::NTable::TTransaction t2 = BeginTx(s2); + + { + auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER); + + Wait_CreatePartitionStreamEvent(*reader, 0); + + Wait_DataReceivedEvent(*reader, 0); + Wait_DataReceivedEvent(*reader, 1); + + Call_UpdateOffsetsInTransaction(t2, CONSUMER, TOPIC_PATH, 0, 2); + } + + CommitTx(t2, NYdb::EStatus::SUCCESS); + CommitTx(t1, NYdb::EStatus::ABORTED); +} + +} + +} diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp index 6388fc8fbe4..16882a8c62f 100644 --- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp +++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp @@ -409,7 +409,7 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti auto result = tx->Commit().ExtractValueSync(); Cerr << ">>> CommitTx >>>" << Endl; UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::INTERNAL_ERROR); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED); } } |