diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-12-04 16:14:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-04 13:14:05 +0000 |
commit | 962438b11d1510b42d2ec6b6d8d6dec07396a8a9 (patch) | |
tree | 8de9e9a5d7c2748edd14472835a7d1488046d880 | |
parent | 5e818ab38b6631bef0c4a846c4c66262c363ce2a (diff) | |
download | ydb-962438b11d1510b42d2ec6b6d8d6dec07396a8a9.tar.gz |
Assign tx id to version (#11440)
-rw-r--r-- | ydb/core/protos/counters_replication.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/assign_tx_id_ut.cpp | 154 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller.cpp | 39 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller_impl.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/schema.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/tx_assign_tx_id.cpp | 176 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/tx_init.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make | 20 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/ya.make | 3 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/service.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/replication/ut_helpers/mock_service.cpp | 48 | ||||
-rw-r--r-- | ydb/core/tx/replication/ut_helpers/mock_service.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/replication/ut_helpers/test_env.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/replication/ut_helpers/ya.make | 2 |
14 files changed, 524 insertions, 6 deletions
diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index 4c8827674ef..f3048359e46 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -45,4 +45,5 @@ enum ETxTypes { TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}]; TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}]; TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}]; + TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}]; } diff --git a/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp b/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp new file mode 100644 index 00000000000..43c9362a553 --- /dev/null +++ b/ydb/core/tx/replication/controller/assign_tx_id_ut.cpp @@ -0,0 +1,154 @@ +#include "controller_impl.h" + +#include <ydb/core/tx/replication/service/service.h> +#include <ydb/core/tx/replication/ut_helpers/mock_service.h> +#include <ydb/core/tx/replication/ut_helpers/test_env.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/string/printf.h> + +namespace NKikimr::NReplication::NController { + +Y_UNIT_TEST_SUITE(AssignTxId) { + using namespace NTestHelpers; + + void CheckTxIdResult(const NKikimrReplication::TEvTxIdResult& result, const TMap<TRowVersion, ui64>& expected) { + UNIT_ASSERT_VALUES_EQUAL(result.VersionTxIdsSize(), expected.size()); + + int i = 0; + for (const auto& [version, txId] : expected) { + const auto& actual = result.GetVersionTxIds(i++); + UNIT_ASSERT_VALUES_EQUAL(TRowVersion::Parse(actual.GetVersion()), version); + if (txId) { + UNIT_ASSERT_VALUES_EQUAL(actual.GetTxId(), txId); + } + } + } + + Y_UNIT_TEST(Basic) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); + + env.GetRuntime().RegisterService( + MakeReplicationServiceId(env.GetRuntime().GetNodeId(0)), + env.GetRuntime().Register(CreateReplicationMockService(env.GetSender())) + ); + + NYdb::NTable::TTableClient client(env.GetDriver(), NYdb::NTable::TClientSettings() + .DiscoveryEndpoint(env.GetEndpoint()) + .Database(env.GetDatabase()) + ); + + auto session = client.CreateSession().GetValueSync().GetSession(); + const auto result = session + .ExecuteSchemeQuery(Sprintf(R"( + CREATE ASYNC REPLICATION `replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + CONNECTION_STRING = "grpc://%s/?database=/Root", + CONSISTENCY_MODE = "STRONG", + COMMIT_INTERVAL = Interval("PT10S") + ); + )", env.GetEndpoint().c_str())) + .GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = env.GetDescription("/Root/replication"); + const auto& repl = desc.GetPathDescription().GetReplicationDescription(); + const auto tabletId = repl.GetControllerId(); + + const auto& cfg = repl.GetConfig(); + UNIT_ASSERT(cfg.HasStrongConsistency()); + UNIT_ASSERT_VALUES_EQUAL(cfg.GetStrongConsistency().GetCommitIntervalMilliSeconds(), 10000); + + TVector<ui64> txIds; + + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(1, 0), + })); + + const auto& record = ev->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetController().GetTabletId(), tabletId); + + CheckTxIdResult(record, { + {TRowVersion(10000, 0), 0}, + }); + + UNIT_ASSERT_VALUES_EQUAL(Count(txIds, record.GetVersionTxIds(0).GetTxId()), 0); + txIds.push_back(record.GetVersionTxIds(0).GetTxId()); + } + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(9999, 0), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(10000, 0), txIds.back()}, + }); + } + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(9999, Max<ui64>()), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(10000, 0), txIds.back()}, + }); + } + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(10000, 0), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(20000, 0), 0}, + }); + + UNIT_ASSERT_VALUES_EQUAL(Count(txIds, ev->Get()->Record.GetVersionTxIds(0).GetTxId()), 0); + txIds.push_back(ev->Get()->Record.GetVersionTxIds(0).GetTxId()); + } + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(5000, 0), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(10000, 0), txIds[0]}, + }); + } + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(20000, 0), + TRowVersion(30000, 0), + TRowVersion(40000, 0), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(30000, 0), 0}, + {TRowVersion(40000, 0), 0}, + {TRowVersion(50000, 0), 0}, + }); + + for (const auto& v : ev->Get()->Record.GetVersionTxIds()) { + UNIT_ASSERT_VALUES_EQUAL(Count(txIds, v.GetTxId()), 0); + txIds.push_back(v.GetTxId()); + } + } + UNIT_ASSERT_VALUES_EQUAL(txIds.size(), 5); + { + auto ev = env.Send<TEvService::TEvTxIdResult>(tabletId, new TEvService::TEvGetTxId(TVector<TRowVersion>{ + TRowVersion(50000, 0), + })); + + CheckTxIdResult(ev->Get()->Record, { + {TRowVersion(60000, 0), txIds.back()}, + }); + } + } +} + +} diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 92b735829f4..26e96e8f0fa 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -79,6 +79,8 @@ STFUNC(TController::StateWork) { HFunc(TEvService::TEvWorkerStatus, Handle); HFunc(TEvService::TEvRunWorker, Handle); HFunc(TEvService::TEvWorkerDataEnd, Handle); + HFunc(TEvService::TEvGetTxId, Handle); + HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle); HFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: HandleDefaultEvents(ev, SelfId()); @@ -98,6 +100,10 @@ void TController::Cleanup(const TActorContext& ctx) { CloseSession(nodeId, ctx); } + if (auto actorId = std::exchange(TxAllocatorClient, {})) { + Send(actorId, new TEvents::TEvPoison()); + } + NodesManager.Shutdown(ctx); } @@ -111,6 +117,10 @@ void TController::SwitchToWork(const TActorContext& ctx) { DiscoveryCache = ctx.Register(CreateDiscoveryCache()); } + if (!TxAllocatorClient) { + TxAllocatorClient = ctx.RegisterWithSameMailbox(CreateTxAllocatorClient(AppData(ctx))); + } + for (auto& [_, replication] : Replications) { replication->Progress(ctx); } @@ -737,6 +747,35 @@ bool TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ct return true; } +void TController::Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + const auto nodeId = ev->Sender.NodeId(); + if (!Sessions.contains(nodeId)) { + return; + } + + if (Replications.size() != 1) { + CLOG_E(ctx, "Cannot assign tx id: ambiguous replication instance"); + return; + } + + const auto& config = Replications.begin()->second->GetConfig(); + if (!config.HasStrongConsistency()) { + CLOG_E(ctx, "Cannot assign tx id: not in strong consistency mode"); + return; + } + + const auto intervalMs = config.GetStrongConsistency().GetCommitIntervalMilliSeconds(); + for (const auto& version : ev->Get()->Record.GetVersions()) { + const ui64 intervalNo = version.GetStep() / intervalMs; + const auto adjustedVersion = TRowVersion(intervalMs * (intervalNo + 1), 0); + PendingTxId[adjustedVersion].insert(nodeId); + } + + RunTxAssignTxId(ctx); +} + void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) { const ui32 nodeId = ev->Get()->NodeId; diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index b0dbb5deaaa..f4f506758ad 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -11,16 +11,19 @@ #include <ydb/core/base/blobstorage.h> #include <ydb/core/base/defs.h> +#include <ydb/core/base/row_version.h> #include <ydb/core/protos/counters_replication.pb.h> #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tx/replication/service/service.h> +#include <ydb/core/tx/tx_allocator_client/actor_client.h> #include <ydb/library/actors/core/interconnect.h> #include <ydb/library/yverify_stream/yverify_stream.h> #include <util/generic/deque.h> #include <util/generic/hash.h> #include <util/generic/hash_set.h> +#include <util/generic/map.h> namespace NKikimr::NReplication::NController { @@ -93,6 +96,8 @@ private: void Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvWorkerDataEnd::TPtr& ev, const TActorContext& ctx); + void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); void CreateSession(ui32 nodeId, const TActorContext& ctx); @@ -127,6 +132,7 @@ private: class TTxDropDstResult; class TTxResolveSecretResult; class TTxWorkerError; + class TTxAssignTxId; // tx runners void RunTxInitSchema(const TActorContext& ctx); @@ -146,6 +152,7 @@ private: void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx); void RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx); + void RunTxAssignTxId(const TActorContext& ctx); // other template <typename T> @@ -194,6 +201,13 @@ private: TDeque<TActorId> RequestedDropStream; THashSet<TActorId> InflightDropStream; + TActorId TxAllocatorClient; + TDeque<ui64> AllocatedTxIds; // got from tx allocator + bool AllocateTxIdInFlight = false; + TMap<TRowVersion, ui64> AssignedTxIds; // tx ids assigned to version + TMap<TRowVersion, THashSet<ui32>> PendingTxId; + bool AssignTxIdInFlight = false; + }; // TController } diff --git a/ydb/core/tx/replication/controller/schema.h b/ydb/core/tx/replication/controller/schema.h index 0c54c95c2c0..082e9d1c678 100644 --- a/ydb/core/tx/replication/controller/schema.h +++ b/ydb/core/tx/replication/controller/schema.h @@ -61,11 +61,21 @@ struct TControllerSchema: NIceDb::Schema { using TColumns = TableColumns<ReplicationId, TargetId, Name, State>; }; + struct TxIds: Table<5> { + struct VersionStep: Column<1, NScheme::NTypeIds::Uint64> {}; + struct VersionTxId: Column<2, NScheme::NTypeIds::Uint64> {}; + struct WriteTxId: Column<3, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<VersionStep, VersionTxId>; + using TColumns = TableColumns<VersionStep, VersionTxId, WriteTxId>; + }; + using TTables = SchemaTables< SysParams, Replications, Targets, - SrcStreams + SrcStreams, + TxIds >; }; // TControllerSchema diff --git a/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp b/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp new file mode 100644 index 00000000000..1b4c3d1915e --- /dev/null +++ b/ydb/core/tx/replication/controller/tx_assign_tx_id.cpp @@ -0,0 +1,176 @@ +#include "controller_impl.h" + +#include <variant> + +namespace NKikimr::NReplication::NController { + +class TController::TTxAssignTxId: public TTxBase { + // TODO(ilnaz): configurable + static constexpr ui64 MaxOpenTxIds = 5; + static constexpr ui64 MinAllocatedTxIds = 3; + + THashMap<ui32, THolder<TEvService::TEvTxIdResult>> Result; + bool TxIdsExhausted = false; + + struct TTxId { + ui64 Value; + + TTxId() = default; + TTxId(ui64 value) + : Value(value) + {} + }; + + enum class EError { + TooManyOpenTxIds, + TxIdsExhausted, + }; + + using TAssignResult = std::variant<TTxId, EError>; + + template <typename TAssignFunc> + TAssignResult Process(TAssignFunc assignFunc) { + TAssignResult result; + + for (auto it = Self->PendingTxId.begin(); it != Self->PendingTxId.end();) { + result = assignFunc(it->first); + ui64 txId = 0; + + if (const auto* x = std::get_if<TTxId>(&result)) { + txId = x->Value; + } else { + return result; + } + + for (const auto nodeId : it->second) { + auto& ev = Result[nodeId]; + if (!ev) { + ev.Reset(new TEvService::TEvTxIdResult(Self->TabletID(), Self->Executor()->Generation())); + } + + auto& item = *ev->Record.AddVersionTxIds(); + it->first.Serialize(*item.MutableVersion()); + item.SetTxId(txId); + } + + Self->PendingTxId.erase(it++); + } + + return result; + } + +public: + explicit TTxAssignTxId(TController* self) + : TTxBase("TxAssignTxId", self) + { + } + + TTxType GetTxType() const override { + return TXTYPE_ASSIGN_TX_ID; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute" + << ": pending# " << Self->PendingTxId.size() + << ", assigned# " << Self->AssignedTxIds.size() + << ", allocated# " << Self->AllocatedTxIds.size()); + + NIceDb::TNiceDb db(txc.DB); + + auto result = Process([&](const TRowVersion& version) -> TAssignResult { + auto it = Self->AssignedTxIds.find(version); + if (it != Self->AssignedTxIds.end()) { + return it->second; + } + + if (Self->AssignedTxIds.size() >= MaxOpenTxIds) { + return EError::TooManyOpenTxIds; + } + + if (Self->AllocatedTxIds.empty()) { + return EError::TxIdsExhausted; + } + + const ui64 txId = Self->AllocatedTxIds.front(); + Self->AllocatedTxIds.pop_front(); + + db.Table<Schema::TxIds>().Key(version.Step, version.TxId).Update( + NIceDb::TUpdate<Schema::TxIds::WriteTxId>(txId) + ); + it = Self->AssignedTxIds.emplace(version, txId).first; + + return txId; + }); + + if (const auto* x = std::get_if<EError>(&result); x && *x == EError::TxIdsExhausted) { + TxIdsExhausted = true; + return true; + } + + if (auto it = Self->PendingTxId.rbegin(); it != Self->PendingTxId.rend()) { + Y_ABORT_UNLESS(std::holds_alternative<EError>(result)); + Y_ABORT_UNLESS(std::get<EError>(result) == EError::TooManyOpenTxIds); + Y_ABORT_UNLESS(Self->AssignedTxIds.lower_bound(it->first) == Self->AssignedTxIds.end()); + Y_ABORT_UNLESS(!Self->AssignedTxIds.empty()); + + auto nh = Self->AssignedTxIds.extract(Self->AssignedTxIds.rbegin()->first); + db.Table<Schema::TxIds>().Key(nh.key().Step, nh.key().TxId).Delete(); + + nh.key() = it->first; + db.Table<Schema::TxIds>().Key(nh.key().Step, nh.key().TxId).Update( + NIceDb::TUpdate<Schema::TxIds::WriteTxId>(nh.mapped()) + ); + Self->AssignedTxIds.insert(std::move(nh)); + } + + result = Process([&](const TRowVersion&) -> TAssignResult { + Y_ABORT_UNLESS(!Self->AssignedTxIds.empty()); + return Self->AssignedTxIds.rbegin()->second; + }); + + Y_ABORT_UNLESS(std::holds_alternative<TTxId>(result)); + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete" + << ": pending# " << Self->PendingTxId.size() + << ", assigned# " << Self->AssignedTxIds.size() + << ", allocated# " << Self->AllocatedTxIds.size() + << ", exhausted# " << TxIdsExhausted); + + for (auto& [nodeId, ev] : Result) { + ctx.Send(MakeReplicationServiceId(nodeId), std::move(ev)); + } + + if (!Self->AllocateTxIdInFlight && Self->AllocatedTxIds.size() < MinAllocatedTxIds) { + Self->AllocateTxIdInFlight = true; + ctx.Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(MaxOpenTxIds)); + } + + if (!TxIdsExhausted && Self->PendingTxId) { + Self->Execute(new TTxAssignTxId(Self), ctx); + } else { + Self->AssignTxIdInFlight = false; + } + } + +}; // TTxAssignTxId + +void TController::RunTxAssignTxId(const TActorContext& ctx) { + if (!AssignTxIdInFlight) { + AssignTxIdInFlight = true; + Execute(new TTxAssignTxId(this), ctx); + } +} + +void TController::Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + std::copy(ev->Get()->TxIds.begin(), ev->Get()->TxIds.end(), std::back_inserter(AllocatedTxIds)); + AllocateTxIdInFlight = false; + + RunTxAssignTxId(ctx); +} + +} diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp index 6ce375b5b93..7c0df395a0b 100644 --- a/ydb/core/tx/replication/controller/tx_init.cpp +++ b/ydb/core/tx/replication/controller/tx_init.cpp @@ -133,12 +133,37 @@ class TController::TTxInit: public TTxBase { return true; } + bool LoadTxIds(NIceDb::TNiceDb& db) { + auto rowset = db.Table<Schema::TxIds>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + const auto txId = rowset.GetValue<Schema::TxIds::WriteTxId>(); + const auto version = TRowVersion( + rowset.GetValue<Schema::TxIds::VersionStep>(), + rowset.GetValue<Schema::TxIds::VersionTxId>() + ); + + auto res = Self->AssignedTxIds.emplace(version, txId); + Y_VERIFY_S(res.second, "Duplicate version: " << version); + + if (!rowset.Next()) { + return false; + } + } + + return true; + } + inline bool Load(NIceDb::TNiceDb& db) { Self->Reset(); return LoadSysParams(db) && LoadReplications(db) && LoadTargets(db) - && LoadSrcStreams(db); + && LoadSrcStreams(db) + && LoadTxIds(db); } inline bool Load(NTable::TDatabase& toughDb) { diff --git a/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make b/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make new file mode 100644 index 00000000000..7f0e71b6753 --- /dev/null +++ b/ydb/core/tx/replication/controller/ut_assign_tx_id/ya.make @@ -0,0 +1,20 @@ +UNITTEST_FOR(ydb/core/tx/replication/controller) + +FORK_SUBTESTS() + +SIZE(MEDIUM) + +TIMEOUT(600) + +PEERDIR( + ydb/core/tx/replication/ut_helpers + library/cpp/testing/unittest +) + +SRCS( + assign_tx_id_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 97a07bd54be..6b13c25a8e6 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -10,6 +10,7 @@ PEERDIR( ydb/core/tx/replication/common ydb/core/tx/replication/ydb_proxy ydb/core/tx/scheme_board + ydb/core/tx/tx_allocator_client ydb/core/util ydb/core/ydb_convert ydb/services/metadata @@ -37,6 +38,7 @@ SRCS( target_table.cpp target_with_stream.cpp tenant_resolver.cpp + tx_assign_tx_id.cpp tx_alter_dst_result.cpp tx_alter_replication.cpp tx_assign_stream_name.cpp @@ -61,6 +63,7 @@ YQL_LAST_ABI_VERSION() END() RECURSE_FOR_TESTS( + ut_assign_tx_id ut_dst_creator ut_stream_creator ut_target_discoverer diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h index abde4ab3e7f..136f2dc00e0 100644 --- a/ydb/core/tx/replication/service/service.h +++ b/ydb/core/tx/replication/service/service.h @@ -95,6 +95,11 @@ struct TEvService { struct TEvTxIdResult: public TEventPB<TEvTxIdResult, NKikimrReplication::TEvTxIdResult, EvTxIdResult> { TEvTxIdResult() = default; + + explicit TEvTxIdResult(ui64 tabletId, ui64 generation) { + Record.MutableController()->SetTabletId(tabletId); + Record.MutableController()->SetGeneration(generation); + } }; struct TEvHeartbeat: public TEventPB<TEvHeartbeat, NKikimrReplication::TEvHeartbeat, EvHeartbeat> { diff --git a/ydb/core/tx/replication/ut_helpers/mock_service.cpp b/ydb/core/tx/replication/ut_helpers/mock_service.cpp new file mode 100644 index 00000000000..5924ef8209c --- /dev/null +++ b/ydb/core/tx/replication/ut_helpers/mock_service.cpp @@ -0,0 +1,48 @@ +#include <ydb/core/base/statestorage.h> +#include <ydb/core/tx/replication/service/service.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/hfunc.h> + +namespace NKikimr::NReplication::NTestHelpers { + +class TMockService: public TActorBootstrapped<TMockService> { + template <typename TEventPtr> + void Forward(TEventPtr& ev) { + Send(ev->Forward(Edge)); + } + + void PassAway() override { + Send(BoardPublisher, new TEvents::TEvPoison()); + TActorBootstrapped<TMockService>::PassAway(); + } + +public: + explicit TMockService(const TActorId& edge) + : Edge(edge) + {} + + void Bootstrap() { + Become(&TThis::StateWork); + BoardPublisher = Register(CreateBoardPublishActor(NService::MakeDiscoveryPath("/Root"), TString(), SelfId(), 0, true)); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvService::TEvHandshake, Forward); + hFunc(TEvService::TEvRunWorker, Forward); + hFunc(TEvService::TEvStopWorker, Forward); + hFunc(TEvService::TEvTxIdResult, Forward); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Edge; + TActorId BoardPublisher; +}; + +IActor* CreateReplicationMockService(const TActorId& edge) { + return new TMockService(edge); +} + +} diff --git a/ydb/core/tx/replication/ut_helpers/mock_service.h b/ydb/core/tx/replication/ut_helpers/mock_service.h new file mode 100644 index 00000000000..0df1a36d950 --- /dev/null +++ b/ydb/core/tx/replication/ut_helpers/mock_service.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication::NTestHelpers { + +IActor* CreateReplicationMockService(const TActorId& edge); + +} diff --git a/ydb/core/tx/replication/ut_helpers/test_env.h b/ydb/core/tx/replication/ut_helpers/test_env.h index 29fc9591c16..fcb4dddc935 100644 --- a/ydb/core/tx/replication/ut_helpers/test_env.h +++ b/ydb/core/tx/replication/ut_helpers/test_env.h @@ -43,9 +43,7 @@ class TEnv { auto req = MakeHolder<NSchemeShard::TEvSchemeShard::TEvLogin>(); req->Record.SetUser(user); req->Record.SetPassword(password); - ForwardToTablet(*Server.GetRuntime(), schemeShardId, Sender, req.Release()); - - auto resp = Server.GetRuntime()->GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvLoginResult>(Sender); + auto resp = Send<NSchemeShard::TEvSchemeShard::TEvLoginResult>(schemeShardId, std::move(req)); UNIT_ASSERT(resp->Get()->Record.GetError().empty()); UNIT_ASSERT(!resp->Get()->Record.GetToken().empty()); } @@ -181,10 +179,24 @@ public: template <typename TEvResponse> auto Send(const TActorId& recipient, THolder<IEventBase> ev) { - SendAsync(recipient, ev.Release()); + return Send<TEvResponse>(recipient, ev.Release()); + } + + void SendAsync(ui64 tabletId, IEventBase* ev) { + ForwardToTablet(*Server.GetRuntime(), tabletId, Sender, ev); + } + + template <typename TEvResponse> + auto Send(ui64 tabletId, IEventBase* ev) { + SendAsync(tabletId, ev); return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender); } + template <typename TEvResponse> + auto Send(ui64 tabletId, THolder<IEventBase> ev) { + return Send<TEvResponse>(tabletId, ev.Release()); + } + auto& GetRuntime() { return *Server.GetRuntime(); } diff --git a/ydb/core/tx/replication/ut_helpers/ya.make b/ydb/core/tx/replication/ut_helpers/ya.make index d7c9cc6af3a..54d74d6da07 100644 --- a/ydb/core/tx/replication/ut_helpers/ya.make +++ b/ydb/core/tx/replication/ut_helpers/ya.make @@ -5,11 +5,13 @@ PEERDIR( ydb/core/protos ydb/core/testlib/pg ydb/core/tx/replication/ydb_proxy + ydb/library/actors/core ydb/public/sdk/cpp/client/ydb_topic library/cpp/testing/unittest ) SRCS( + mock_service.cpp test_env.h test_table.cpp write_topic.h |