aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-12-25 14:57:12 +0300
committerGitHub <noreply@github.com>2024-12-25 11:57:12 +0000
commitd44aeaad9d120090ea6824c76d90184f21bb35bf (patch)
treeab5ee08a56d45a5dd9a954861b5c4d28d2f3ad7a
parentfd6bcad15abc0587cebad6fa10033122631bf251 (diff)
downloadydb-d44aeaad9d120090ea6824c76d90184f21bb35bf.tar.gz
Commit replication changes (#12680)
-rw-r--r--ydb/core/protos/counters_replication.proto1
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp1
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h4
-rw-r--r--ydb/core/tx/replication/controller/replication.cpp23
-rw-r--r--ydb/core/tx/replication/controller/replication.h1
-rw-r--r--ydb/core/tx/replication/controller/tx_heartbeat.cpp156
-rw-r--r--ydb/core/tx/replication/service/base_table_writer.cpp3
-rw-r--r--ydb/core/tx/replication/service/table_writer_ut.cpp2
8 files changed, 173 insertions, 18 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto
index 5a31a8135b..d0a308caa5 100644
--- a/ydb/core/protos/counters_replication.proto
+++ b/ydb/core/protos/counters_replication.proto
@@ -47,4 +47,5 @@ enum ETxTypes {
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}];
TXTYPE_HEARTBEAT = 16 [(TxTypeOpts) = {Name: "TxHeartbeat"}];
+ TXTYPE_COMMIT_CHANGES = 17 [(TxTypeOpts) = {Name: "TxCommitChanges"}];
}
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index bcd8183334..a14083526c 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -82,6 +82,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvService::TEvGetTxId, Handle);
HFunc(TEvService::TEvHeartbeat, Handle);
HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle);
+ HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index 8ebb69e1c5..5249a51a94 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -17,6 +17,7 @@
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tx/replication/service/service.h>
#include <ydb/core/tx/tx_allocator_client/actor_client.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
@@ -99,6 +100,7 @@ private:
void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvHeartbeat::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);
void CreateSession(ui32 nodeId, const TActorContext& ctx);
@@ -135,6 +137,7 @@ private:
class TTxWorkerError;
class TTxAssignTxId;
class TTxHeartbeat;
+ class TTxCommitChanges;
// tx runners
void RunTxInitSchema(const TActorContext& ctx);
@@ -216,6 +219,7 @@ private:
TMap<TRowVersion, THashSet<TWorkerId>> WorkersByHeartbeat;
THashMap<TWorkerId, TRowVersion> PendingHeartbeats;
bool ProcessHeartbeatsInFlight = false;
+ ui64 CommittingTxId = 0;
}; // TController
diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp
index 298cea4e70..3b698e2525 100644
--- a/ydb/core/tx/replication/controller/replication.cpp
+++ b/ydb/core/tx/replication/controller/replication.cpp
@@ -94,6 +94,7 @@ public:
template <typename... Args>
ui64 AddTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) {
+ TargetTablePaths.clear();
const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward<Args>(args)...));
Y_VERIFY_S(res.second, "Duplicate target: " << id);
TLagProvider::AddPendingLag(id);
@@ -114,6 +115,23 @@ public:
void RemoveTarget(ui64 id) {
Targets.erase(id);
+ TargetTablePaths.clear();
+ }
+
+ const TVector<TString>& GetTargetTablePaths() const {
+ if (!TargetTablePaths) {
+ TargetTablePaths.reserve(Targets.size());
+ for (const auto& [_, target] : Targets) {
+ switch (target->GetKind()) {
+ case ETargetKind::Table:
+ case ETargetKind::IndexTable:
+ TargetTablePaths.push_back(target->GetDstPath());
+ break;
+ }
+ }
+ }
+
+ return TargetTablePaths;
}
void Progress(const TActorContext& ctx) {
@@ -215,6 +233,7 @@ private:
ui64 NextTargetId = 1;
THashMap<ui64, TTarget> Targets;
THashSet<ui64> PendingAlterTargets;
+ mutable TVector<TString> TargetTablePaths;
TActorId SecretResolver;
TActorId YdbProxy;
TActorId TenantResolver;
@@ -264,6 +283,10 @@ void TReplication::RemoveTarget(ui64 id) {
return Impl->RemoveTarget(id);
}
+const TVector<TString>& TReplication::GetTargetTablePaths() const {
+ return Impl->GetTargetTablePaths();
+}
+
void TReplication::Progress(const TActorContext& ctx) {
Impl->Progress(ctx);
}
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h
index 8c7535a81b..3bfa605006 100644
--- a/ydb/core/tx/replication/controller/replication.h
+++ b/ydb/core/tx/replication/controller/replication.h
@@ -110,6 +110,7 @@ public:
const ITarget* FindTarget(ui64 id) const;
ITarget* FindTarget(ui64 id);
void RemoveTarget(ui64 id);
+ const TVector<TString>& GetTargetTablePaths() const;
void Progress(const TActorContext& ctx);
void Shutdown(const TActorContext& ctx);
diff --git a/ydb/core/tx/replication/controller/tx_heartbeat.cpp b/ydb/core/tx/replication/controller/tx_heartbeat.cpp
index 1eeb0994c4..c718c6e0f0 100644
--- a/ydb/core/tx/replication/controller/tx_heartbeat.cpp
+++ b/ydb/core/tx/replication/controller/tx_heartbeat.cpp
@@ -2,7 +2,21 @@
namespace NKikimr::NReplication::NController {
+THolder<TEvTxUserProxy::TEvProposeTransaction> MakeCommitProposal(ui64 writeTxId, const TVector<TString>& tables) {
+ auto ev = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ auto& tx = *ev->Record.MutableTransaction()->MutableCommitWrites();
+
+ tx.SetWriteTxId(writeTxId);
+ for (const auto& path : tables) {
+ tx.AddTables()->SetTablePath(path);
+ }
+
+ return ev;
+}
+
class TController::TTxHeartbeat: public TTxBase {
+ THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal;
+
public:
explicit TTxHeartbeat(TController* self)
: TTxBase("TxHeartbeat", self)
@@ -22,24 +36,37 @@ public:
return true;
}
- const auto prevMinVersion = !Self->WorkersByHeartbeat.empty()
- ? std::make_optional<TRowVersion>(Self->WorkersByHeartbeat.begin()->first)
- : std::nullopt;
+ auto replication = Self->GetSingle();
+ if (!replication) {
+ CLOG_E(ctx, "Ambiguous replication instance");
+ return true;
+ }
NIceDb::TNiceDb db(txc.DB);
- for (const auto& [id, version] : Self->PendingHeartbeats) {
+ while (!Self->PendingHeartbeats.empty()) {
+ auto it = Self->PendingHeartbeats.begin();
+ const auto& id = it->first;
+ const auto& version = it->second;
+
if (!Self->Workers.contains(id)) {
+ Self->PendingHeartbeats.erase(it);
continue;
}
auto& worker = Self->Workers[id];
if (worker.HasHeartbeat()) {
- auto it = Self->WorkersByHeartbeat.find(worker.GetHeartbeat());
- if (it != Self->WorkersByHeartbeat.end()) {
- it->second.erase(id);
- if (it->second.empty()) {
- Self->WorkersByHeartbeat.erase(it);
+ const auto& prevVersion = worker.GetHeartbeat();
+ if (version < prevVersion) {
+ Self->PendingHeartbeats.erase(it);
+ continue;
+ }
+
+ auto jt = Self->WorkersByHeartbeat.find(prevVersion);
+ if (jt != Self->WorkersByHeartbeat.end()) {
+ jt->second.erase(id);
+ if (jt->second.empty()) {
+ Self->WorkersByHeartbeat.erase(jt);
}
}
}
@@ -52,24 +79,30 @@ public:
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionStep>(version.Step),
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionTxId>(version.TxId)
);
+
+ Self->PendingHeartbeats.erase(it);
}
if (Self->Workers.size() != Self->WorkersWithHeartbeat.size()) {
- return true;
+ return true; // no quorum
}
- Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty());
- const auto newMinVersion = Self->WorkersByHeartbeat.begin()->first;
+ if (Self->CommittingTxId) {
+ return true; // another commit in progress
+ }
- if (newMinVersion <= prevMinVersion.value_or(TRowVersion::Min())) {
- return true;
+ if (Self->AssignedTxIds.empty()) {
+ return true; // nothing to commit
}
- CLOG_N(ctx, "Min version has been changed"
- << ": prev# " << prevMinVersion.value_or(TRowVersion::Min())
- << ", new# " << newMinVersion);
+ Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty());
+ if (Self->WorkersByHeartbeat.begin()->first < Self->AssignedTxIds.begin()->first) {
+ return true; // version has not been changed
+ }
+
+ Self->CommittingTxId = Self->AssignedTxIds.begin()->second;
+ CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
- // TODO: run commit
return true;
}
@@ -77,6 +110,12 @@ public:
CLOG_D(ctx, "Complete"
<< ": pending# " << Self->PendingHeartbeats.size());
+ if (auto& ev = CommitProposal) {
+ CLOG_N(ctx, "Propose commit"
+ << ": writeTxId# " << Self->CommittingTxId);
+ ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId);
+ }
+
if (Self->PendingHeartbeats) {
Self->Execute(new TTxHeartbeat(Self), ctx);
} else {
@@ -93,4 +132,85 @@ void TController::RunTxHeartbeat(const TActorContext& ctx) {
}
}
+class TController::TTxCommitChanges: public TTxBase {
+ TEvTxUserProxy::TEvProposeTransactionStatus::TPtr Status;
+ THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal;
+
+public:
+ explicit TTxCommitChanges(TController* self, TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev)
+ : TTxBase("TxCommitChanges", self)
+ , Status(ev)
+ {
+ }
+
+ TTxType GetTxType() const override {
+ return TXTYPE_COMMIT_CHANGES;
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ CLOG_D(ctx, "Execute"
+ << ": writeTxId# " << Self->CommittingTxId);
+
+ auto replication = Self->GetSingle();
+ if (!replication) {
+ CLOG_E(ctx, "Ambiguous replication instance");
+ return true;
+ }
+
+ auto it = Self->AssignedTxIds.begin();
+ Y_ABORT_UNLESS(it != Self->AssignedTxIds.end());
+ Y_ABORT_UNLESS(it->second == Self->CommittingTxId);
+
+ const auto& record = Status->Get()->Record;
+ const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(record.GetStatus());
+ if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
+ CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
+ return true;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ db.Table<Schema::TxIds>().Key(it->first.Step, it->first.TxId).Delete();
+ it = Self->AssignedTxIds.erase(it);
+ Self->CommittingTxId = 0;
+
+ if (it == Self->AssignedTxIds.end() || Self->WorkersByHeartbeat.empty()) {
+ return true;
+ }
+
+ if (Self->WorkersByHeartbeat.begin()->first < it->first) {
+ return true;
+ }
+
+ Self->CommittingTxId = Self->AssignedTxIds.begin()->second;
+ CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ CLOG_D(ctx, "Complete");
+
+ if (auto& ev = CommitProposal) {
+ CLOG_N(ctx, "Propose commit"
+ << ": writeTxId# " << Self->CommittingTxId);
+ ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId);
+ }
+ }
+
+}; // TTxCommitChanges
+
+void TController::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ if (ev->Cookie != CommittingTxId) {
+ CLOG_E(ctx, "Cookie mismatch"
+ << ": expected# " << CommittingTxId
+ << ", got# " << ev->Cookie);
+ return;
+ }
+
+ Execute(new TTxCommitChanges(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp
index 09f66c2619..0742dcf418 100644
--- a/ydb/core/tx/replication/service/base_table_writer.cpp
+++ b/ydb/core/tx/replication/service/base_table_writer.cpp
@@ -465,6 +465,9 @@ class TLocalTableWriter
if (records) {
EnqueueRecords(std::move(records));
+ } else if (PendingTxId.empty()) {
+ Y_ABORT_UNLESS(PendingRecords.empty());
+ Send(Worker, new TEvWorker::TEvPoll());
}
}
diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp
index 236387d655..cfc0d1a79c 100644
--- a/ydb/core/tx/replication/service/table_writer_ut.cpp
+++ b/ydb/core/tx/replication/service/table_writer_ut.cpp
@@ -181,6 +181,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
TRecord(order++, R"({"resolved":[10,0]})"),
}));
UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0));
+ env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
@@ -206,6 +207,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(order++, R"({"resolved":[30,0]})"),
}));
+ env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}
}