diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-12-25 14:57:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-25 11:57:12 +0000 |
commit | d44aeaad9d120090ea6824c76d90184f21bb35bf (patch) | |
tree | ab5ee08a56d45a5dd9a954861b5c4d28d2f3ad7a | |
parent | fd6bcad15abc0587cebad6fa10033122631bf251 (diff) | |
download | ydb-d44aeaad9d120090ea6824c76d90184f21bb35bf.tar.gz |
Commit replication changes (#12680)
8 files changed, 173 insertions, 18 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index 5a31a8135b..d0a308caa5 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -47,4 +47,5 @@ enum ETxTypes { TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}]; TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}]; TXTYPE_HEARTBEAT = 16 [(TxTypeOpts) = {Name: "TxHeartbeat"}]; + TXTYPE_COMMIT_CHANGES = 17 [(TxTypeOpts) = {Name: "TxCommitChanges"}]; } diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index bcd8183334..a14083526c 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -82,6 +82,7 @@ STFUNC(TController::StateWork) { HFunc(TEvService::TEvGetTxId, Handle); HFunc(TEvService::TEvHeartbeat, Handle); HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle); + HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle); HFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: HandleDefaultEvents(ev, SelfId()); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 8ebb69e1c5..5249a51a94 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -17,6 +17,7 @@ #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tx/replication/service/service.h> #include <ydb/core/tx/tx_allocator_client/actor_client.h> +#include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/library/actors/core/interconnect.h> #include <ydb/library/yverify_stream/yverify_stream.h> @@ -99,6 +100,7 @@ private: void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvHeartbeat::TPtr& ev, const TActorContext& ctx); void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx); void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); void CreateSession(ui32 nodeId, const TActorContext& ctx); @@ -135,6 +137,7 @@ private: class TTxWorkerError; class TTxAssignTxId; class TTxHeartbeat; + class TTxCommitChanges; // tx runners void RunTxInitSchema(const TActorContext& ctx); @@ -216,6 +219,7 @@ private: TMap<TRowVersion, THashSet<TWorkerId>> WorkersByHeartbeat; THashMap<TWorkerId, TRowVersion> PendingHeartbeats; bool ProcessHeartbeatsInFlight = false; + ui64 CommittingTxId = 0; }; // TController diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index 298cea4e70..3b698e2525 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -94,6 +94,7 @@ public: template <typename... Args> ui64 AddTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) { + TargetTablePaths.clear(); const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward<Args>(args)...)); Y_VERIFY_S(res.second, "Duplicate target: " << id); TLagProvider::AddPendingLag(id); @@ -114,6 +115,23 @@ public: void RemoveTarget(ui64 id) { Targets.erase(id); + TargetTablePaths.clear(); + } + + const TVector<TString>& GetTargetTablePaths() const { + if (!TargetTablePaths) { + TargetTablePaths.reserve(Targets.size()); + for (const auto& [_, target] : Targets) { + switch (target->GetKind()) { + case ETargetKind::Table: + case ETargetKind::IndexTable: + TargetTablePaths.push_back(target->GetDstPath()); + break; + } + } + } + + return TargetTablePaths; } void Progress(const TActorContext& ctx) { @@ -215,6 +233,7 @@ private: ui64 NextTargetId = 1; THashMap<ui64, TTarget> Targets; THashSet<ui64> PendingAlterTargets; + mutable TVector<TString> TargetTablePaths; TActorId SecretResolver; TActorId YdbProxy; TActorId TenantResolver; @@ -264,6 +283,10 @@ void TReplication::RemoveTarget(ui64 id) { return Impl->RemoveTarget(id); } +const TVector<TString>& TReplication::GetTargetTablePaths() const { + return Impl->GetTargetTablePaths(); +} + void TReplication::Progress(const TActorContext& ctx) { Impl->Progress(ctx); } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 8c7535a81b..3bfa605006 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -110,6 +110,7 @@ public: const ITarget* FindTarget(ui64 id) const; ITarget* FindTarget(ui64 id); void RemoveTarget(ui64 id); + const TVector<TString>& GetTargetTablePaths() const; void Progress(const TActorContext& ctx); void Shutdown(const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/tx_heartbeat.cpp b/ydb/core/tx/replication/controller/tx_heartbeat.cpp index 1eeb0994c4..c718c6e0f0 100644 --- a/ydb/core/tx/replication/controller/tx_heartbeat.cpp +++ b/ydb/core/tx/replication/controller/tx_heartbeat.cpp @@ -2,7 +2,21 @@ namespace NKikimr::NReplication::NController { +THolder<TEvTxUserProxy::TEvProposeTransaction> MakeCommitProposal(ui64 writeTxId, const TVector<TString>& tables) { + auto ev = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + auto& tx = *ev->Record.MutableTransaction()->MutableCommitWrites(); + + tx.SetWriteTxId(writeTxId); + for (const auto& path : tables) { + tx.AddTables()->SetTablePath(path); + } + + return ev; +} + class TController::TTxHeartbeat: public TTxBase { + THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal; + public: explicit TTxHeartbeat(TController* self) : TTxBase("TxHeartbeat", self) @@ -22,24 +36,37 @@ public: return true; } - const auto prevMinVersion = !Self->WorkersByHeartbeat.empty() - ? std::make_optional<TRowVersion>(Self->WorkersByHeartbeat.begin()->first) - : std::nullopt; + auto replication = Self->GetSingle(); + if (!replication) { + CLOG_E(ctx, "Ambiguous replication instance"); + return true; + } NIceDb::TNiceDb db(txc.DB); - for (const auto& [id, version] : Self->PendingHeartbeats) { + while (!Self->PendingHeartbeats.empty()) { + auto it = Self->PendingHeartbeats.begin(); + const auto& id = it->first; + const auto& version = it->second; + if (!Self->Workers.contains(id)) { + Self->PendingHeartbeats.erase(it); continue; } auto& worker = Self->Workers[id]; if (worker.HasHeartbeat()) { - auto it = Self->WorkersByHeartbeat.find(worker.GetHeartbeat()); - if (it != Self->WorkersByHeartbeat.end()) { - it->second.erase(id); - if (it->second.empty()) { - Self->WorkersByHeartbeat.erase(it); + const auto& prevVersion = worker.GetHeartbeat(); + if (version < prevVersion) { + Self->PendingHeartbeats.erase(it); + continue; + } + + auto jt = Self->WorkersByHeartbeat.find(prevVersion); + if (jt != Self->WorkersByHeartbeat.end()) { + jt->second.erase(id); + if (jt->second.empty()) { + Self->WorkersByHeartbeat.erase(jt); } } } @@ -52,24 +79,30 @@ public: NIceDb::TUpdate<Schema::Workers::HeartbeatVersionStep>(version.Step), NIceDb::TUpdate<Schema::Workers::HeartbeatVersionTxId>(version.TxId) ); + + Self->PendingHeartbeats.erase(it); } if (Self->Workers.size() != Self->WorkersWithHeartbeat.size()) { - return true; + return true; // no quorum } - Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty()); - const auto newMinVersion = Self->WorkersByHeartbeat.begin()->first; + if (Self->CommittingTxId) { + return true; // another commit in progress + } - if (newMinVersion <= prevMinVersion.value_or(TRowVersion::Min())) { - return true; + if (Self->AssignedTxIds.empty()) { + return true; // nothing to commit } - CLOG_N(ctx, "Min version has been changed" - << ": prev# " << prevMinVersion.value_or(TRowVersion::Min()) - << ", new# " << newMinVersion); + Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty()); + if (Self->WorkersByHeartbeat.begin()->first < Self->AssignedTxIds.begin()->first) { + return true; // version has not been changed + } + + Self->CommittingTxId = Self->AssignedTxIds.begin()->second; + CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths()); - // TODO: run commit return true; } @@ -77,6 +110,12 @@ public: CLOG_D(ctx, "Complete" << ": pending# " << Self->PendingHeartbeats.size()); + if (auto& ev = CommitProposal) { + CLOG_N(ctx, "Propose commit" + << ": writeTxId# " << Self->CommittingTxId); + ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId); + } + if (Self->PendingHeartbeats) { Self->Execute(new TTxHeartbeat(Self), ctx); } else { @@ -93,4 +132,85 @@ void TController::RunTxHeartbeat(const TActorContext& ctx) { } } +class TController::TTxCommitChanges: public TTxBase { + TEvTxUserProxy::TEvProposeTransactionStatus::TPtr Status; + THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal; + +public: + explicit TTxCommitChanges(TController* self, TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) + : TTxBase("TxCommitChanges", self) + , Status(ev) + { + } + + TTxType GetTxType() const override { + return TXTYPE_COMMIT_CHANGES; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute" + << ": writeTxId# " << Self->CommittingTxId); + + auto replication = Self->GetSingle(); + if (!replication) { + CLOG_E(ctx, "Ambiguous replication instance"); + return true; + } + + auto it = Self->AssignedTxIds.begin(); + Y_ABORT_UNLESS(it != Self->AssignedTxIds.end()); + Y_ABORT_UNLESS(it->second == Self->CommittingTxId); + + const auto& record = Status->Get()->Record; + const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(record.GetStatus()); + if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { + CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths()); + return true; + } + + NIceDb::TNiceDb db(txc.DB); + + db.Table<Schema::TxIds>().Key(it->first.Step, it->first.TxId).Delete(); + it = Self->AssignedTxIds.erase(it); + Self->CommittingTxId = 0; + + if (it == Self->AssignedTxIds.end() || Self->WorkersByHeartbeat.empty()) { + return true; + } + + if (Self->WorkersByHeartbeat.begin()->first < it->first) { + return true; + } + + Self->CommittingTxId = Self->AssignedTxIds.begin()->second; + CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths()); + + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete"); + + if (auto& ev = CommitProposal) { + CLOG_N(ctx, "Propose commit" + << ": writeTxId# " << Self->CommittingTxId); + ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId); + } + } + +}; // TTxCommitChanges + +void TController::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + if (ev->Cookie != CommittingTxId) { + CLOG_E(ctx, "Cookie mismatch" + << ": expected# " << CommittingTxId + << ", got# " << ev->Cookie); + return; + } + + Execute(new TTxCommitChanges(this, ev), ctx); +} + } diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index 09f66c2619..0742dcf418 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -465,6 +465,9 @@ class TLocalTableWriter if (records) { EnqueueRecords(std::move(records)); + } else if (PendingTxId.empty()) { + Y_ABORT_UNLESS(PendingRecords.empty()); + Send(Worker, new TEvWorker::TEvPoll()); } } diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 236387d655..cfc0d1a79c 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -181,6 +181,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { TRecord(order++, R"({"resolved":[10,0]})"), })); UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0)); + env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender()); } env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { @@ -206,6 +207,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", { TRecord(order++, R"({"resolved":[30,0]})"), })); + env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender()); } } |