diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-22 14:53:01 +0300 |
---|---|---|
committer | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-22 14:53:01 +0300 |
commit | 3aba9cd5716dd2f91a9f8b11998e244d8c2357d2 (patch) | |
tree | d2092cb220c0be7fcf243ebb69aa87722a0a4557 | |
parent | bc2ba82f561fdad8ca5e0c23d9e61d83fed26641 (diff) | |
download | ydb-3aba9cd5716dd2f91a9f8b11998e244d8c2357d2.tar.gz |
Mediator timecast readstep subscription support, KIKIMR-13910
ref:19ba66f6d071c2d8fb2124b44149b36962177a24
-rw-r--r-- | ydb/core/tx/time_cast/time_cast.cpp | 281 | ||||
-rw-r--r-- | ydb/core/tx/time_cast/time_cast.h | 120 | ||||
-rw-r--r-- | ydb/core/tx/time_cast/time_cast_ut.cpp | 84 | ||||
-rw-r--r-- | ydb/core/tx/time_cast/ut/ya.make | 26 | ||||
-rw-r--r-- | ydb/core/tx/time_cast/ya.make | 4 |
5 files changed, 505 insertions, 10 deletions
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp index 08042194fc..32047b725a 100644 --- a/ydb/core/tx/time_cast/time_cast.cpp +++ b/ydb/core/tx/time_cast/time_cast.cpp @@ -15,6 +15,9 @@ namespace NKikimr { +// We will unsubscribe from idle coordinators after 5 minutes +static constexpr TDuration MaxIdleCoordinatorSubscriptionTime = TDuration::Minutes(5); + ui64 TMediatorTimecastEntry::Get(ui64 tabletId) const { Y_UNUSED(tabletId); return AtomicGet(Step); @@ -72,9 +75,29 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { ui32 RefCount = 0; }; + struct TCoordinatorSubscriber { + TSet<ui64> Coordinators; + }; + + struct TCoordinator { + TMediatorTimecastReadStep::TPtr ReadStep = new TMediatorTimecastReadStep; + TActorId PipeClient; + ui64 LastSentSeqNo = 0; + ui64 LastConfirmedSeqNo = 0; + ui64 LastObservedReadStep = 0; + THashSet<TActorId> Subscribers; + TMap<std::pair<ui64, TActorId>, ui64> SubscribeRequests; // (seqno, subscriber) -> Cookie + TMap<std::pair<ui64, TActorId>, ui64> WaitRequests; // (step, subscriber) -> Cookie + TMonotonic IdleStart; + }; + THashMap<ui64, TMediator> Mediators; // mediator tablet -> info THashMap<ui64, TTabletInfo> Tablets; + ui64 LastSeqNo = 0; + THashMap<ui64, TCoordinator> Coordinators; + THashMap<TActorId, TCoordinatorSubscriber> CoordinatorSubscribers; + TMediator& MediatorInfo(ui64 mediator, const NKikimrSubDomains::TProcessingParams &processing) { auto pr = Mediators.try_emplace(mediator, processing.GetTimeCastBucketsPerMediator()); if (!pr.second) { @@ -118,19 +141,24 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { } void TryResync(const TActorId &pipeClient, ui64 tabletId, const TActorContext &ctx) { - for (auto &xpair : Mediators) { - const ui64 mediatorTabletId = xpair.first; - TMediator &mediator = xpair.second; - - if (mediator.PipeClient == pipeClient) { - Y_VERIFY(tabletId == mediatorTabletId); - mediator.PipeClient = TActorId(); - RegisterMediator(mediatorTabletId, mediator, ctx); - return; - } + ResyncCoordinator(tabletId, pipeClient, ctx); + + auto it = Mediators.find(tabletId); + if (it == Mediators.end()) { + return; + } + + TMediator &mediator = it->second; + if (mediator.PipeClient == pipeClient) { + mediator.PipeClient = TActorId(); + RegisterMediator(tabletId, mediator, ctx); } } + void SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx); + void ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx); + void NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvRegisterTablet::TPtr &ev, const TActorContext &ctx); void Handle(TEvMediatorTimecast::TEvUnregisterTablet::TPtr &ev, const TActorContext &ctx); void Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &ev, const TActorContext &ctx); @@ -138,6 +166,15 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> { void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx); + // Client requests for readstep subscriptions + void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx); + void Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx); + + // Coordinator replies for readstep subscriptions + void Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx); + void Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_MEDIATOR_ACTOR; @@ -154,6 +191,13 @@ public: HFunc(TEvMediatorTimecast::TEvWaitPlanStep, Handle); HFunc(TEvMediatorTimecast::TEvUpdate, Handle); + HFunc(TEvMediatorTimecast::TEvSubscribeReadStep, Handle); + HFunc(TEvMediatorTimecast::TEvUnsubscribeReadStep, Handle); + HFunc(TEvMediatorTimecast::TEvWaitReadStep, Handle); + + HFunc(TEvTxProxy::TEvSubscribeReadStepResult, Handle); + HFunc(TEvTxProxy::TEvSubscribeReadStepUpdate, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); } @@ -297,6 +341,223 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co } } +void TMediatorTimecastProxy::SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) { + const ui64 seqNo = LastSeqNo; + + if (!coordinator.PipeClient) { + ui64 maxDelay = 100 + TAppData::RandomProvider->GenRand64() % 50; + auto retryPolicy = NTabletPipe::TClientRetryPolicy{ + .RetryLimitCount = 6 /* delays: 0, 10, 20, 40, 80, 100-150 */, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(maxDelay), + }; + coordinator.PipeClient = ctx.RegisterWithSameMailbox( + NTabletPipe::CreateClient(ctx.SelfID, coordinatorId, retryPolicy)); + } + + coordinator.LastSentSeqNo = seqNo; + NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvSubscribeReadStep(coordinatorId, seqNo)); +} + +void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx) { + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + + auto &coordinator = itCoordinator->second; + if (coordinator.PipeClient != pipeClient) { + return; + } + + coordinator.PipeClient = TActorId(); + if (coordinator.Subscribers.empty()) { + // Just forget disconnected idle coordinators + Coordinators.erase(itCoordinator); + return; + } + + ++LastSeqNo; + SyncCoordinator(coordinatorId, coordinator, ctx); +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + const ui64 coordinatorId = msg->CoordinatorId; + auto &subscriber = CoordinatorSubscribers[ev->Sender]; + auto &coordinator = Coordinators[coordinatorId]; + subscriber.Coordinators.insert(coordinatorId); + coordinator.Subscribers.insert(ev->Sender); + ui64 seqNo = ++LastSeqNo; + auto key = std::make_pair(seqNo, ev->Sender); + coordinator.SubscribeRequests[key] = ev->Cookie; + SyncCoordinator(coordinatorId, coordinator, ctx); +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + auto &subscriber = CoordinatorSubscribers[ev->Sender]; + if (msg->CoordinatorId == 0) { + // Unsubscribe from all coordinators + for (ui64 coordinatorId : subscriber.Coordinators) { + auto &coordinator = Coordinators[coordinatorId]; + coordinator.Subscribers.erase(ev->Sender); + if (coordinator.Subscribers.empty()) { + coordinator.IdleStart = ctx.Monotonic(); + } + } + subscriber.Coordinators.clear(); + } else if (subscriber.Coordinators.contains(msg->CoordinatorId)) { + // Unsubscribe from specific coordinator + auto &coordinator = Coordinators[msg->CoordinatorId]; + coordinator.Subscribers.erase(ev->Sender); + if (coordinator.Subscribers.empty()) { + coordinator.IdleStart = ctx.Monotonic(); + } + subscriber.Coordinators.erase(msg->CoordinatorId); + } + + if (subscriber.Coordinators.empty()) { + // Don't track unnecessary subscribers + CoordinatorSubscribers.erase(ev->Sender); + } +} + +void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx) { + const auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE " << msg->ToString()); + + const ui64 coordinatorId = msg->CoordinatorId; + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + if (!coordinator.Subscribers.contains(ev->Sender)) { + return; + } + + const ui64 step = coordinator.LastObservedReadStep; + const ui64 waitReadStep = msg->ReadStep; + if (waitReadStep <= step) { + ctx.Send(ev->Sender, + new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step), + 0, ev->Cookie); + return; + } + + auto key = std::make_pair(waitReadStep, ev->Sender); + coordinator.WaitRequests[key] = ev->Cookie; +} + +void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx) { + const auto &record = ev->Get()->Record; + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE TEvSubscribeReadStepResult " << record.ShortDebugString()); + + const ui64 coordinatorId = record.GetCoordinatorID(); + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + bool updated = false; + const ui64 nextReadStep = record.GetNextAcquireStep(); + if (coordinator.LastObservedReadStep < nextReadStep) { + coordinator.LastObservedReadStep = nextReadStep; + coordinator.ReadStep->Update(nextReadStep); + updated = true; + } + + const ui64 seqNo = record.GetSeqNo(); + if (coordinator.LastConfirmedSeqNo < seqNo) { + coordinator.LastConfirmedSeqNo = seqNo; + + const ui64 lastReadStep = record.GetLastAcquireStep(); + for (auto it = coordinator.SubscribeRequests.begin(); it != coordinator.SubscribeRequests.end();) { + const ui64 waitSeqNo = it->first.first; + if (seqNo < waitSeqNo) { + break; + } + const TActorId subscriberId = it->first.second; + const ui64 cookie = it->second; + if (coordinator.Subscribers.contains(subscriberId)) { + ctx.Send(subscriberId, + new TEvMediatorTimecast::TEvSubscribeReadStepResult( + coordinatorId, + lastReadStep, + coordinator.ReadStep), + 0, cookie); + } + it = coordinator.SubscribeRequests.erase(it); + } + } + + if (updated) { + NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx); + } +} + +void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx) { + const auto &record = ev->Get()->Record; + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID + << " HANDLE TEvSubscribeReadStepUpdate " << record.ShortDebugString()); + + const ui64 coordinatorId = record.GetCoordinatorID(); + auto itCoordinator = Coordinators.find(coordinatorId); + if (itCoordinator == Coordinators.end()) { + return; + } + auto &coordinator = itCoordinator->second; + + const ui64 nextReadStep = record.GetNextAcquireStep(); + if (coordinator.LastObservedReadStep < nextReadStep) { + coordinator.LastObservedReadStep = nextReadStep; + coordinator.ReadStep->Update(nextReadStep); + + NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx); + } + + // Unsubscribe from idle coordinators + if (coordinator.Subscribers.empty() && (ctx.Monotonic() - coordinator.IdleStart) >= MaxIdleCoordinatorSubscriptionTime) { + if (coordinator.PipeClient) { + NTabletPipe::CloseClient(ctx, coordinator.PipeClient); + coordinator.PipeClient = TActorId(); + } + + Coordinators.erase(itCoordinator); + } +} + +void TMediatorTimecastProxy::NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) { + const ui64 step = coordinator.LastObservedReadStep; + for (auto it = coordinator.WaitRequests.begin(); it != coordinator.WaitRequests.end();) { + const ui64 waitStep = it->first.first; + if (step < waitStep) { + break; + } + const TActorId subscriberId = it->first.second; + const ui64 cookie = it->second; + if (coordinator.Subscribers.contains(subscriberId)) { + ctx.Send(subscriberId, + new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step), + 0, cookie); + } + it = coordinator.WaitRequests.erase(it); + } +} + + + IActor* CreateMediatorTimecastProxy() { return new TMediatorTimecastProxy(); } diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h index 1202d5a799..9771a462c6 100644 --- a/ydb/core/tx/time_cast/time_cast.h +++ b/ydb/core/tx/time_cast/time_cast.h @@ -21,15 +21,41 @@ public: void Update(ui64 step, ui64 *exemption, ui64 exsz); }; +class TMediatorTimecastReadStep : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<TMediatorTimecastReadStep>; + using TCPtr = TIntrusiveConstPtr<TMediatorTimecastReadStep>; + + TMediatorTimecastReadStep(ui64 nextReadStep = 0) + : NextReadStep{ nextReadStep } + { } + + ui64 Get() const { + return NextReadStep.load(); + } + + void Update(ui64 nextReadStep) { + NextReadStep.store(nextReadStep); + } + +private: + std::atomic<ui64> NextReadStep; +}; + struct TEvMediatorTimecast { enum EEv { // local part EvRegisterTablet = EventSpaceBegin(TKikimrEvents::ES_TX_MEDIATORTIMECAST), EvUnregisterTablet, EvWaitPlanStep, + EvSubscribeReadStep, + EvUnsubscribeReadStep, + EvWaitReadStep, EvRegisterTabletResult = EvRegisterTablet + 1 * 512, EvNotifyPlanStep, + EvSubscribeReadStepResult, + EvNotifyReadStep, // mediator part EvWatch = EvRegisterTablet + 2 * 512, @@ -138,6 +164,100 @@ struct TEvMediatorTimecast { } }; + struct TEvSubscribeReadStep : public TEventLocal<TEvSubscribeReadStep, EvSubscribeReadStep> { + const ui64 CoordinatorId; + + explicit TEvSubscribeReadStep(ui64 coordinatorId) + : CoordinatorId(coordinatorId) + { + Y_VERIFY(coordinatorId != 0); + } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " }"; + } + }; + + struct TEvUnsubscribeReadStep : public TEventLocal<TEvUnsubscribeReadStep, EvUnsubscribeReadStep> { + const ui64 CoordinatorId; + + explicit TEvUnsubscribeReadStep(ui64 coordinatorId = 0) + : CoordinatorId(coordinatorId) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " }"; + } + }; + + struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> { + const ui64 CoordinatorId; + const ui64 LastReadStep; + const TMediatorTimecastReadStep::TCPtr ReadStep; + + TEvSubscribeReadStepResult( + ui64 coordinatorId, + ui64 lastReadStep, + TMediatorTimecastReadStep::TCPtr readStep) + : CoordinatorId(coordinatorId) + , LastReadStep(lastReadStep) + , ReadStep(std::move(readStep)) + { + Y_VERIFY(ReadStep); + } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " LastReadStep# " << LastReadStep + << " ReadStep# " << ReadStep->Get() + << " }"; + } + }; + + struct TEvWaitReadStep : public TEventLocal<TEvWaitReadStep, EvWaitReadStep> { + const ui64 CoordinatorId; + const ui64 ReadStep; + + TEvWaitReadStep(ui64 coordinatorId, ui64 readStep) + : CoordinatorId(coordinatorId) + , ReadStep(readStep) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " ReadStep# " << ReadStep + << " }"; + } + }; + + struct TEvNotifyReadStep : public TEventLocal<TEvNotifyReadStep, EvNotifyReadStep> { + const ui64 CoordinatorId; + const ui64 ReadStep; + + TEvNotifyReadStep(ui64 coordinatorId, ui64 readStep) + : CoordinatorId(coordinatorId) + , ReadStep(readStep) + { } + + TString ToString() const { + return TStringBuilder() + << ToStringHeader() << "{" + << " CoordinatorId# " << CoordinatorId + << " ReadStep# " << ReadStep + << " }"; + } + }; + struct TEvWatch : public TEventPB<TEvWatch, NKikimrTxMediatorTimecast::TEvWatch, EvWatch> { TEvWatch() {} diff --git a/ydb/core/tx/time_cast/time_cast_ut.cpp b/ydb/core/tx/time_cast/time_cast_ut.cpp new file mode 100644 index 0000000000..0acb2ee128 --- /dev/null +++ b/ydb/core/tx/time_cast/time_cast_ut.cpp @@ -0,0 +1,84 @@ +#include "time_cast.h" +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/testlib/test_client.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NMediatorTimeCastTest { + + using namespace Tests; + + Y_UNIT_TEST_SUITE(MediatorTimeCast) { + + void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) { + auto sender = runtime.AllocateEdgeActor(); + runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration); + runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); + } + + void SendSubscribeRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 cookie = 0) { + auto request = MakeHolder<TEvMediatorTimecast::TEvSubscribeReadStep>(coordinatorId); + runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true); + } + + TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr WaitSubscribeResult(TTestActorRuntime& runtime, const TActorId& sender) { + return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvSubscribeReadStepResult>(sender); + } + + void SendWaitRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 readStep, ui64 cookie = 0) { + auto request = MakeHolder<TEvMediatorTimecast::TEvWaitReadStep>(coordinatorId, readStep); + runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true); + } + + TEvMediatorTimecast::TEvNotifyReadStep::TPtr WaitNotifyResult(TTestActorRuntime& runtime, const TActorId& sender) { + return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvNotifyReadStep>(sender); + } + + Y_UNIT_TEST(ReadStepSubscribe) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + + auto &runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NActors::NLog::PRI_DEBUG); + + auto sender = runtime.AllocateEdgeActor(); + ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain); + + SendSubscribeRequest(runtime, sender, coordinatorId); + auto result = WaitSubscribeResult(runtime, sender); + auto stepPtr = result->Get()->ReadStep; + + ui64 readStep1 = stepPtr->Get(); + UNIT_ASSERT_GE(readStep1, result->Get()->LastReadStep); + + SimulateSleep(runtime, TDuration::Seconds(5)); + + ui64 readStep2 = stepPtr->Get(); + UNIT_ASSERT_GT(readStep2, readStep1); + + ui64 waitStep = readStep2 + 2000; + SendWaitRequest(runtime, sender, coordinatorId, waitStep); + + { + auto notify = WaitNotifyResult(runtime, sender); + UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep); + UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get()); + } + + ui64 waitStep2 = stepPtr->Get() + 5000; + RebootTablet(runtime, coordinatorId, sender); + SendWaitRequest(runtime, sender, coordinatorId, waitStep2); + + { + auto notify = WaitNotifyResult(runtime, sender); + UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep2); + UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get()); + } + } + + } // Y_UNIT_TEST_SUITE(MediatorTimeCast) + +} // namespace NKikimr::NMediatorTimeCastTest diff --git a/ydb/core/tx/time_cast/ut/ya.make b/ydb/core/tx/time_cast/ut/ya.make new file mode 100644 index 0000000000..1f055a3e2e --- /dev/null +++ b/ydb/core/tx/time_cast/ut/ya.make @@ -0,0 +1,26 @@ +UNITTEST_FOR(ydb/core/tx/time_cast) + +OWNER(g:kikimr) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) + REQUIREMENTS(ram:16) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/testlib + ydb/core/tx +) + +YQL_LAST_ABI_VERSION() + +SRCS( + time_cast_ut.cpp +) + +END() diff --git a/ydb/core/tx/time_cast/ya.make b/ydb/core/tx/time_cast/ya.make index 9c059ac2b9..e8c260833e 100644 --- a/ydb/core/tx/time_cast/ya.make +++ b/ydb/core/tx/time_cast/ya.make @@ -18,3 +18,7 @@ PEERDIR( ) END() + +RECURSE_FOR_TESTS( + ut +) |