aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-12-04 16:14:05 +0300
committerGitHub <noreply@github.com>2024-12-04 13:14:05 +0000
commit962438b11d1510b42d2ec6b6d8d6dec07396a8a9 (patch)
tree8de9e9a5d7c2748edd14472835a7d1488046d880
parent5e818ab38b6631bef0c4a846c4c66262c363ce2a (diff)
downloadydb-962438b11d1510b42d2ec6b6d8d6dec07396a8a9.tar.gz
Assign tx id to version (#11440)
-rw-r--r--ydb/core/protos/counters_replication.proto1
-rw-r--r--ydb/core/tx/replication/controller/assign_tx_id_ut.cpp154
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp39
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h14
-rw-r--r--ydb/core/tx/replication/controller/schema.h12
-rw-r--r--ydb/core/tx/replication/controller/tx_assign_tx_id.cpp176
-rw-r--r--ydb/core/tx/replication/controller/tx_init.cpp27
-rw-r--r--ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make20
-rw-r--r--ydb/core/tx/replication/controller/ya.make3
-rw-r--r--ydb/core/tx/replication/service/service.h5
-rw-r--r--ydb/core/tx/replication/ut_helpers/mock_service.cpp48
-rw-r--r--ydb/core/tx/replication/ut_helpers/mock_service.h9
-rw-r--r--ydb/core/tx/replication/ut_helpers/test_env.h20
-rw-r--r--ydb/core/tx/replication/ut_helpers/ya.make2
14 files changed, 524 insertions, 6 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto
index 4c8827674ef..f3048359e46 100644
--- a/ydb/core/protos/counters_replication.proto
+++ b/ydb/core/protos/counters_replication.proto
@@ -45,4 +45,5 @@ enum ETxTypes {
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
+ TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}];
}
diff --git a/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp b/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp
new file mode 100644
index 00000000000..43c9362a553
--- /dev/null
+++ b/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp
@@ -0,0 +1,154 @@
+#include "controller_impl.h"
+
+#include <ydb/core/tx/replication/service/service.h>
+#include <ydb/core/tx/replication/ut_helpers/mock_service.h>
+#include <ydb/core/tx/replication/ut_helpers/test_env.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/algorithm.h>
+#include <util/generic/vector.h>
+#include <util/string/printf.h>
+
+namespace NKikimr::NReplication::NController {
+
+Y_UNIT_TEST_SUITE(AssignTxId) {
+ using namespace NTestHelpers;
+
+ void CheckTxIdResult(const NKikimrReplication::TEvTxIdResult& result, const TMap<TRowVersion, ui64>& expected) {
+ UNIT_ASSERT_VALUES_EQUAL(result.VersionTxIdsSize(), expected.size());
+
+ int i = 0;
+ for (const auto& [version, txId] : expected) {
+ const auto& actual = result.GetVersionTxIds(i++);
+ UNIT_ASSERT_VALUES_EQUAL(TRowVersion::Parse(actual.GetVersion()), version);
+ if (txId) {
+ UNIT_ASSERT_VALUES_EQUAL(actual.GetTxId(), txId);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Basic) {
+ TEnv env;
+ env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
+
+ env.GetRuntime().RegisterService(
+ MakeReplicationServiceId(env.GetRuntime().GetNodeId(0)),
+ env.GetRuntime().Register(CreateReplicationMockService(env.GetSender()))
+ );
+
+ NYdb::NTable::TTableClient client(env.GetDriver(), NYdb::NTable::TClientSettings()
+ .DiscoveryEndpoint(env.GetEndpoint())
+ .Database(env.GetDatabase())
+ );
+
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto result = session
+ .ExecuteSchemeQuery(Sprintf(R"(
+ CREATE ASYNC REPLICATION `replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ CONNECTION_STRING = "grpc://%s/?database=/Root",
+ CONSISTENCY_MODE = "STRONG",
+ COMMIT_INTERVAL = Interval("PT10S")
+ );
+ )", env.GetEndpoint().c_str()))
+ .GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto desc = env.GetDescription("/Root/replication");
+ const auto& repl = desc.GetPathDescription().GetReplicationDescription();
+ const auto tabletId = repl.GetControllerId();
+
+ const auto& cfg = repl.GetConfig();
+ UNIT_ASSERT(cfg.HasStrongConsistency());
+ UNIT_ASSERT_VALUES_EQUAL(cfg.GetStrongConsistency().GetCommitIntervalMilliSeconds(), 10000);
+
+ TVector<ui64> txIds;
+
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(1, 0),
+ }));
+
+ const auto& record = ev->Get()->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetController().GetTabletId(), tabletId);
+
+ CheckTxIdResult(record, {
+ {TRowVersion(10000, 0), 0},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(Count(txIds, record.GetVersionTxIds(0).GetTxId()), 0);
+ txIds.push_back(record.GetVersionTxIds(0).GetTxId());
+ }
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(9999, 0),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(10000, 0), txIds.back()},
+ });
+ }
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(9999, Max<ui64>()),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(10000, 0), txIds.back()},
+ });
+ }
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(10000, 0),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(20000, 0), 0},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(Count(txIds, ev->Get()->Record.GetVersionTxIds(0).GetTxId()), 0);
+ txIds.push_back(ev->Get()->Record.GetVersionTxIds(0).GetTxId());
+ }
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(5000, 0),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(10000, 0), txIds[0]},
+ });
+ }
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(20000, 0),
+ TRowVersion(30000, 0),
+ TRowVersion(40000, 0),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(30000, 0), 0},
+ {TRowVersion(40000, 0), 0},
+ {TRowVersion(50000, 0), 0},
+ });
+
+ for (const auto& v : ev->Get()->Record.GetVersionTxIds()) {
+ UNIT_ASSERT_VALUES_EQUAL(Count(txIds, v.GetTxId()), 0);
+ txIds.push_back(v.GetTxId());
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(txIds.size(), 5);
+ {
+ auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{
+ TRowVersion(50000, 0),
+ }));
+
+ CheckTxIdResult(ev->Get()->Record, {
+ {TRowVersion(60000, 0), txIds.back()},
+ });
+ }
+ }
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index 92b735829f4..26e96e8f0fa 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -79,6 +79,8 @@ STFUNC(TController::StateWork) {
HFunc(TEvService::TEvWorkerStatus, Handle);
HFunc(TEvService::TEvRunWorker, Handle);
HFunc(TEvService::TEvWorkerDataEnd, Handle);
+ HFunc(TEvService::TEvGetTxId, Handle);
+ HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
@@ -98,6 +100,10 @@ void TController::Cleanup(const TActorContext& ctx) {
CloseSession(nodeId, ctx);
}
+ if (auto actorId = std::exchange(TxAllocatorClient, {})) {
+ Send(actorId, new TEvents::TEvPoison());
+ }
+
NodesManager.Shutdown(ctx);
}
@@ -111,6 +117,10 @@ void TController::SwitchToWork(const TActorContext& ctx) {
DiscoveryCache = ctx.Register(CreateDiscoveryCache());
}
+ if (!TxAllocatorClient) {
+ TxAllocatorClient = ctx.RegisterWithSameMailbox(CreateTxAllocatorClient(AppData(ctx)));
+ }
+
for (auto& [_, replication] : Replications) {
replication->Progress(ctx);
}
@@ -737,6 +747,35 @@ bool TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ct
return true;
}
+void TController::Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ const auto nodeId = ev->Sender.NodeId();
+ if (!Sessions.contains(nodeId)) {
+ return;
+ }
+
+ if (Replications.size() != 1) {
+ CLOG_E(ctx, "Cannot assign tx id: ambiguous replication instance");
+ return;
+ }
+
+ const auto& config = Replications.begin()->second->GetConfig();
+ if (!config.HasStrongConsistency()) {
+ CLOG_E(ctx, "Cannot assign tx id: not in strong consistency mode");
+ return;
+ }
+
+ const auto intervalMs = config.GetStrongConsistency().GetCommitIntervalMilliSeconds();
+ for (const auto& version : ev->Get()->Record.GetVersions()) {
+ const ui64 intervalNo = version.GetStep() / intervalMs;
+ const auto adjustedVersion = TRowVersion(intervalMs * (intervalNo + 1), 0);
+ PendingTxId[adjustedVersion].insert(nodeId);
+ }
+
+ RunTxAssignTxId(ctx);
+}
+
void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
const ui32 nodeId = ev->Get()->NodeId;
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index b0dbb5deaaa..f4f506758ad 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -11,16 +11,19 @@
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/base/defs.h>
+#include <ydb/core/base/row_version.h>
#include <ydb/core/protos/counters_replication.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tx/replication/service/service.h>
+#include <ydb/core/tx/tx_allocator_client/actor_client.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
+#include <util/generic/map.h>
namespace NKikimr::NReplication::NController {
@@ -93,6 +96,8 @@ private:
void Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvWorkerDataEnd::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);
void CreateSession(ui32 nodeId, const TActorContext& ctx);
@@ -127,6 +132,7 @@ private:
class TTxDropDstResult;
class TTxResolveSecretResult;
class TTxWorkerError;
+ class TTxAssignTxId;
// tx runners
void RunTxInitSchema(const TActorContext& ctx);
@@ -146,6 +152,7 @@ private:
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
void RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx);
+ void RunTxAssignTxId(const TActorContext& ctx);
// other
template <typename T>
@@ -194,6 +201,13 @@ private:
TDeque<TActorId> RequestedDropStream;
THashSet<TActorId> InflightDropStream;
+ TActorId TxAllocatorClient;
+ TDeque<ui64> AllocatedTxIds; // got from tx allocator
+ bool AllocateTxIdInFlight = false;
+ TMap<TRowVersion, ui64> AssignedTxIds; // tx ids assigned to version
+ TMap<TRowVersion, THashSet<ui32>> PendingTxId;
+ bool AssignTxIdInFlight = false;
+
}; // TController
}
diff --git a/ydb/core/tx/replication/controller/schema.h b/ydb/core/tx/replication/controller/schema.h
index 0c54c95c2c0..082e9d1c678 100644
--- a/ydb/core/tx/replication/controller/schema.h
+++ b/ydb/core/tx/replication/controller/schema.h
@@ -61,11 +61,21 @@ struct TControllerSchema: NIceDb::Schema {
using TColumns = TableColumns<ReplicationId, TargetId, Name, State>;
};
+ struct TxIds: Table<5> {
+ struct VersionStep: Column<1, NScheme::NTypeIds::Uint64> {};
+ struct VersionTxId: Column<2, NScheme::NTypeIds::Uint64> {};
+ struct WriteTxId: Column<3, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<VersionStep, VersionTxId>;
+ using TColumns = TableColumns<VersionStep, VersionTxId, WriteTxId>;
+ };
+
using TTables = SchemaTables<
SysParams,
Replications,
Targets,
- SrcStreams
+ SrcStreams,
+ TxIds
>;
}; // TControllerSchema
diff --git a/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp b/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp
new file mode 100644
index 00000000000..1b4c3d1915e
--- /dev/null
+++ b/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp
@@ -0,0 +1,176 @@
+#include "controller_impl.h"
+
+#include <variant>
+
+namespace NKikimr::NReplication::NController {
+
+class TController::TTxAssignTxId: public TTxBase {
+ // TODO(ilnaz): configurable
+ static constexpr ui64 MaxOpenTxIds = 5;
+ static constexpr ui64 MinAllocatedTxIds = 3;
+
+ THashMap<ui32, THolder<TEvService::TEvTxIdResult>> Result;
+ bool TxIdsExhausted = false;
+
+ struct TTxId {
+ ui64 Value;
+
+ TTxId() = default;
+ TTxId(ui64 value)
+ : Value(value)
+ {}
+ };
+
+ enum class EError {
+ TooManyOpenTxIds,
+ TxIdsExhausted,
+ };
+
+ using TAssignResult = std::variant<TTxId, EError>;
+
+ template <typename TAssignFunc>
+ TAssignResult Process(TAssignFunc assignFunc) {
+ TAssignResult result;
+
+ for (auto it = Self->PendingTxId.begin(); it != Self->PendingTxId.end();) {
+ result = assignFunc(it->first);
+ ui64 txId = 0;
+
+ if (const auto* x = std::get_if<TTxId>(&result)) {
+ txId = x->Value;
+ } else {
+ return result;
+ }
+
+ for (const auto nodeId : it->second) {
+ auto& ev = Result[nodeId];
+ if (!ev) {
+ ev.Reset(new TEvService::TEvTxIdResult(Self->TabletID(), Self->Executor()->Generation()));
+ }
+
+ auto& item = *ev->Record.AddVersionTxIds();
+ it->first.Serialize(*item.MutableVersion());
+ item.SetTxId(txId);
+ }
+
+ Self->PendingTxId.erase(it++);
+ }
+
+ return result;
+ }
+
+public:
+ explicit TTxAssignTxId(TController* self)
+ : TTxBase("TxAssignTxId", self)
+ {
+ }
+
+ TTxType GetTxType() const override {
+ return TXTYPE_ASSIGN_TX_ID;
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ CLOG_D(ctx, "Execute"
+ << ": pending# " << Self->PendingTxId.size()
+ << ", assigned# " << Self->AssignedTxIds.size()
+ << ", allocated# " << Self->AllocatedTxIds.size());
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ auto result = Process([&](const TRowVersion& version) -> TAssignResult {
+ auto it = Self->AssignedTxIds.find(version);
+ if (it != Self->AssignedTxIds.end()) {
+ return it->second;
+ }
+
+ if (Self->AssignedTxIds.size() >= MaxOpenTxIds) {
+ return EError::TooManyOpenTxIds;
+ }
+
+ if (Self->AllocatedTxIds.empty()) {
+ return EError::TxIdsExhausted;
+ }
+
+ const ui64 txId = Self->AllocatedTxIds.front();
+ Self->AllocatedTxIds.pop_front();
+
+ db.Table<Schema::TxIds>().Key(version.Step, version.TxId).Update(
+ NIceDb::TUpdate<Schema::TxIds::WriteTxId>(txId)
+ );
+ it = Self->AssignedTxIds.emplace(version, txId).first;
+
+ return txId;
+ });
+
+ if (const auto* x = std::get_if<EError>(&result); x && *x == EError::TxIdsExhausted) {
+ TxIdsExhausted = true;
+ return true;
+ }
+
+ if (auto it = Self->PendingTxId.rbegin(); it != Self->PendingTxId.rend()) {
+ Y_ABORT_UNLESS(std::holds_alternative<EError>(result));
+ Y_ABORT_UNLESS(std::get<EError>(result) == EError::TooManyOpenTxIds);
+ Y_ABORT_UNLESS(Self->AssignedTxIds.lower_bound(it->first) == Self->AssignedTxIds.end());
+ Y_ABORT_UNLESS(!Self->AssignedTxIds.empty());
+
+ auto nh = Self->AssignedTxIds.extract(Self->AssignedTxIds.rbegin()->first);
+ db.Table<Schema::TxIds>().Key(nh.key().Step, nh.key().TxId).Delete();
+
+ nh.key() = it->first;
+ db.Table<Schema::TxIds>().Key(nh.key().Step, nh.key().TxId).Update(
+ NIceDb::TUpdate<Schema::TxIds::WriteTxId>(nh.mapped())
+ );
+ Self->AssignedTxIds.insert(std::move(nh));
+ }
+
+ result = Process([&](const TRowVersion&) -> TAssignResult {
+ Y_ABORT_UNLESS(!Self->AssignedTxIds.empty());
+ return Self->AssignedTxIds.rbegin()->second;
+ });
+
+ Y_ABORT_UNLESS(std::holds_alternative<TTxId>(result));
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ CLOG_D(ctx, "Complete"
+ << ": pending# " << Self->PendingTxId.size()
+ << ", assigned# " << Self->AssignedTxIds.size()
+ << ", allocated# " << Self->AllocatedTxIds.size()
+ << ", exhausted# " << TxIdsExhausted);
+
+ for (auto& [nodeId, ev] : Result) {
+ ctx.Send(MakeReplicationServiceId(nodeId), std::move(ev));
+ }
+
+ if (!Self->AllocateTxIdInFlight && Self->AllocatedTxIds.size() < MinAllocatedTxIds) {
+ Self->AllocateTxIdInFlight = true;
+ ctx.Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(MaxOpenTxIds));
+ }
+
+ if (!TxIdsExhausted && Self->PendingTxId) {
+ Self->Execute(new TTxAssignTxId(Self), ctx);
+ } else {
+ Self->AssignTxIdInFlight = false;
+ }
+ }
+
+}; // TTxAssignTxId
+
+void TController::RunTxAssignTxId(const TActorContext& ctx) {
+ if (!AssignTxIdInFlight) {
+ AssignTxIdInFlight = true;
+ Execute(new TTxAssignTxId(this), ctx);
+ }
+}
+
+void TController::Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ std::copy(ev->Get()->TxIds.begin(), ev->Get()->TxIds.end(), std::back_inserter(AllocatedTxIds));
+ AllocateTxIdInFlight = false;
+
+ RunTxAssignTxId(ctx);
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp
index 6ce375b5b93..7c0df395a0b 100644
--- a/ydb/core/tx/replication/controller/tx_init.cpp
+++ b/ydb/core/tx/replication/controller/tx_init.cpp
@@ -133,12 +133,37 @@ class TController::TTxInit: public TTxBase {
return true;
}
+ bool LoadTxIds(NIceDb::TNiceDb& db) {
+ auto rowset = db.Table<Schema::TxIds>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ const auto txId = rowset.GetValue<Schema::TxIds::WriteTxId>();
+ const auto version = TRowVersion(
+ rowset.GetValue<Schema::TxIds::VersionStep>(),
+ rowset.GetValue<Schema::TxIds::VersionTxId>()
+ );
+
+ auto res = Self->AssignedTxIds.emplace(version, txId);
+ Y_VERIFY_S(res.second, "Duplicate version: " << version);
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
inline bool Load(NIceDb::TNiceDb& db) {
Self->Reset();
return LoadSysParams(db)
&& LoadReplications(db)
&& LoadTargets(db)
- && LoadSrcStreams(db);
+ && LoadSrcStreams(db)
+ && LoadTxIds(db);
}
inline bool Load(NTable::TDatabase& toughDb) {
diff --git a/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make b/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make
new file mode 100644
index 00000000000..7f0e71b6753
--- /dev/null
+++ b/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make
@@ -0,0 +1,20 @@
+UNITTEST_FOR(ydb/core/tx/replication/controller)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+
+TIMEOUT(600)
+
+PEERDIR(
+ ydb/core/tx/replication/ut_helpers
+ library/cpp/testing/unittest
+)
+
+SRCS(
+ assign_tx_id_ut.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make
index 97a07bd54be..6b13c25a8e6 100644
--- a/ydb/core/tx/replication/controller/ya.make
+++ b/ydb/core/tx/replication/controller/ya.make
@@ -10,6 +10,7 @@ PEERDIR(
ydb/core/tx/replication/common
ydb/core/tx/replication/ydb_proxy
ydb/core/tx/scheme_board
+ ydb/core/tx/tx_allocator_client
ydb/core/util
ydb/core/ydb_convert
ydb/services/metadata
@@ -37,6 +38,7 @@ SRCS(
target_table.cpp
target_with_stream.cpp
tenant_resolver.cpp
+ tx_assign_tx_id.cpp
tx_alter_dst_result.cpp
tx_alter_replication.cpp
tx_assign_stream_name.cpp
@@ -61,6 +63,7 @@ YQL_LAST_ABI_VERSION()
END()
RECURSE_FOR_TESTS(
+ ut_assign_tx_id
ut_dst_creator
ut_stream_creator
ut_target_discoverer
diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h
index abde4ab3e7f..136f2dc00e0 100644
--- a/ydb/core/tx/replication/service/service.h
+++ b/ydb/core/tx/replication/service/service.h
@@ -95,6 +95,11 @@ struct TEvService {
struct TEvTxIdResult: public TEventPB<TEvTxIdResult, NKikimrReplication::TEvTxIdResult, EvTxIdResult> {
TEvTxIdResult() = default;
+
+ explicit TEvTxIdResult(ui64 tabletId, ui64 generation) {
+ Record.MutableController()->SetTabletId(tabletId);
+ Record.MutableController()->SetGeneration(generation);
+ }
};
struct TEvHeartbeat: public TEventPB<TEvHeartbeat, NKikimrReplication::TEvHeartbeat, EvHeartbeat> {
diff --git a/ydb/core/tx/replication/ut_helpers/mock_service.cpp b/ydb/core/tx/replication/ut_helpers/mock_service.cpp
new file mode 100644
index 00000000000..5924ef8209c
--- /dev/null
+++ b/ydb/core/tx/replication/ut_helpers/mock_service.cpp
@@ -0,0 +1,48 @@
+#include <ydb/core/base/statestorage.h>
+#include <ydb/core/tx/replication/service/service.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NTestHelpers {
+
+class TMockService: public TActorBootstrapped<TMockService> {
+ template <typename TEventPtr>
+ void Forward(TEventPtr& ev) {
+ Send(ev->Forward(Edge));
+ }
+
+ void PassAway() override {
+ Send(BoardPublisher, new TEvents::TEvPoison());
+ TActorBootstrapped<TMockService>::PassAway();
+ }
+
+public:
+ explicit TMockService(const TActorId& edge)
+ : Edge(edge)
+ {}
+
+ void Bootstrap() {
+ Become(&TThis::StateWork);
+ BoardPublisher = Register(CreateBoardPublishActor(NService::MakeDiscoveryPath("/Root"), TString(), SelfId(), 0, true));
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvService::TEvHandshake, Forward);
+ hFunc(TEvService::TEvRunWorker, Forward);
+ hFunc(TEvService::TEvStopWorker, Forward);
+ hFunc(TEvService::TEvTxIdResult, Forward);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TActorId Edge;
+ TActorId BoardPublisher;
+};
+
+IActor* CreateReplicationMockService(const TActorId& edge) {
+ return new TMockService(edge);
+}
+
+}
diff --git a/ydb/core/tx/replication/ut_helpers/mock_service.h b/ydb/core/tx/replication/ut_helpers/mock_service.h
new file mode 100644
index 00000000000..0df1a36d950
--- /dev/null
+++ b/ydb/core/tx/replication/ut_helpers/mock_service.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NTestHelpers {
+
+IActor* CreateReplicationMockService(const TActorId& edge);
+
+}
diff --git a/ydb/core/tx/replication/ut_helpers/test_env.h b/ydb/core/tx/replication/ut_helpers/test_env.h
index 29fc9591c16..fcb4dddc935 100644
--- a/ydb/core/tx/replication/ut_helpers/test_env.h
+++ b/ydb/core/tx/replication/ut_helpers/test_env.h
@@ -43,9 +43,7 @@ class TEnv {
auto req = MakeHolder<NSchemeShard::TEvSchemeShard::TEvLogin>();
req->Record.SetUser(user);
req->Record.SetPassword(password);
- ForwardToTablet(*Server.GetRuntime(), schemeShardId, Sender, req.Release());
-
- auto resp = Server.GetRuntime()->GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvLoginResult>(Sender);
+ auto resp = Send<NSchemeShard::TEvSchemeShard::TEvLoginResult>(schemeShardId, std::move(req));
UNIT_ASSERT(resp->Get()->Record.GetError().empty());
UNIT_ASSERT(!resp->Get()->Record.GetToken().empty());
}
@@ -181,10 +179,24 @@ public:
template <typename TEvResponse>
auto Send(const TActorId& recipient, THolder<IEventBase> ev) {
- SendAsync(recipient, ev.Release());
+ return Send<TEvResponse>(recipient, ev.Release());
+ }
+
+ void SendAsync(ui64 tabletId, IEventBase* ev) {
+ ForwardToTablet(*Server.GetRuntime(), tabletId, Sender, ev);
+ }
+
+ template <typename TEvResponse>
+ auto Send(ui64 tabletId, IEventBase* ev) {
+ SendAsync(tabletId, ev);
return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
}
+ template <typename TEvResponse>
+ auto Send(ui64 tabletId, THolder<IEventBase> ev) {
+ return Send<TEvResponse>(tabletId, ev.Release());
+ }
+
auto& GetRuntime() {
return *Server.GetRuntime();
}
diff --git a/ydb/core/tx/replication/ut_helpers/ya.make b/ydb/core/tx/replication/ut_helpers/ya.make
index d7c9cc6af3a..54d74d6da07 100644
--- a/ydb/core/tx/replication/ut_helpers/ya.make
+++ b/ydb/core/tx/replication/ut_helpers/ya.make
@@ -5,11 +5,13 @@ PEERDIR(
ydb/core/protos
ydb/core/testlib/pg
ydb/core/tx/replication/ydb_proxy
+ ydb/library/actors/core
ydb/public/sdk/cpp/client/ydb_topic
library/cpp/testing/unittest
)
SRCS(
+ mock_service.cpp
test_env.h
test_table.cpp
write_topic.h