aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-10-13 13:23:15 +0300
committerabcdef <akotov@ydb.tech>2023-10-13 14:32:59 +0300
commit1571ff1ca97f39cb416d0224d0656d5bcff01f1b (patch)
tree672815d6a3805ea0a9339a5416ed01e0663a1822
parentcf4d301c44398f63b387618472918acfff0dc87c (diff)
downloadydb-1571ff1ca97f39cb416d0224d0656d5bcff01f1b.tar.gz
transaction implementation errors in PQ
- удаляются транзакции с истёкшим сроком - удаляются транзакции по `TEvCancelTransactionProposal` - подписка на MediatorTimecast
-rw-r--r--ydb/core/kqp/topics/kqp_topics.cpp22
-rw-r--r--ydb/core/kqp/topics/kqp_topics.h3
-rw-r--r--ydb/core/persqueue/events/global.h2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp159
-rw-r--r--ydb/core/persqueue/pq_impl.h23
-rw-r--r--ydb/core/persqueue/transaction.cpp2
-rw-r--r--ydb/core/persqueue/transaction.h2
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp80
-rw-r--r--ydb/core/protos/pqconfig.proto1
9 files changed, 240 insertions, 54 deletions
diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp
index 17cfec45f2..d9e01db375 100644
--- a/ydb/core/kqp/topics/kqp_topics.cpp
+++ b/ydb/core/kqp/topics/kqp_topics.cpp
@@ -111,8 +111,6 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
}
-
- tx.MutableCoordinators()->Add(Coordinators_.begin(), Coordinators_.end());
}
void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
@@ -132,20 +130,6 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
}
HasWriteOperations_ |= rhs.HasWriteOperations_;
-
- // If the list of coordinators is empty, then we use a ready-made one.
- // Otherwise, we leave the common elements
- if (Coordinators_.empty()) {
- Coordinators_ = rhs.Coordinators_;
- } else {
- for (auto iter = Coordinators_.begin(); iter != Coordinators_.end(); ) {
- if (rhs.Coordinators_.contains(*iter)) {
- ++iter;
- } else {
- iter = Coordinators_.erase(iter);
- }
- }
- }
}
ui64 TTopicPartitionOperations::GetTabletId() const
@@ -172,11 +156,6 @@ bool TTopicPartitionOperations::HasWriteOperations() const
return HasWriteOperations_;
}
-void TTopicPartitionOperations::SetCoordinators(const TVector<ui64>& coordinators)
-{
- Coordinators_ = std::unordered_set<ui64>(coordinators.begin(), coordinators.end());
-}
-
//
// TTopicPartition
//
@@ -344,7 +323,6 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
<< ", " << partition.GetTabletId());
p->second.SetTabletId(partition.GetTabletId());
- p->second.SetCoordinators(result.DomainInfo->Coordinators.List());
}
}
} else {
diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h
index 9b7ace263b..4d0164d512 100644
--- a/ydb/core/kqp/topics/kqp_topics.h
+++ b/ydb/core/kqp/topics/kqp_topics.h
@@ -61,15 +61,12 @@ public:
bool HasReadOperations() const;
bool HasWriteOperations() const;
- void SetCoordinators(const TVector<ui64>& coordinators);
-
private:
TMaybe<TString> Topic_;
TMaybe<ui32> Partition_;
THashMap<TString, TConsumerOperations> Operations_;
bool HasWriteOperations_ = false;
TMaybe<ui64> TabletId_;
- std::unordered_set<ui64> Coordinators_;
};
struct TTopicPartition {
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index e7b2a53c9d..1e2e28befb 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -257,6 +257,8 @@ struct TEvPersQueue {
};
struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> {
+ TEvCancelTransactionProposal() = default;
+
explicit TEvCancelTransactionProposal(ui64 txId) {
Record.SetTxId(txId);
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 2b7d5788fc..6d3508fa49 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -868,7 +868,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
ConfigInited = true;
- InitProcessingParams(ctx);
InitializeMeteringSink(ctx);
Y_ABORT_UNLESS(!NewConfigShouldBeApplied);
@@ -2344,6 +2343,10 @@ void TPersQueue::HandleDie(const TActorContext& ctx)
Y_ABORT_UNLESS(res);
}
ResponseProxy.clear();
+
+ StopWatchingTenantPathId(ctx);
+ MediatorTimeCastUnregisterTablet(ctx);
+
NKeyValue::TKeyValueFlat::HandleDie(ctx);
}
@@ -2379,10 +2382,55 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info)
void TPersQueue::CreatedHook(const TActorContext& ctx)
{
-
IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId()));
+ InitProcessingParams(ctx);
+}
+
+void TPersQueue::StartWatchingTenantPathId(const TActorContext& ctx)
+{
+ ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(Info()->TenantPathId));
+}
+
+void TPersQueue::StopWatchingTenantPathId(const TActorContext& ctx)
+{
+ ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
+}
+
+void TPersQueue::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx)
+{
+ const auto& result = ev->Get()->Result;
+ ProcessingParams = result->GetPathDescription().GetDomainDescription().GetProcessingParams();
+
+ InitMediatorTimeCast(ctx);
+}
+
+void TPersQueue::MediatorTimeCastRegisterTablet(const TActorContext& ctx)
+{
+ Y_ABORT_UNLESS(ProcessingParams);
+ ctx.Send(MakeMediatorTimecastProxyID(),
+ new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams));
+}
+
+void TPersQueue::MediatorTimeCastUnregisterTablet(const TActorContext& ctx)
+{
+ ctx.Send(MakeMediatorTimecastProxyID(),
+ new TEvMediatorTimecast::TEvUnregisterTablet(TabletID()));
+}
+
+void TPersQueue::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx)
+{
+ const auto* message = ev->Get();
+ Y_ABORT_UNLESS(message->TabletId == TabletID());
+
+ MediatorTimeCastEntry = message->Entry;
+ Y_ABORT_UNLESS(MediatorTimeCastEntry);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() <<
+ "Registered with mediator time cast");
+
+ TryWriteTxs(ctx);
}
void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorContext& ctx)
@@ -2422,9 +2470,43 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
AggregateAndSendLabeledCountersFor(g, ctx);
}
MeteringSink.MayFlush(ctx.Now());
+ DeleteExpiredTransactions(ctx);
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}
+void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
+{
+ if (!MediatorTimeCastEntry) {
+ return;
+ }
+
+ ui64 step = MediatorTimeCastEntry->Get(TabletID());
+
+ for (auto& [txId, tx] : Txs) {
+ if ((tx.MaxStep < step) && (tx.State <= NKikimrPQ::TTransaction::PREPARED)) {
+ DeleteTx(tx);
+ }
+ }
+
+ TryWriteTxs(ctx);
+}
+
+void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
+{
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvCancelTransactionProposal");
+
+ NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
+ Y_ABORT_UNLESS(event.HasTxId());
+
+ if (auto tx = GetTransaction(ctx, event.GetTxId()); tx) {
+ Y_ABORT_UNLESS(tx->State <= NKikimrPQ::TTransaction::PREPARED);
+
+ DeleteTx(*tx);
+
+ TryWriteTxs(ctx);
+ }
+}
+
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvProposeTransaction");
@@ -2639,11 +2721,34 @@ void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& c
TryWriteTxs(ctx);
}
+bool TPersQueue::CanProcessProposeTransactionQueue() const
+{
+ return
+ !EvProposeTransactionQueue.empty()
+ && ProcessingParams
+ && (!UseMediatorTimeCast || MediatorTimeCastEntry);
+}
+
+bool TPersQueue::CanProcessPlanStepQueue() const
+{
+ return !EvPlanStepQueue.empty();
+}
+
+bool TPersQueue::CanProcessWriteTxs() const
+{
+ return !WriteTxs.empty();
+}
+
+bool TPersQueue::CanProcessDeleteTxs() const
+{
+ return !DeleteTxs.empty();
+}
+
void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
{
Y_ABORT_UNLESS(!WriteTxsInProgress);
- if (EvProposeTransactionQueue.empty() && EvPlanStepQueue.empty() && WriteTxs.empty() && DeleteTxs.empty()) {
+ if (!CanProcessProposeTransactionQueue() && !CanProcessPlanStepQueue() && !CanProcessWriteTxs() && !CanProcessDeleteTxs()) {
return;
}
@@ -2704,7 +2809,9 @@ void TPersQueue::TryWriteTxs(const TActorContext& ctx)
void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
{
- while (!EvProposeTransactionQueue.empty()) {
+ Y_ABORT_UNLESS(!WriteTxsInProgress);
+
+ while (CanProcessProposeTransactionQueue()) {
const auto front = std::move(EvProposeTransactionQueue.front());
EvProposeTransactionQueue.pop_front();
@@ -2736,7 +2843,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
{
Y_ABORT_UNLESS(!WriteTxsInProgress);
- while (!EvPlanStepQueue.empty()) {
+ while (CanProcessPlanStepQueue()) {
const auto front = std::move(EvPlanStepQueue.front());
EvPlanStepQueue.pop_front();
@@ -2765,6 +2872,8 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
if (auto p = Txs.find(txId); p != Txs.end()) {
TDistributedTransaction& tx = p->second;
+ Y_ABORT_UNLESS(tx.MaxStep >= step);
+
if (tx.Step == Max<ui64>()) {
Y_ABORT_UNLESS(TxQueue.empty() || (TxQueue.back() < std::make_pair(step, txId)));
@@ -2879,11 +2988,7 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction&
event->Record.SetMinStep(tx.MinStep);
event->Record.SetMaxStep(tx.MaxStep);
- // If you have sent a list of preferred coordinators, then we will use them.
- // Otherwise, we take the list of coordinators from the domain description
- if (tx.Coordinators) {
- event->Record.MutableDomainCoordinators()->Add(tx.Coordinators.begin(), tx.Coordinators.end());
- } else if (ProcessingParams) {
+ if (ProcessingParams) {
event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators());
}
@@ -3269,14 +3374,37 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
}
ChangedTxs.clear();
}
-
+
void TPersQueue::InitProcessingParams(const TActorContext& ctx)
{
+ if (Info()->TenantPathId) {
+ UseMediatorTimeCast = true;
+ StartWatchingTenantPathId(ctx);
+ return;
+ }
+
auto appdata = AppData(ctx);
const ui32 domainId = appdata->DomainsInfo->GetDomainUidByTabletId(TabletID());
Y_ABORT_UNLESS(domainId != appdata->DomainsInfo->BadDomainId);
+
const auto& domain = appdata->DomainsInfo->GetDomain(domainId);
ProcessingParams = ExtractProcessingParams(domain);
+
+ InitMediatorTimeCast(ctx);
+}
+
+void TPersQueue::InitMediatorTimeCast(const TActorContext& ctx)
+{
+ Y_ABORT_UNLESS(ProcessingParams);
+
+ if (ProcessingParams->MediatorsSize()) {
+ UseMediatorTimeCast = true;
+ MediatorTimeCastRegisterTablet(ctx);
+ } else {
+ UseMediatorTimeCast = false;
+
+ TryWriteTxs(ctx);
+ }
}
bool TPersQueue::AllTransactionsHaveBeenProcessed() const
@@ -3439,10 +3567,8 @@ void TPersQueue::OnInitComplete(const TActorContext& ctx)
ui64 TPersQueue::GetAllowedStep() const
{
- //
- // TODO(abcdef): использовать MediatorTimeCastEntry
- //
- return Max(LastStep + 1, TAppData::TimeProvider->Now().MilliSeconds());
+ return Max(LastStep + 1,
+ MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : TAppData::TimeProvider->Now().MilliSeconds());
}
NTabletPipe::TClientConfig TPersQueue::GetPipeClientConfig()
@@ -3528,6 +3654,9 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvTxCommitDone, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPersQueue::TEvProposeTransactionAttach, Handle);
+ HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+ HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 24e1be3ccc..589403c64c 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -9,6 +9,8 @@
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/events/internal.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
#include <library/cpp/actors/interconnect/interconnect.h>
@@ -347,10 +349,31 @@ private:
THashMap<ui64, THashSet<ui64>> BindedTxs;
void InitProcessingParams(const TActorContext& ctx);
+ void InitMediatorTimeCast(const TActorContext& ctx);
TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams;
void Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr& ev, const TActorContext& ctx);
+
+ void StartWatchingTenantPathId(const TActorContext& ctx);
+ void StopWatchingTenantPathId(const TActorContext& ctx);
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
+
+ void MediatorTimeCastRegisterTablet(const TActorContext& ctx);
+ void MediatorTimeCastUnregisterTablet(const TActorContext& ctx);
+ void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
+
+ TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
+
+ void DeleteExpiredTransactions(const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx);
+
+ bool CanProcessProposeTransactionQueue() const;
+ bool CanProcessPlanStepQueue() const;
+ bool CanProcessWriteTxs() const;
+ bool CanProcessDeleteTxs() const;
+
+ bool UseMediatorTimeCast = true;
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index da670066ab..90ef0703b6 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -136,8 +136,6 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
PartitionRepliesExpected = 0;
ReadSetCount = 0;
-
- Coordinators.assign(txBody.GetCoordinators().begin(), txBody.GetCoordinators().end());
}
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody)
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index 1650d33a02..3c79b81867 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -106,8 +106,6 @@ struct TDistributedTransaction {
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
void UnbindMsgsFromPipe(ui64 tabletId);
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
-
- TVector<ui64> Coordinators;
};
}
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 81bf7974bc..8383cca1f4 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -65,6 +65,10 @@ struct TDropTabletParams {
ui64 TxId = 0;
};
+struct TCancelTransactionProposalParams {
+ ui64 TxId = 0;
+};
+
using NKikimr::NPQ::NHelpers::CreatePQTabletMock;
using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock;
@@ -131,6 +135,7 @@ protected:
using TPlanStepParams = NHelpers::TPlanStepParams;
using TReadSetParams = NHelpers::TReadSetParams;
using TDropTabletParams = NHelpers::TDropTabletParams;
+ using TCancelTransactionProposalParams = NHelpers::TCancelTransactionProposalParams;
void SetUp(NUnitTest::TTestContext&) override;
void TearDown(NUnitTest::TTestContext&) override;
@@ -158,11 +163,20 @@ protected:
void StartPQWriteStateObserver();
void WaitForPQWriteState();
+ void SendCancelTransactionProposal(const TCancelTransactionProposalParams& params);
+
+ void StartPQWriteTxsObserver();
+ void WaitForPQWriteTxs();
+
void WaitForCalcPredicateResult();
void TestWaitingForTEvReadSet(size_t senders, size_t receivers);
+ void StartPQWriteObserver(bool& flag, unsigned cookie);
+ void WaitForPQWriteComplete(bool& flag);
+
bool FoundPQWriteState = false;
+ bool FoundPQWriteTxs = false;
//
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
@@ -465,16 +479,15 @@ void TPQTabletFixture::WaitForCalcPredicateResult()
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
}
-void TPQTabletFixture::StartPQWriteStateObserver()
+void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie)
{
- FoundPQWriteState = false;
-
- auto observer = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ flag = false;
+ auto observer = [&flag, cookie](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
if (auto* kvResponse = event->CastAsLocal<TEvKeyValue::TEvResponse>()) {
if ((event->Sender == event->Recipient) &&
kvResponse->Record.HasCookie() &&
- (kvResponse->Record.GetCookie() == 4)) { // TPersQueue::WRITE_STATE_COOKIE
- FoundPQWriteState = true;
+ (kvResponse->Record.GetCookie() == cookie)) {
+ flag = true;
}
}
@@ -483,15 +496,43 @@ void TPQTabletFixture::StartPQWriteStateObserver()
Ctx->Runtime->SetObserverFunc(observer);
}
-void TPQTabletFixture::WaitForPQWriteState()
+void TPQTabletFixture::WaitForPQWriteComplete(bool& flag)
{
TDispatchOptions options;
- options.CustomFinalCondition = [this]() {
- return FoundPQWriteState;
+ options.CustomFinalCondition = [&flag]() {
+ return flag;
};
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
}
+void TPQTabletFixture::StartPQWriteStateObserver()
+{
+ StartPQWriteObserver(FoundPQWriteState, 4); // TPersQueue::WRITE_STATE_COOKIE
+}
+
+void TPQTabletFixture::WaitForPQWriteState()
+{
+ WaitForPQWriteComplete(FoundPQWriteState);
+}
+
+void TPQTabletFixture::SendCancelTransactionProposal(const TCancelTransactionProposalParams& params)
+{
+ auto event = MakeHolder<TEvPersQueue::TEvCancelTransactionProposal>(params.TxId);
+
+ SendToPipe(Ctx->Edge,
+ event.Release());
+}
+
+void TPQTabletFixture::StartPQWriteTxsObserver()
+{
+ StartPQWriteObserver(FoundPQWriteTxs, 5); // TPersQueue::WRITE_TX_COOKIE
+}
+
+void TPQTabletFixture::WaitForPQWriteTxs()
+{
+ WaitForPQWriteComplete(FoundPQWriteTxs);
+}
+
NHelpers::TPQTabletMock* TPQTabletFixture::CreatePQTabletMock(ui64 tabletId)
{
NHelpers::TPQTabletMock* mock = nullptr;
@@ -961,6 +1002,27 @@ Y_UNIT_TEST_F(TEvReadSet_comes_before_TEvPlanStep, TPQTabletFixture)
WaitPlanStepAccepted({.Step=100});
}
+Y_UNIT_TEST_F(Cancel_Tx, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ const ui64 txId = 67890;
+
+ SendProposeTransactionRequest({.TxId=txId,
+ .Senders={22222}, .Receivers={22222},
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ StartPQWriteTxsObserver();
+
+ SendCancelTransactionProposal({.TxId=txId});
+
+ WaitForPQWriteTxs();
+}
+
}
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 20db0f8c05..6c8944426f 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -869,7 +869,6 @@ message TDataTransaction {
repeated uint64 ReceivingShards = 4;
optional bool Immediate = 5;
optional fixed64 WriteId = 6;
- repeated uint64 Coordinators = 7;
}
message TConfigTransaction {