diff options
author | abcdef <akotov@ydb.tech> | 2023-10-13 13:23:15 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-10-13 14:32:59 +0300 |
commit | 1571ff1ca97f39cb416d0224d0656d5bcff01f1b (patch) | |
tree | 672815d6a3805ea0a9339a5416ed01e0663a1822 | |
parent | cf4d301c44398f63b387618472918acfff0dc87c (diff) | |
download | ydb-1571ff1ca97f39cb416d0224d0656d5bcff01f1b.tar.gz |
transaction implementation errors in PQ
- удаляются транзакции с истёкшим сроком
- удаляются транзакции по `TEvCancelTransactionProposal`
- подписка на MediatorTimecast
-rw-r--r-- | ydb/core/kqp/topics/kqp_topics.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/topics/kqp_topics.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/events/global.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 159 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 23 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 80 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 1 |
9 files changed, 240 insertions, 54 deletions
diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index 17cfec45f2..d9e01db375 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -111,8 +111,6 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra o->SetPartitionId(*Partition_); o->SetPath(*Topic_); } - - tx.MutableCoordinators()->Add(Coordinators_.begin(), Coordinators_.end()); } void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs) @@ -132,20 +130,6 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs) } HasWriteOperations_ |= rhs.HasWriteOperations_; - - // If the list of coordinators is empty, then we use a ready-made one. - // Otherwise, we leave the common elements - if (Coordinators_.empty()) { - Coordinators_ = rhs.Coordinators_; - } else { - for (auto iter = Coordinators_.begin(); iter != Coordinators_.end(); ) { - if (rhs.Coordinators_.contains(*iter)) { - ++iter; - } else { - iter = Coordinators_.erase(iter); - } - } - } } ui64 TTopicPartitionOperations::GetTabletId() const @@ -172,11 +156,6 @@ bool TTopicPartitionOperations::HasWriteOperations() const return HasWriteOperations_; } -void TTopicPartitionOperations::SetCoordinators(const TVector<ui64>& coordinators) -{ - Coordinators_ = std::unordered_set<ui64>(coordinators.begin(), coordinators.end()); -} - // // TTopicPartition // @@ -344,7 +323,6 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac << ", " << partition.GetTabletId()); p->second.SetTabletId(partition.GetTabletId()); - p->second.SetCoordinators(result.DomainInfo->Coordinators.List()); } } } else { diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h index 9b7ace263b..4d0164d512 100644 --- a/ydb/core/kqp/topics/kqp_topics.h +++ b/ydb/core/kqp/topics/kqp_topics.h @@ -61,15 +61,12 @@ public: bool HasReadOperations() const; bool HasWriteOperations() const; - void SetCoordinators(const TVector<ui64>& coordinators); - private: TMaybe<TString> Topic_; TMaybe<ui32> Partition_; THashMap<TString, TConsumerOperations> Operations_; bool HasWriteOperations_ = false; TMaybe<ui64> TabletId_; - std::unordered_set<ui64> Coordinators_; }; struct TTopicPartition { diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index e7b2a53c9d..1e2e28befb 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -257,6 +257,8 @@ struct TEvPersQueue { }; struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> { + TEvCancelTransactionProposal() = default; + explicit TEvCancelTransactionProposal(ui64 txId) { Record.SetTxId(txId); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 2b7d5788fc..6d3508fa49 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -868,7 +868,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& ConfigInited = true; - InitProcessingParams(ctx); InitializeMeteringSink(ctx); Y_ABORT_UNLESS(!NewConfigShouldBeApplied); @@ -2344,6 +2343,10 @@ void TPersQueue::HandleDie(const TActorContext& ctx) Y_ABORT_UNLESS(res); } ResponseProxy.clear(); + + StopWatchingTenantPathId(ctx); + MediatorTimeCastUnregisterTablet(ctx); + NKeyValue::TKeyValueFlat::HandleDie(ctx); } @@ -2379,10 +2382,55 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info) void TPersQueue::CreatedHook(const TActorContext& ctx) { - IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId())); + InitProcessingParams(ctx); +} + +void TPersQueue::StartWatchingTenantPathId(const TActorContext& ctx) +{ + ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(Info()->TenantPathId)); +} + +void TPersQueue::StopWatchingTenantPathId(const TActorContext& ctx) +{ + ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove()); +} + +void TPersQueue::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) +{ + const auto& result = ev->Get()->Result; + ProcessingParams = result->GetPathDescription().GetDomainDescription().GetProcessingParams(); + + InitMediatorTimeCast(ctx); +} + +void TPersQueue::MediatorTimeCastRegisterTablet(const TActorContext& ctx) +{ + Y_ABORT_UNLESS(ProcessingParams); + ctx.Send(MakeMediatorTimecastProxyID(), + new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams)); +} + +void TPersQueue::MediatorTimeCastUnregisterTablet(const TActorContext& ctx) +{ + ctx.Send(MakeMediatorTimecastProxyID(), + new TEvMediatorTimecast::TEvUnregisterTablet(TabletID())); +} + +void TPersQueue::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) +{ + const auto* message = ev->Get(); + Y_ABORT_UNLESS(message->TabletId == TabletID()); + + MediatorTimeCastEntry = message->Entry; + Y_ABORT_UNLESS(MediatorTimeCastEntry); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << + "Registered with mediator time cast"); + + TryWriteTxs(ctx); } void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorContext& ctx) @@ -2422,9 +2470,43 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { AggregateAndSendLabeledCountersFor(g, ctx); } MeteringSink.MayFlush(ctx.Now()); + DeleteExpiredTransactions(ctx); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } +void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) +{ + if (!MediatorTimeCastEntry) { + return; + } + + ui64 step = MediatorTimeCastEntry->Get(TabletID()); + + for (auto& [txId, tx] : Txs) { + if ((tx.MaxStep < step) && (tx.State <= NKikimrPQ::TTransaction::PREPARED)) { + DeleteTx(tx); + } + } + + TryWriteTxs(ctx); +} + +void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) +{ + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvCancelTransactionProposal"); + + NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record; + Y_ABORT_UNLESS(event.HasTxId()); + + if (auto tx = GetTransaction(ctx, event.GetTxId()); tx) { + Y_ABORT_UNLESS(tx->State <= NKikimrPQ::TTransaction::PREPARED); + + DeleteTx(*tx); + + TryWriteTxs(ctx); + } +} + void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvProposeTransaction"); @@ -2639,11 +2721,34 @@ void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& c TryWriteTxs(ctx); } +bool TPersQueue::CanProcessProposeTransactionQueue() const +{ + return + !EvProposeTransactionQueue.empty() + && ProcessingParams + && (!UseMediatorTimeCast || MediatorTimeCastEntry); +} + +bool TPersQueue::CanProcessPlanStepQueue() const +{ + return !EvPlanStepQueue.empty(); +} + +bool TPersQueue::CanProcessWriteTxs() const +{ + return !WriteTxs.empty(); +} + +bool TPersQueue::CanProcessDeleteTxs() const +{ + return !DeleteTxs.empty(); +} + void TPersQueue::BeginWriteTxs(const TActorContext& ctx) { Y_ABORT_UNLESS(!WriteTxsInProgress); - if (EvProposeTransactionQueue.empty() && EvPlanStepQueue.empty() && WriteTxs.empty() && DeleteTxs.empty()) { + if (!CanProcessProposeTransactionQueue() && !CanProcessPlanStepQueue() && !CanProcessWriteTxs() && !CanProcessDeleteTxs()) { return; } @@ -2704,7 +2809,9 @@ void TPersQueue::TryWriteTxs(const TActorContext& ctx) void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) { - while (!EvProposeTransactionQueue.empty()) { + Y_ABORT_UNLESS(!WriteTxsInProgress); + + while (CanProcessProposeTransactionQueue()) { const auto front = std::move(EvProposeTransactionQueue.front()); EvProposeTransactionQueue.pop_front(); @@ -2736,7 +2843,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) { Y_ABORT_UNLESS(!WriteTxsInProgress); - while (!EvPlanStepQueue.empty()) { + while (CanProcessPlanStepQueue()) { const auto front = std::move(EvPlanStepQueue.front()); EvPlanStepQueue.pop_front(); @@ -2765,6 +2872,8 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) if (auto p = Txs.find(txId); p != Txs.end()) { TDistributedTransaction& tx = p->second; + Y_ABORT_UNLESS(tx.MaxStep >= step); + if (tx.Step == Max<ui64>()) { Y_ABORT_UNLESS(TxQueue.empty() || (TxQueue.back() < std::make_pair(step, txId))); @@ -2879,11 +2988,7 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction& event->Record.SetMinStep(tx.MinStep); event->Record.SetMaxStep(tx.MaxStep); - // If you have sent a list of preferred coordinators, then we will use them. - // Otherwise, we take the list of coordinators from the domain description - if (tx.Coordinators) { - event->Record.MutableDomainCoordinators()->Add(tx.Coordinators.begin(), tx.Coordinators.end()); - } else if (ProcessingParams) { + if (ProcessingParams) { event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators()); } @@ -3269,14 +3374,37 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) } ChangedTxs.clear(); } - + void TPersQueue::InitProcessingParams(const TActorContext& ctx) { + if (Info()->TenantPathId) { + UseMediatorTimeCast = true; + StartWatchingTenantPathId(ctx); + return; + } + auto appdata = AppData(ctx); const ui32 domainId = appdata->DomainsInfo->GetDomainUidByTabletId(TabletID()); Y_ABORT_UNLESS(domainId != appdata->DomainsInfo->BadDomainId); + const auto& domain = appdata->DomainsInfo->GetDomain(domainId); ProcessingParams = ExtractProcessingParams(domain); + + InitMediatorTimeCast(ctx); +} + +void TPersQueue::InitMediatorTimeCast(const TActorContext& ctx) +{ + Y_ABORT_UNLESS(ProcessingParams); + + if (ProcessingParams->MediatorsSize()) { + UseMediatorTimeCast = true; + MediatorTimeCastRegisterTablet(ctx); + } else { + UseMediatorTimeCast = false; + + TryWriteTxs(ctx); + } } bool TPersQueue::AllTransactionsHaveBeenProcessed() const @@ -3439,10 +3567,8 @@ void TPersQueue::OnInitComplete(const TActorContext& ctx) ui64 TPersQueue::GetAllowedStep() const { - // - // TODO(abcdef): использовать MediatorTimeCastEntry - // - return Max(LastStep + 1, TAppData::TimeProvider->Now().MilliSeconds()); + return Max(LastStep + 1, + MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : TAppData::TimeProvider->Now().MilliSeconds()); } NTabletPipe::TClientConfig TPersQueue::GetPipeClientConfig() @@ -3528,6 +3654,9 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvTxCommitDone, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); HFuncTraced(TEvPersQueue::TEvProposeTransactionAttach, Handle); + HFuncTraced(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + HFuncTraced(TEvPersQueue::TEvCancelTransactionProposal, Handle); + HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 24e1be3ccc..589403c64c 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -9,6 +9,8 @@ #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/persqueue/events/internal.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> #include <library/cpp/actors/interconnect/interconnect.h> @@ -347,10 +349,31 @@ private: THashMap<ui64, THashSet<ui64>> BindedTxs; void InitProcessingParams(const TActorContext& ctx); + void InitMediatorTimeCast(const TActorContext& ctx); TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams; void Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr& ev, const TActorContext& ctx); + + void StartWatchingTenantPathId(const TActorContext& ctx); + void StopWatchingTenantPathId(const TActorContext& ctx); + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); + + void MediatorTimeCastRegisterTablet(const TActorContext& ctx); + void MediatorTimeCastUnregisterTablet(const TActorContext& ctx); + void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx); + + TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry; + + void DeleteExpiredTransactions(const TActorContext& ctx); + void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); + + bool CanProcessProposeTransactionQueue() const; + bool CanProcessPlanStepQueue() const; + bool CanProcessWriteTxs() const; + bool CanProcessDeleteTxs() const; + + bool UseMediatorTimeCast = true; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index da670066ab..90ef0703b6 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -136,8 +136,6 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac PartitionRepliesExpected = 0; ReadSetCount = 0; - - Coordinators.assign(txBody.GetCoordinators().begin(), txBody.GetCoordinators().end()); } void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody) diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 1650d33a02..3c79b81867 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -106,8 +106,6 @@ struct TDistributedTransaction { void BindMsgToPipe(ui64 tabletId, const IEventBase& event); void UnbindMsgsFromPipe(ui64 tabletId); const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId); - - TVector<ui64> Coordinators; }; } diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 81bf7974bc..8383cca1f4 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -65,6 +65,10 @@ struct TDropTabletParams { ui64 TxId = 0; }; +struct TCancelTransactionProposalParams { + ui64 TxId = 0; +}; + using NKikimr::NPQ::NHelpers::CreatePQTabletMock; using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock; @@ -131,6 +135,7 @@ protected: using TPlanStepParams = NHelpers::TPlanStepParams; using TReadSetParams = NHelpers::TReadSetParams; using TDropTabletParams = NHelpers::TDropTabletParams; + using TCancelTransactionProposalParams = NHelpers::TCancelTransactionProposalParams; void SetUp(NUnitTest::TTestContext&) override; void TearDown(NUnitTest::TTestContext&) override; @@ -158,11 +163,20 @@ protected: void StartPQWriteStateObserver(); void WaitForPQWriteState(); + void SendCancelTransactionProposal(const TCancelTransactionProposalParams& params); + + void StartPQWriteTxsObserver(); + void WaitForPQWriteTxs(); + void WaitForCalcPredicateResult(); void TestWaitingForTEvReadSet(size_t senders, size_t receivers); + void StartPQWriteObserver(bool& flag, unsigned cookie); + void WaitForPQWriteComplete(bool& flag); + bool FoundPQWriteState = false; + bool FoundPQWriteTxs = false; // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait @@ -465,16 +479,15 @@ void TPQTabletFixture::WaitForCalcPredicateResult() UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); } -void TPQTabletFixture::StartPQWriteStateObserver() +void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie) { - FoundPQWriteState = false; - - auto observer = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + flag = false; + auto observer = [&flag, cookie](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { if (auto* kvResponse = event->CastAsLocal<TEvKeyValue::TEvResponse>()) { if ((event->Sender == event->Recipient) && kvResponse->Record.HasCookie() && - (kvResponse->Record.GetCookie() == 4)) { // TPersQueue::WRITE_STATE_COOKIE - FoundPQWriteState = true; + (kvResponse->Record.GetCookie() == cookie)) { + flag = true; } } @@ -483,15 +496,43 @@ void TPQTabletFixture::StartPQWriteStateObserver() Ctx->Runtime->SetObserverFunc(observer); } -void TPQTabletFixture::WaitForPQWriteState() +void TPQTabletFixture::WaitForPQWriteComplete(bool& flag) { TDispatchOptions options; - options.CustomFinalCondition = [this]() { - return FoundPQWriteState; + options.CustomFinalCondition = [&flag]() { + return flag; }; UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); } +void TPQTabletFixture::StartPQWriteStateObserver() +{ + StartPQWriteObserver(FoundPQWriteState, 4); // TPersQueue::WRITE_STATE_COOKIE +} + +void TPQTabletFixture::WaitForPQWriteState() +{ + WaitForPQWriteComplete(FoundPQWriteState); +} + +void TPQTabletFixture::SendCancelTransactionProposal(const TCancelTransactionProposalParams& params) +{ + auto event = MakeHolder<TEvPersQueue::TEvCancelTransactionProposal>(params.TxId); + + SendToPipe(Ctx->Edge, + event.Release()); +} + +void TPQTabletFixture::StartPQWriteTxsObserver() +{ + StartPQWriteObserver(FoundPQWriteTxs, 5); // TPersQueue::WRITE_TX_COOKIE +} + +void TPQTabletFixture::WaitForPQWriteTxs() +{ + WaitForPQWriteComplete(FoundPQWriteTxs); +} + NHelpers::TPQTabletMock* TPQTabletFixture::CreatePQTabletMock(ui64 tabletId) { NHelpers::TPQTabletMock* mock = nullptr; @@ -961,6 +1002,27 @@ Y_UNIT_TEST_F(TEvReadSet_comes_before_TEvPlanStep, TPQTabletFixture) WaitPlanStepAccepted({.Step=100}); } +Y_UNIT_TEST_F(Cancel_Tx, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + const ui64 txId = 67890; + + SendProposeTransactionRequest({.TxId=txId, + .Senders={22222}, .Receivers={22222}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + StartPQWriteTxsObserver(); + + SendCancelTransactionProposal({.TxId=txId}); + + WaitForPQWriteTxs(); +} + } } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 20db0f8c05..6c8944426f 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -869,7 +869,6 @@ message TDataTransaction { repeated uint64 ReceivingShards = 4; optional bool Immediate = 5; optional fixed64 WriteId = 6; - repeated uint64 Coordinators = 7; } message TConfigTransaction { |