aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-05-03 14:14:14 +0300
committerabcdef <akotov@ydb.tech>2023-05-03 14:14:14 +0300
commit38cdbddff4e7f44b5d262f430456b4bfd27009c2 (patch)
tree523b99bed12243d353a5818ac81b55a8cbaaf090
parent80284603b494e5f2fd78bd0ca005f3de80adc304 (diff)
downloadydb-38cdbddff4e7f44b5d262f430456b4bfd27009c2.tar.gz
tests for transactions with topics and database tables
Добавлены тесты для транзакций с топиками и таблицами. В тестах показаны ситуации когда фиксируются изменения в транзакции и когда возникает ошибка во время исполнения
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp110
-rw-r--r--ydb/core/persqueue/events/global.h3
-rw-r--r--ydb/core/persqueue/partition.cpp27
-rw-r--r--ydb/core/persqueue/pq_impl.cpp52
-rw-r--r--ydb/core/persqueue/pq_impl.h4
-rw-r--r--ydb/core/persqueue/transaction.cpp16
-rw-r--r--ydb/core/persqueue/transaction.h6
-rw-r--r--ydb/core/protos/pqconfig.proto1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp292
-rw-r--r--ydb/services/persqueue_v1/ut/demo_tx.cpp571
-rw-r--r--ydb/services/persqueue_v1/ut/topic_service_ut.cpp2
15 files changed, 706 insertions, 386 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 784988357bb..63afc38d289 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -154,13 +154,6 @@ public:
<< ", datashard " << x.first << " not finished yet: " << ToString(x.second.State));
}
}
- for (const auto& x : TopicTabletStates) {
- if (x.second.State != TShardState::EState::Finished) {
- ++notFinished;
- LOG_D("ActorState: " << CurrentStateFuncName()
- << ", topicTablet " << x.first << " not finished yet: " << ToString(x.second.State));
- }
- }
if (notFinished == 0 && TBase::CheckExecutionComplete()) {
return;
}
@@ -177,11 +170,6 @@ public:
sb << "DS " << shardId << " (" << ToString(shardState.State) << "), ";
}
}
- for (const auto& [tabletId, tabletState] : TopicTabletStates) {
- if (tabletState.State != TShardState::EState::Finished) {
- sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), ";
- }
- }
LOG_D(sb);
}
}
@@ -330,18 +318,28 @@ private:
}
void HandlePrepare(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
- NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record;
+ auto& event = ev->Get()->Record;
+ const ui64 tabletId = event.GetOrigin();
- LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: "
- << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
+ LOG_D("Got propose result" <<
+ ", PQ tablet: " << tabletId <<
+ ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
- TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin());
- YQL_ENSURE(state);
-
- YQL_ENSURE(event.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::ERROR);
+ TShardState* state = ShardStates.FindPtr(tabletId);
+ YQL_ENSURE(state, "Unexpected propose result from unknown PQ tablet " << tabletId);
- auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED);
- ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue);
+ switch (event.GetStatus()) {
+ case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
+ if (!ShardPrepared(*state, event)) {
+ return CancelProposal(tabletId);
+ }
+ return CheckPrepareCompleted();
+ case NKikimrPQ::TEvProposeTransactionResult::COMPLETE:
+ YQL_ENSURE(false);
+ default:
+ CancelProposal(tabletId);
+ return PQTabletError(event);
+ }
}
void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
@@ -531,6 +529,7 @@ private:
state.State = TShardState::EState::Finished;
+ YQL_ENSURE(state.DatashardState.Defined());
YQL_ENSURE(!state.DatashardState->Follower);
Send(MakePipePeNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward(
@@ -539,13 +538,13 @@ private:
}
}
- bool ShardPrepared(TShardState& state, const NKikimrTxDataShard::TEvProposeTransactionResult& result) {
+ template<class E>
+ bool ShardPreparedImpl(TShardState& state, const E& result) {
YQL_ENSURE(state.State == TShardState::EState::Preparing);
state.State = TShardState::EState::Prepared;
state.DatashardState->ShardMinStep = result.GetMinStep();
state.DatashardState->ShardMaxStep = result.GetMaxStep();
- state.DatashardState->ReadSize += result.GetReadSize();
ui64 coordinator = 0;
if (result.DomainCoordinatorsSize()) {
@@ -577,42 +576,20 @@ private:
return true;
}
- bool ShardPrepared(TShardState& state, const NKikimrTxColumnShard::TEvProposeTransactionResult& result) {
- YQL_ENSURE(state.State == TShardState::EState::Preparing);
- state.State = TShardState::EState::Prepared;
-
- state.DatashardState->ShardMinStep = result.GetMinStep();
- state.DatashardState->ShardMaxStep = result.GetMaxStep();
-// state.DatashardState->ReadSize += result.GetReadSize();
-
- ui64 coordinator = 0;
- if (result.DomainCoordinatorsSize()) {
- auto domainCoordinators = TCoordinators(TVector<ui64>(result.GetDomainCoordinators().begin(),
- result.GetDomainCoordinators().end()));
- coordinator = domainCoordinators.Select(TxId);
- }
-
- if (coordinator && !TxCoordinator) {
- TxCoordinator = coordinator;
- }
-
- if (!TxCoordinator || TxCoordinator != coordinator) {
- LOG_E("Handle TEvProposeTransactionResult: unable to select coordinator. Tx canceled, actorId: " << SelfId()
- << ", previously selected coordinator: " << TxCoordinator
- << ", coordinator selected at propose result: " << coordinator);
-
- Counters->TxProxyMon->TxResultAborted->Inc();
- ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, MakeIssue(
- NKikimrIssues::TIssuesIds::TX_DECLINED_IMPLICIT_COORDINATOR, "Unable to choose coordinator."));
- return false;
- }
-
- LastPrepareReply = TInstant::Now();
- if (!FirstPrepareReply) {
- FirstPrepareReply = LastPrepareReply;
+ bool ShardPrepared(TShardState& state, const NKikimrTxDataShard::TEvProposeTransactionResult& result) {
+ bool success = ShardPreparedImpl(state, result);
+ if (success) {
+ state.DatashardState->ReadSize += result.GetReadSize();
}
+ return success;
+ }
- return true;
+ bool ShardPrepared(TShardState& state, const NKikimrTxColumnShard::TEvProposeTransactionResult& result) {
+ return ShardPreparedImpl(state, result);
+ }
+
+ bool ShardPrepared(TShardState& state, const NKikimrPQ::TEvProposeTransactionResult& result) {
+ return ShardPreparedImpl(state, result);
}
void ShardError(const NKikimrTxDataShard::TEvProposeTransactionResult& result) {
@@ -819,10 +796,6 @@ private:
auto& affectedSet = *transaction.MutableAffectedSet();
affectedSet.Reserve(static_cast<int>(ShardStates.size()));
- //
- // TODO(abcdef): учесть таблетки топиков
- //
-
ui64 aggrMinStep = 0;
ui64 aggrMaxStep = Max<ui64>();
ui64 totalReadSize = 0;
@@ -858,10 +831,6 @@ private:
}
}
- //
- // TODO(abcdef): учесть таблетки топиков
- //
-
item.SetFlags(affectedFlags);
}
@@ -920,10 +889,11 @@ private:
void HandleExecute(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record;
- LOG_D("Got propose result, topic tablet: " << event.GetOrigin() << ", status: "
- << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
+ LOG_D("Got propose result" <<
+ ", topic tablet: " << event.GetOrigin() <<
+ ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));
- TShardState *state = TopicTabletStates.FindPtr(event.GetOrigin());
+ TShardState *state = ShardStates.FindPtr(event.GetOrigin());
YQL_ENSURE(state);
switch (event.GetStatus()) {
@@ -2028,6 +1998,7 @@ private:
if (auto tabletIds = Request.TopicOperations.GetSendingTabletIds()) {
sendingShardsSet.insert(tabletIds.begin(), tabletIds.end());
+ receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
}
if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
@@ -2276,7 +2247,7 @@ private:
state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId);
- auto result = TopicTabletStates.emplace(tabletId, std::move(state));
+ auto result = ShardStates.emplace(tabletId, std::move(state));
YQL_ENSURE(result.second);
}
}
@@ -2352,7 +2323,6 @@ private:
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
- THashMap<ui64, TShardState> TopicTabletStates;
TVector<NKikimrTxDataShard::TLock> Locks;
bool ReadOnlyTx = true;
bool VolatileTx = false;
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index 2c39ed2375b..6caf7858267 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -234,6 +234,9 @@ struct TEvPersQueue {
};
struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> {
+ explicit TEvCancelTransactionProposal(ui64 txId) {
+ Record.SetTxId(txId);
+ }
};
struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 5aa9dc3275d..8e4b8affb6f 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1406,11 +1406,17 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
const TString& consumer = operation.GetConsumer();
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition <<
+ " Consumer '" << consumer << "' has been removed");
predicate = false;
break;
}
if (!UsersInfoStorage->GetIfExists(consumer)) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition <<
+ " Unknown consumer '" << consumer << "'");
predicate = false;
break;
}
@@ -1419,13 +1425,28 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
if (operation.GetBegin() > operation.GetEnd()) {
- // BAD_REQUEST
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition <<
+ " Consumer '" << consumer << "'" <<
+ " Bad request (invalid range) " <<
+ " Begin " << operation.GetBegin() <<
+ " End " << operation.GetEnd());
predicate = false;
} else if (userInfo.Offset != (i64)operation.GetBegin()) {
- // ABORTED
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition <<
+ " Consumer '" << consumer << "'" <<
+ " Bad request (gap) " <<
+ " Offset " << userInfo.Offset <<
+ " Begin " << operation.GetBegin());
predicate = false;
} else if (operation.GetEnd() > EndOffset) {
- // BAD_REQUEST
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition <<
+ " Consumer '" << consumer << "'" <<
+ " Bad request (behind the last offset) " <<
+ " EndOffset " << EndOffset <<
+ " End " << operation.GetEnd());
predicate = false;
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 228d5077d9c..b285e24565f 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -3,6 +3,7 @@
#include "event_helpers.h"
#include "partition.h"
#include "read.h"
+#include <ydb/core/base/tx_processing.h>
#include <ydb/core/persqueue/config/config.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/protos/pqconfig.pb.h>
@@ -881,6 +882,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
ConfigInited = true;
+ InitProcessingParams(ctx);
InitializeMeteringSink(ctx);
Y_VERIFY(!NewConfigShouldBeApplied);
@@ -2485,6 +2487,8 @@ void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransa
void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx)
{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvPlanStep");
+
NKikimrTx::TEvMediatorPlanStep& event = ev->Get()->Record;
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
@@ -2499,6 +2503,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont
void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx)
{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSet");
+
NKikimrTx::TEvReadSet& event = ev->Get()->Record;
Y_VERIFY(event.HasTxId());
@@ -2525,6 +2531,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx)
{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSetAck");
+
NKikimrTx::TEvReadSetAck& event = ev->Get()->Record;
Y_VERIFY(event.HasTxId());
@@ -2547,6 +2555,14 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
{
const TEvPQ::TEvTxCalcPredicateResult& event = *ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() <<
+ " Handle TEvPQ::TEvTxCalcPredicateResult" <<
+ " Step " << event.Step <<
+ " TxId " << event.TxId <<
+ " Partition " << event.Partition <<
+ " Predicate " << (event.Predicate ? "true" : "false"));
+
auto tx = GetTransaction(ctx, event.TxId);
if (!tx) {
return;
@@ -2579,6 +2595,8 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const
void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& ctx)
{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPQ::TEvTxCommitDone");
+
const TEvPQ::TEvTxCommitDone& event = *ev->Get();
auto tx = GetTransaction(ctx, event.TxId);
@@ -2669,7 +2687,8 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
TDistributedTransaction& tx = Txs[event.GetTxId()];
switch (tx.State) {
case NKikimrPQ::TTransaction::UNKNOWN: {
- tx.OnProposeTransaction(event, GetAllowedStep());
+ tx.OnProposeTransaction(event, GetAllowedStep(),
+ TabletID());
CheckTxState(ctx, tx);
break;
}
@@ -2838,6 +2857,10 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction&
event->Record.SetMinStep(tx.MinStep);
event->Record.SetMaxStep(tx.MaxStep);
+ if (ProcessingParams) {
+ event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators());
+ }
+
if (tx.SourceTablet != InvalidTabletId) {
RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event));
} else {
@@ -2875,14 +2898,16 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
Y_VERIFY(data.SerializeToString(&body));
for (ui64 receiverId : tx.Receivers) {
- auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step,
- tx.TxId,
- TabletID(),
- receiverId,
- TabletID(),
- body,
- 0);
- SendToPipe(receiverId, tx, std::move(event), ctx);
+ if (receiverId != TabletID()) {
+ auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(tx.Step,
+ tx.TxId,
+ TabletID(),
+ receiverId,
+ TabletID(),
+ body,
+ 0);
+ SendToPipe(receiverId, tx, std::move(event), ctx);
+ }
}
tx.ReadSetAcks.clear();
@@ -3239,6 +3264,15 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
}
ChangedTxs.clear();
}
+
+void TPersQueue::InitProcessingParams(const TActorContext& ctx)
+{
+ auto appdata = AppData(ctx);
+ const ui32 domainId = appdata->DomainsInfo->GetDomainUidByTabletId(TabletID());
+ Y_VERIFY(domainId != appdata->DomainsInfo->BadDomainId);
+ const auto& domain = appdata->DomainsInfo->GetDomain(domainId);
+ ProcessingParams = ExtractProcessingParams(domain);
+}
bool TPersQueue::AllTransactionsHaveBeenProcessed() const
{
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index ac21b3834e9..912a9c4f680 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -351,6 +351,10 @@ private:
const THashSet<ui64>& GetBindedTxs(ui64 tabletId);
THashMap<ui64, THashSet<ui64>> BindedTxs;
+
+ void InitProcessingParams(const TActorContext& ctx);
+
+ TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams;
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 80a428f650a..1454c66de13 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -92,7 +92,8 @@ void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& c
}
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
- ui64 minStep)
+ ui64 minStep,
+ ui64 extractTabletId)
{
Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET);
@@ -106,7 +107,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
case NKikimrPQ::TEvProposeTransaction::kData:
Y_VERIFY(event.HasData());
MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
- OnProposeTransaction(event.GetData());
+ OnProposeTransaction(event.GetData(), extractTabletId);
break;
case NKikimrPQ::TEvProposeTransaction::kConfig:
Y_VERIFY(event.HasConfig());
@@ -131,16 +132,21 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
}
}
-void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody)
+void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody,
+ ui64 extractTabletId)
{
Kind = NKikimrPQ::TTransaction::KIND_DATA;
for (ui64 tablet : txBody.GetSendingShards()) {
- Senders.insert(tablet);
+ if (tablet != extractTabletId) {
+ Senders.insert(tablet);
+ }
}
for (ui64 tablet : txBody.GetReceivingShards()) {
- Receivers.insert(tablet);
+ if (tablet != extractTabletId) {
+ Receivers.insert(tablet);
+ }
}
InitPartitions(txBody.GetOperations());
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index ab58a4b7a5f..8e08f29ad46 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -20,8 +20,10 @@ struct TDistributedTransaction {
explicit TDistributedTransaction(const NKikimrPQ::TTransaction& tx);
void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
- ui64 minStep);
- void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody);
+ ui64 minStep,
+ ui64 extractTabletId);
+ void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody,
+ ui64 extractTabletId);
void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody);
void OnPlanStep(ui64 step);
void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event);
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 3dc3dc5bc27..b9b438290f3 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -856,6 +856,7 @@ message TEvProposeTransactionResult {
optional uint64 MaxStep = 6;
optional string StatusMessage = 7;
optional uint64 Step = 8;
+ repeated fixed64 DomainCoordinators = 9;
};
message TEvCancelTransactionProposal {
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
index e309e18a0a9..872b8eb58f9 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
@@ -50,7 +50,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
index bfe8fcae8a5..ca936d0636a 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
@@ -53,7 +53,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
index c105b4ea1a7..7a009a2e332 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
@@ -54,7 +54,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
index ce19fe5c450..7f2067278e5 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
@@ -43,7 +43,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp b/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
deleted file mode 100644
index 88b3943cd14..00000000000
--- a/ydb/services/persqueue_v1/ut/demo_immediate_tx.cpp
+++ /dev/null
@@ -1,292 +0,0 @@
-#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
-
-#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
-#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
-
-#include <ydb/core/protos/services.pb.h>
-
-#include <util/stream/output.h>
-#include <util/string/builder.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include "pq_data_writer.h"
-
-namespace NKikimr::NPersQueueTests {
-
-Y_UNIT_TEST_SUITE(ImmediateTx) {
-
-class TImmediateTxFixture : public NUnitTest::TBaseFixture {
-protected:
- void SetUp(NUnitTest::TTestContext&) override;
-
- void CreateTestServer();
- void CreateTopic();
- void CreateTopicTxStub();
-
- NYdb::NTable::TSession CreateSession();
- NYdb::NTable::TTransaction BeginTx(NYdb::NTable::TSession& session);
- void CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status);
-
- using TTopicReadSession = NYdb::NPersQueue::IReadSession;
- using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>;
-
- TTopicReadSessionPtr CreateTopicReadSession(const TString& topic,
- const TString& consumer);
- void Wait_CreatePartitionStreamEvent(TTopicReadSession& reader,
- ui64 committedOffset);
- void Wait_DataReceivedEvent(TTopicReadSession& reader,
- ui64 offset);
-
- void Call_UpdateOffsetsInTransaction(const TString& sessionId,
- const TString& txId,
- const TString& consumer,
- ui64 rangeBegin,
- ui64 rangeEnd);
-
- const TString CONSUMER = "user";
- const TString SHORT_TOPIC_NAME = "demo";
- const TString DC = "dc1";
- const TString FULL_TOPIC_NAME = "rt3." + DC + "--" + SHORT_TOPIC_NAME;
- const TString AUTH_TOKEN = "x-user-x@builtin";
- const TString DATABASE = "/Root";
- const TString TOPIC_PARENT = DATABASE + "/PQ";
- const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME;
-
- TMaybe<NPersQueue::TTestServer> Server;
- std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub;
-};
-
-void TImmediateTxFixture::SetUp(NUnitTest::TTestContext&)
-{
- CreateTestServer();
- CreateTopic();
- CreateTopicTxStub();
-}
-
-void TImmediateTxFixture::CreateTestServer()
-{
- Server.ConstructInPlace(PQSettings(0).SetDomainName("Root").SetEnableTopicServiceTx(true));
-
- Server->EnableLogs({NKikimrServices::FLAT_TX_SCHEMESHARD
- , NKikimrServices::PERSQUEUE});
-}
-
-void TImmediateTxFixture::CreateTopic()
-{
- //
- // создать топик...
- //
- Server->AnnoyingClient->CreateTopicNoLegacy(TOPIC_PATH, 1);
-
- NACLib::TDiffACL acl;
- acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN);
- Server->AnnoyingClient->ModifyACL(TOPIC_PARENT, FULL_TOPIC_NAME, acl.SerializeAsString());
-
- //
- // ...с несколькими сообщениями
- //
- TPQDataWriter writer("source-id", *Server);
-
- for (ui32 offset = 0; offset < 4; ++offset) {
- writer.Write(TOPIC_PATH, {"data"}, false, AUTH_TOKEN);
- }
-}
-
-void TImmediateTxFixture::CreateTopicTxStub()
-{
- auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials());
- TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel);
-}
-
-NYdb::NTable::TSession TImmediateTxFixture::CreateSession()
-{
- NYdb::TDriverConfig config;
- config.SetEndpoint(TStringBuilder() << "localhost:" << Server->GrpcPort);
- config.SetDatabase(DATABASE);
- config.SetAuthToken(AUTH_TOKEN);
-
- NYdb::TDriver driver(config);
- NYdb::NTable::TClientSettings settings;
- NYdb::NTable::TTableClient client(driver, settings);
-
- auto result = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
-
- return result.GetSession();
-}
-
-NYdb::NTable::TTransaction TImmediateTxFixture::BeginTx(NYdb::NTable::TSession& session)
-{
- auto result = session.BeginTransaction().ExtractValueSync();
- UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
-
- return result.GetTransaction();
-}
-
-void TImmediateTxFixture::CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status)
-{
- auto result = tx.Commit().ExtractValueSync();
- UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
-
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
-}
-
-auto TImmediateTxFixture::CreateTopicReadSession(const TString& topic,
- const TString& consumer) -> TTopicReadSessionPtr
-{
- NYdb::NPersQueue::TReadSessionSettings settings;
- settings.AppendTopics(topic);
- settings.ConsumerName(consumer);
- settings.ReadOriginal({DC});
-
- return CreateReader(*Server->AnnoyingClient->GetDriver(), settings);
-}
-
-template<class E>
-E ReadEvent(NYdb::NPersQueue::IReadSession& reader, bool block, size_t count)
-{
- auto msg = reader.GetEvent(block, count);
- UNIT_ASSERT(msg);
-
- auto ev = std::get_if<E>(&*msg);
- UNIT_ASSERT(ev);
-
- return *ev;
-}
-
-void TImmediateTxFixture::Wait_CreatePartitionStreamEvent(TTopicReadSession& reader,
- ui64 committedOffset)
-{
- auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(reader, true, 1);
- Cerr << "TCreatePartitionStreamEvent: " << event.DebugString() << Endl;
-
- UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), committedOffset);
-
- event.Confirm();
-}
-
-void TImmediateTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader,
- ui64 offset)
-{
- auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(reader, true, 1);
- Cerr << "TDataReceivedEvent: " << event.DebugString() << Endl;
-
- UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset);
-}
-
-void TImmediateTxFixture::Call_UpdateOffsetsInTransaction(const TString& sessionId,
- const TString& txId,
- const TString& consumer,
- ui64 rangeBegin,
- ui64 rangeEnd)
-{
- grpc::ClientContext rcontext;
- rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
- rcontext.AddMetadata("x-ydb-database", DATABASE);
-
- Ydb::Topic::UpdateOffsetsInTransactionRequest request;
- Ydb::Topic::UpdateOffsetsInTransactionResponse response;
-
- request.mutable_tx()->set_id(txId);
- request.mutable_tx()->set_session(sessionId);
- request.set_consumer(consumer);
-
- auto *topic = request.mutable_topics()->Add();
- topic->set_path(TOPIC_PATH);
-
- auto *partition = topic->mutable_partitions()->Add();
- partition->set_partition_id(0);
-
- auto *range = partition->mutable_partition_offsets()->Add();
- range->set_start(rangeBegin);
- range->set_end(rangeEnd);
-
- grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext,
- request,
- &response);
- UNIT_ASSERT(status.ok());
-
- UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
-}
-
-Y_UNIT_TEST_F(Scenario_1, TImmediateTxFixture)
-{
- {
- auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
-
- Wait_CreatePartitionStreamEvent(*reader, 0);
-
- NYdb::NTable::TSession session = CreateSession();
- NYdb::NTable::TTransaction tx = BeginTx(session);
-
- Wait_DataReceivedEvent(*reader, 0);
- Wait_DataReceivedEvent(*reader, 1);
-
- Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 0, 2);
-
- CommitTx(tx, NYdb::EStatus::SUCCESS);
- }
-
- {
- auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
-
- Wait_CreatePartitionStreamEvent(*reader, 2);
-
- NYdb::NTable::TSession session = CreateSession();
- NYdb::NTable::TTransaction tx = BeginTx(session);
-
- Wait_DataReceivedEvent(*reader, 2);
- Wait_DataReceivedEvent(*reader, 3);
-
- Call_UpdateOffsetsInTransaction(session.GetId(), tx.GetId(), CONSUMER, 2, 4);
- }
-
- {
- auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
-
- Wait_CreatePartitionStreamEvent(*reader, 2);
- }
-}
-
-Y_UNIT_TEST_F(Scenario_2, TImmediateTxFixture)
-{
- NYdb::NTable::TSession s1 = CreateSession();
- NYdb::NTable::TTransaction t1 = BeginTx(s1);
-
- {
- auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
-
- Wait_CreatePartitionStreamEvent(*reader, 0);
-
- Wait_DataReceivedEvent(*reader, 0);
- Wait_DataReceivedEvent(*reader, 1);
- Wait_DataReceivedEvent(*reader, 2);
-
- Call_UpdateOffsetsInTransaction(s1.GetId(), t1.GetId(), CONSUMER, 0, 3);
- }
-
- NYdb::NTable::TSession s2 = CreateSession();
- NYdb::NTable::TTransaction t2 = BeginTx(s2);
-
- {
- auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
-
- Wait_CreatePartitionStreamEvent(*reader, 0);
-
- Wait_DataReceivedEvent(*reader, 0);
- Wait_DataReceivedEvent(*reader, 1);
-
- Call_UpdateOffsetsInTransaction(s2.GetId(), t2.GetId(), CONSUMER, 0, 2);
- }
-
- CommitTx(t2, NYdb::EStatus::SUCCESS);
- CommitTx(t1, NYdb::EStatus::ABORTED);
-}
-
-}
-
-}
diff --git a/ydb/services/persqueue_v1/ut/demo_tx.cpp b/ydb/services/persqueue_v1/ut/demo_tx.cpp
new file mode 100644
index 00000000000..18ca69792af
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/demo_tx.cpp
@@ -0,0 +1,571 @@
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <util/stream/output.h>
+#include <util/string/builder.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "pq_data_writer.h"
+
+namespace NKikimr::NPersQueueTests {
+
+Y_UNIT_TEST_SUITE(DemoTx) {
+
+class TTopicNameConstructor {
+public:
+ TTopicNameConstructor(const TString& dc,
+ const TString& database);
+
+ TString GetTopicParent() const;
+ TString GetTopicPath(const TString& topicName) const;
+ TString GetFullTopicName(const TString& topicName) const;
+
+private:
+ TString Dc;
+ TString Database;
+};
+
+TTopicNameConstructor::TTopicNameConstructor(const TString& dc,
+ const TString& database) :
+ Dc(dc),
+ Database(database)
+{
+}
+
+TString TTopicNameConstructor::GetTopicParent() const
+{
+ return Database + "/PQ";
+}
+
+TString TTopicNameConstructor::GetTopicPath(const TString& topicName) const
+{
+ return GetTopicParent() + "/" + GetFullTopicName(topicName);
+}
+
+TString TTopicNameConstructor::GetFullTopicName(const TString& topicName) const
+{
+ return "rt3." + Dc + "--" + topicName;
+}
+
+class TTxFixture : public NUnitTest::TBaseFixture {
+protected:
+ TTxFixture();
+
+ void SetUp(NUnitTest::TTestContext&) override;
+
+ void CreateTestServer();
+ void CreateTopic(const TString& topicPath);
+ void CreateTopicStub();
+ void CreateTable(const TString& parent, const TString& name);
+
+ NYdb::NTable::TSession CreateSession(const TString& authToken);
+ NYdb::NTable::TTransaction BeginTx(NYdb::NTable::TSession& session);
+ void CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status);
+
+ using TTopicReadSession = NYdb::NPersQueue::IReadSession;
+ using TTopicReadSessionPtr = std::shared_ptr<TTopicReadSession>;
+
+ TTopicReadSessionPtr CreateTopicReadSession(const TString& topic,
+ const TString& consumer);
+ void Wait_CreatePartitionStreamEvent(TTopicReadSession& reader,
+ ui64 committedOffset);
+ void Wait_DataReceivedEvent(TTopicReadSession& reader,
+ ui64 offset);
+
+ void Call_UpdateOffsetsInTransaction(const NYdb::NTable::TTransaction& tx,
+ const TString& consumer,
+ const TString& topicPath,
+ ui64 rangeBegin,
+ ui64 rangeEnd);
+
+ void ExecSQL(const NYdb::NTable::TTransaction& tx,
+ const TString& query,
+ NYdb::EStatus status);
+
+ void Ensure_In_Table(const TString& tablePath,
+ const THashMap<i64, i64>& values);
+
+ const TString CONSUMER = "user";
+ const TString SHORT_TOPIC_NAME = "demo";
+ const TString DC = "dc1";
+ const TString FULL_TOPIC_NAME = "rt3." + DC + "--" + SHORT_TOPIC_NAME;
+ const TString AUTH_TOKEN = "x-user-x@builtin";
+ const TString DATABASE = "/Root";
+ const TString TOPIC_PARENT = DATABASE + "/PQ";
+ const TString TOPIC_PATH = TOPIC_PARENT + "/" + FULL_TOPIC_NAME;
+
+ TMaybe<NPersQueue::TTestServer> Server;
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStub;
+ TTopicNameConstructor NameCtor;
+};
+
+TTxFixture::TTxFixture() :
+ NameCtor(DC, DATABASE)
+{
+}
+
+void TTxFixture::SetUp(NUnitTest::TTestContext&)
+{
+ CreateTestServer();
+
+ CreateTopic(SHORT_TOPIC_NAME);
+ CreateTopicStub();
+}
+
+void TTxFixture::CreateTestServer()
+{
+ auto settings = PQSettings(0)
+ .SetDomainName("Root")
+ .SetEnableTopicServiceTx(true);
+
+ Server.ConstructInPlace(settings);
+}
+
+void TTxFixture::CreateTopic(const TString& topicName)
+{
+ const TString topicPath = NameCtor.GetTopicPath(topicName);
+ const TString fullTopicName = NameCtor.GetFullTopicName(topicName);
+
+ //
+ // создать топик...
+ //
+ Server->AnnoyingClient->CreateTopicNoLegacy(topicPath, 1);
+
+ NACLib::TDiffACL acl;
+ acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN);
+ Server->AnnoyingClient->ModifyACL(NameCtor.GetTopicParent(),
+ fullTopicName,
+ acl.SerializeAsString());
+
+ //
+ // ...с несколькими сообщениями
+ //
+ TPQDataWriter writer("source-id", *Server);
+
+ for (ui32 offset = 0; offset < 4; ++offset) {
+ writer.Write(topicPath, {"data"}, false, AUTH_TOKEN);
+ }
+}
+
+void TTxFixture::CreateTopicStub()
+{
+ auto channel = grpc::CreateChannel("localhost:" + ToString(Server->GrpcPort), grpc::InsecureChannelCredentials());
+ TopicStub = Ydb::Topic::V1::TopicService::NewStub(channel);
+}
+
+void TTxFixture::CreateTable(const TString& parent, const TString& name)
+{
+ auto session = CreateSession("");
+
+ auto result = session.ExecuteSchemeQuery(Sprintf(R"___(
+ CREATE TABLE `%s/%s` (
+ key Int64,
+ value Int64,
+ PRIMARY KEY (key)
+ );
+ )___", parent.data(), name.data())).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
+
+ NACLib::TDiffACL acl;
+ acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, AUTH_TOKEN);
+ Server->AnnoyingClient->ModifyACL(parent,
+ name,
+ acl.SerializeAsString());
+
+ Server->AnnoyingClient->ModifyACL("/",
+ "Root",
+ acl.SerializeAsString());
+}
+
+NYdb::NTable::TSession TTxFixture::CreateSession(const TString& authToken)
+{
+ NYdb::TDriverConfig config;
+ config.SetEndpoint(TStringBuilder() << "localhost:" << Server->GrpcPort);
+ config.SetDatabase(DATABASE);
+ if (!authToken.empty()) {
+ config.SetAuthToken(authToken);
+ }
+
+ NYdb::TDriver driver(config);
+ NYdb::NTable::TClientSettings settings;
+ NYdb::NTable::TTableClient client(driver, settings);
+
+ auto result = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ return result.GetSession();
+}
+
+NYdb::NTable::TTransaction TTxFixture::BeginTx(NYdb::NTable::TSession& session)
+{
+ auto result = session.BeginTransaction().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ return result.GetTransaction();
+}
+
+void TTxFixture::CommitTx(NYdb::NTable::TTransaction& tx, NYdb::EStatus status)
+{
+ auto result = tx.Commit().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
+}
+
+auto TTxFixture::CreateTopicReadSession(const TString& topic,
+ const TString& consumer) -> TTopicReadSessionPtr
+{
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.AppendTopics(topic);
+ settings.ConsumerName(consumer);
+ settings.ReadOriginal({DC});
+
+ return CreateReader(*Server->AnnoyingClient->GetDriver(), settings);
+}
+
+template<class E>
+E ReadEvent(NYdb::NPersQueue::IReadSession& reader, bool block, size_t count)
+{
+ auto msg = reader.GetEvent(block, count);
+ UNIT_ASSERT(msg);
+
+ auto ev = std::get_if<E>(&*msg);
+ UNIT_ASSERT(ev);
+
+ return *ev;
+}
+
+void TTxFixture::Wait_CreatePartitionStreamEvent(TTopicReadSession& reader,
+ ui64 committedOffset)
+{
+ auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(reader, true, 1);
+ Cerr << "TCreatePartitionStreamEvent: " << event.DebugString() << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(event.GetCommittedOffset(), committedOffset);
+
+ event.Confirm();
+}
+
+void TTxFixture::Wait_DataReceivedEvent(TTopicReadSession& reader,
+ ui64 offset)
+{
+ auto event = ReadEvent<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(reader, true, 1);
+ Cerr << "TDataReceivedEvent: " << event.DebugString() << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset);
+}
+
+void TTxFixture::Call_UpdateOffsetsInTransaction(const NYdb::NTable::TTransaction& tx,
+ const TString& consumer,
+ const TString& topicPath,
+ ui64 rangeBegin,
+ ui64 rangeEnd)
+{
+ grpc::ClientContext rcontext;
+ rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
+ rcontext.AddMetadata("x-ydb-database", DATABASE);
+
+ Ydb::Topic::UpdateOffsetsInTransactionRequest request;
+ Ydb::Topic::UpdateOffsetsInTransactionResponse response;
+
+ request.mutable_tx()->set_id(tx.GetId());
+ request.mutable_tx()->set_session(tx.GetSession().GetId());
+ request.set_consumer(consumer);
+
+ auto *topic = request.mutable_topics()->Add();
+ topic->set_path(topicPath);
+
+ auto *partition = topic->mutable_partitions()->Add();
+ partition->set_partition_id(0);
+
+ auto *range = partition->mutable_partition_offsets()->Add();
+ range->set_start(rangeBegin);
+ range->set_end(rangeEnd);
+
+ grpc::Status status = TopicStub->UpdateOffsetsInTransaction(&rcontext,
+ request,
+ &response);
+ UNIT_ASSERT(status.ok());
+
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+}
+
+void TTxFixture::ExecSQL(const NYdb::NTable::TTransaction& tx,
+ const TString& query,
+ NYdb::EStatus status)
+{
+ auto result =
+ tx.GetSession().ExecuteDataQuery(query,
+ NYdb::NTable::TTxControl::Tx(tx)).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
+}
+
+void TTxFixture::Ensure_In_Table(const TString& tablePath,
+ const THashMap<i64, i64>& values)
+{
+ auto query = Sprintf(R"___(
+ SELECT key, value FROM `%s`
+ )___", tablePath.data());
+
+ auto session = CreateSession("");
+
+ auto result = session.ExecuteDataQuery(query,
+ NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
+
+ auto rs = result.GetResultSetParser(0);
+ UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), values.size());
+
+ for (size_t i = 0; i < values.size(); ++i) {
+ UNIT_ASSERT(rs.TryNextRow());
+
+ auto key = rs.ColumnParser("key").GetOptionalInt64();
+ UNIT_ASSERT(key.Defined());
+
+ auto value = rs.ColumnParser("value").GetOptionalInt64();
+ UNIT_ASSERT(value.Defined());
+
+ auto p = values.find(*key);
+ UNIT_ASSERT(p != values.end());
+
+ UNIT_ASSERT_VALUES_EQUAL(*value, p->second);
+ }
+}
+
+Y_UNIT_TEST_F(Scenario_1, TTxFixture)
+{
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, TOPIC_PATH, 0, 2);
+
+ CommitTx(tx, NYdb::EStatus::SUCCESS);
+ }
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 2);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ Wait_DataReceivedEvent(*reader, 2);
+ Wait_DataReceivedEvent(*reader, 3);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, TOPIC_PATH, 2, 4);
+ }
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 2);
+ }
+}
+
+Y_UNIT_TEST_F(Scenario_2, TTxFixture)
+{
+ NYdb::NTable::TSession s1 = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction t1 = BeginTx(s1);
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+ Wait_DataReceivedEvent(*reader, 2);
+
+ Call_UpdateOffsetsInTransaction(t1, CONSUMER, TOPIC_PATH, 0, 3);
+ }
+
+ NYdb::NTable::TSession s2 = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction t2 = BeginTx(s2);
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+
+ Call_UpdateOffsetsInTransaction(t2, CONSUMER, TOPIC_PATH, 0, 2);
+ }
+
+ CommitTx(t2, NYdb::EStatus::SUCCESS);
+ CommitTx(t1, NYdb::EStatus::ABORTED);
+}
+
+Y_UNIT_TEST_F(Scenario_3, TTxFixture)
+{
+ CreateTopic("demo_1");
+ CreateTopic("demo_2");
+
+ {
+ auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER);
+ auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader_1, 0);
+ Wait_CreatePartitionStreamEvent(*reader_2, 0);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ Wait_DataReceivedEvent(*reader_1, 0);
+ Wait_DataReceivedEvent(*reader_1, 1);
+
+ Wait_DataReceivedEvent(*reader_2, 0);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_1"), 0, 2);
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_2"), 0, 1);
+
+ CommitTx(tx, NYdb::EStatus::SUCCESS);
+ }
+
+ {
+ auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER);
+ auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader_1, 2);
+ Wait_CreatePartitionStreamEvent(*reader_2, 1);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ Wait_DataReceivedEvent(*reader_1, 2);
+
+ Wait_DataReceivedEvent(*reader_2, 1);
+ Wait_DataReceivedEvent(*reader_2, 2);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_1"), 2, 3);
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo_2"), 1, 3);
+ }
+
+ {
+ auto reader_1 = CreateTopicReadSession("demo_1", CONSUMER);
+ auto reader_2 = CreateTopicReadSession("demo_2", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader_1, 2);
+ Wait_CreatePartitionStreamEvent(*reader_2, 1);
+ }
+}
+
+Y_UNIT_TEST_F(Scenario_4, TTxFixture)
+{
+ CreateTable("/Root", "table");
+
+ {
+ auto reader = CreateTopicReadSession("demo", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ ExecSQL(tx, "SELECT key, value FROM `/Root/table` WHERE key = 1;",
+ NYdb::EStatus::SUCCESS);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+
+ ExecSQL(tx, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 1);",
+ NYdb::EStatus::SUCCESS);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo"), 0, 2);
+
+ CommitTx(tx, NYdb::EStatus::SUCCESS);
+ }
+
+ {
+ Ensure_In_Table("/Root/table", {{1, 1}});
+
+ auto reader = CreateTopicReadSession("demo", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 2);
+
+ NYdb::NTable::TSession session = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction tx = BeginTx(session);
+
+ Wait_DataReceivedEvent(*reader, 2);
+
+ ExecSQL(tx, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 2);",
+ NYdb::EStatus::SUCCESS);
+
+ Call_UpdateOffsetsInTransaction(tx, CONSUMER, NameCtor.GetTopicPath("demo"), 2, 3);
+ }
+
+ {
+ Ensure_In_Table("/Root/table", {{1, 1}});
+
+ auto reader = CreateTopicReadSession("demo", CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 2);
+ }
+}
+
+Y_UNIT_TEST_F(Scenario_5, TTxFixture)
+{
+ CreateTable("/Root", "table");
+
+ NYdb::NTable::TSession s1 = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction t1 = BeginTx(s1);
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+ Wait_DataReceivedEvent(*reader, 2);
+
+ ExecSQL(t1, "SELECT key, value FROM `/Root/table` WHERE key = 1;",
+ NYdb::EStatus::SUCCESS);
+
+ Call_UpdateOffsetsInTransaction(t1, CONSUMER, TOPIC_PATH, 0, 3);
+ }
+
+ NYdb::NTable::TSession s2 = CreateSession(AUTH_TOKEN);
+ NYdb::NTable::TTransaction t2 = BeginTx(s2);
+
+ {
+ auto reader = CreateTopicReadSession(SHORT_TOPIC_NAME, CONSUMER);
+
+ Wait_CreatePartitionStreamEvent(*reader, 0);
+
+ Wait_DataReceivedEvent(*reader, 0);
+ Wait_DataReceivedEvent(*reader, 1);
+
+ Call_UpdateOffsetsInTransaction(t2, CONSUMER, TOPIC_PATH, 0, 2);
+ }
+
+ CommitTx(t2, NYdb::EStatus::SUCCESS);
+ CommitTx(t1, NYdb::EStatus::ABORTED);
+}
+
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
index 6388fc8fbe4..16882a8c62f 100644
--- a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
@@ -409,7 +409,7 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti
auto result = tx->Commit().ExtractValueSync();
Cerr << ">>> CommitTx >>>" << Endl;
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::INTERNAL_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
}
}