aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-22 14:53:01 +0300
committerAlexey Borzenkov <snaury@yandex-team.ru>2022-02-22 14:53:01 +0300
commit3aba9cd5716dd2f91a9f8b11998e244d8c2357d2 (patch)
treed2092cb220c0be7fcf243ebb69aa87722a0a4557
parentbc2ba82f561fdad8ca5e0c23d9e61d83fed26641 (diff)
downloadydb-3aba9cd5716dd2f91a9f8b11998e244d8c2357d2.tar.gz
Mediator timecast readstep subscription support, KIKIMR-13910
ref:19ba66f6d071c2d8fb2124b44149b36962177a24
-rw-r--r--ydb/core/tx/time_cast/time_cast.cpp281
-rw-r--r--ydb/core/tx/time_cast/time_cast.h120
-rw-r--r--ydb/core/tx/time_cast/time_cast_ut.cpp84
-rw-r--r--ydb/core/tx/time_cast/ut/ya.make26
-rw-r--r--ydb/core/tx/time_cast/ya.make4
5 files changed, 505 insertions, 10 deletions
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index 08042194fc..32047b725a 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -15,6 +15,9 @@
namespace NKikimr {
+// We will unsubscribe from idle coordinators after 5 minutes
+static constexpr TDuration MaxIdleCoordinatorSubscriptionTime = TDuration::Minutes(5);
+
ui64 TMediatorTimecastEntry::Get(ui64 tabletId) const {
Y_UNUSED(tabletId);
return AtomicGet(Step);
@@ -72,9 +75,29 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
ui32 RefCount = 0;
};
+ struct TCoordinatorSubscriber {
+ TSet<ui64> Coordinators;
+ };
+
+ struct TCoordinator {
+ TMediatorTimecastReadStep::TPtr ReadStep = new TMediatorTimecastReadStep;
+ TActorId PipeClient;
+ ui64 LastSentSeqNo = 0;
+ ui64 LastConfirmedSeqNo = 0;
+ ui64 LastObservedReadStep = 0;
+ THashSet<TActorId> Subscribers;
+ TMap<std::pair<ui64, TActorId>, ui64> SubscribeRequests; // (seqno, subscriber) -> Cookie
+ TMap<std::pair<ui64, TActorId>, ui64> WaitRequests; // (step, subscriber) -> Cookie
+ TMonotonic IdleStart;
+ };
+
THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
THashMap<ui64, TTabletInfo> Tablets;
+ ui64 LastSeqNo = 0;
+ THashMap<ui64, TCoordinator> Coordinators;
+ THashMap<TActorId, TCoordinatorSubscriber> CoordinatorSubscribers;
+
TMediator& MediatorInfo(ui64 mediator, const NKikimrSubDomains::TProcessingParams &processing) {
auto pr = Mediators.try_emplace(mediator, processing.GetTimeCastBucketsPerMediator());
if (!pr.second) {
@@ -118,19 +141,24 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
}
void TryResync(const TActorId &pipeClient, ui64 tabletId, const TActorContext &ctx) {
- for (auto &xpair : Mediators) {
- const ui64 mediatorTabletId = xpair.first;
- TMediator &mediator = xpair.second;
-
- if (mediator.PipeClient == pipeClient) {
- Y_VERIFY(tabletId == mediatorTabletId);
- mediator.PipeClient = TActorId();
- RegisterMediator(mediatorTabletId, mediator, ctx);
- return;
- }
+ ResyncCoordinator(tabletId, pipeClient, ctx);
+
+ auto it = Mediators.find(tabletId);
+ if (it == Mediators.end()) {
+ return;
+ }
+
+ TMediator &mediator = it->second;
+ if (mediator.PipeClient == pipeClient) {
+ mediator.PipeClient = TActorId();
+ RegisterMediator(tabletId, mediator, ctx);
}
}
+ void SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
+ void ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx);
+ void NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
+
void Handle(TEvMediatorTimecast::TEvRegisterTablet::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvUnregisterTablet::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &ev, const TActorContext &ctx);
@@ -138,6 +166,15 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);
+ // Client requests for readstep subscriptions
+ void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx);
+
+ // Coordinator replies for readstep subscriptions
+ void Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_MEDIATOR_ACTOR;
@@ -154,6 +191,13 @@ public:
HFunc(TEvMediatorTimecast::TEvWaitPlanStep, Handle);
HFunc(TEvMediatorTimecast::TEvUpdate, Handle);
+ HFunc(TEvMediatorTimecast::TEvSubscribeReadStep, Handle);
+ HFunc(TEvMediatorTimecast::TEvUnsubscribeReadStep, Handle);
+ HFunc(TEvMediatorTimecast::TEvWaitReadStep, Handle);
+
+ HFunc(TEvTxProxy::TEvSubscribeReadStepResult, Handle);
+ HFunc(TEvTxProxy::TEvSubscribeReadStepUpdate, Handle);
+
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
}
@@ -297,6 +341,223 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
}
}
+void TMediatorTimecastProxy::SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {
+ const ui64 seqNo = LastSeqNo;
+
+ if (!coordinator.PipeClient) {
+ ui64 maxDelay = 100 + TAppData::RandomProvider->GenRand64() % 50;
+ auto retryPolicy = NTabletPipe::TClientRetryPolicy{
+ .RetryLimitCount = 6 /* delays: 0, 10, 20, 40, 80, 100-150 */,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::MilliSeconds(maxDelay),
+ };
+ coordinator.PipeClient = ctx.RegisterWithSameMailbox(
+ NTabletPipe::CreateClient(ctx.SelfID, coordinatorId, retryPolicy));
+ }
+
+ coordinator.LastSentSeqNo = seqNo;
+ NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvSubscribeReadStep(coordinatorId, seqNo));
+}
+
+void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx) {
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+
+ auto &coordinator = itCoordinator->second;
+ if (coordinator.PipeClient != pipeClient) {
+ return;
+ }
+
+ coordinator.PipeClient = TActorId();
+ if (coordinator.Subscribers.empty()) {
+ // Just forget disconnected idle coordinators
+ Coordinators.erase(itCoordinator);
+ return;
+ }
+
+ ++LastSeqNo;
+ SyncCoordinator(coordinatorId, coordinator, ctx);
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ const ui64 coordinatorId = msg->CoordinatorId;
+ auto &subscriber = CoordinatorSubscribers[ev->Sender];
+ auto &coordinator = Coordinators[coordinatorId];
+ subscriber.Coordinators.insert(coordinatorId);
+ coordinator.Subscribers.insert(ev->Sender);
+ ui64 seqNo = ++LastSeqNo;
+ auto key = std::make_pair(seqNo, ev->Sender);
+ coordinator.SubscribeRequests[key] = ev->Cookie;
+ SyncCoordinator(coordinatorId, coordinator, ctx);
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ auto &subscriber = CoordinatorSubscribers[ev->Sender];
+ if (msg->CoordinatorId == 0) {
+ // Unsubscribe from all coordinators
+ for (ui64 coordinatorId : subscriber.Coordinators) {
+ auto &coordinator = Coordinators[coordinatorId];
+ coordinator.Subscribers.erase(ev->Sender);
+ if (coordinator.Subscribers.empty()) {
+ coordinator.IdleStart = ctx.Monotonic();
+ }
+ }
+ subscriber.Coordinators.clear();
+ } else if (subscriber.Coordinators.contains(msg->CoordinatorId)) {
+ // Unsubscribe from specific coordinator
+ auto &coordinator = Coordinators[msg->CoordinatorId];
+ coordinator.Subscribers.erase(ev->Sender);
+ if (coordinator.Subscribers.empty()) {
+ coordinator.IdleStart = ctx.Monotonic();
+ }
+ subscriber.Coordinators.erase(msg->CoordinatorId);
+ }
+
+ if (subscriber.Coordinators.empty()) {
+ // Don't track unnecessary subscribers
+ CoordinatorSubscribers.erase(ev->Sender);
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitReadStep::TPtr &ev, const TActorContext &ctx) {
+ const auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE " << msg->ToString());
+
+ const ui64 coordinatorId = msg->CoordinatorId;
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ if (!coordinator.Subscribers.contains(ev->Sender)) {
+ return;
+ }
+
+ const ui64 step = coordinator.LastObservedReadStep;
+ const ui64 waitReadStep = msg->ReadStep;
+ if (waitReadStep <= step) {
+ ctx.Send(ev->Sender,
+ new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step),
+ 0, ev->Cookie);
+ return;
+ }
+
+ auto key = std::make_pair(waitReadStep, ev->Sender);
+ coordinator.WaitRequests[key] = ev->Cookie;
+}
+
+void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr &ev, const TActorContext &ctx) {
+ const auto &record = ev->Get()->Record;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE TEvSubscribeReadStepResult " << record.ShortDebugString());
+
+ const ui64 coordinatorId = record.GetCoordinatorID();
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ bool updated = false;
+ const ui64 nextReadStep = record.GetNextAcquireStep();
+ if (coordinator.LastObservedReadStep < nextReadStep) {
+ coordinator.LastObservedReadStep = nextReadStep;
+ coordinator.ReadStep->Update(nextReadStep);
+ updated = true;
+ }
+
+ const ui64 seqNo = record.GetSeqNo();
+ if (coordinator.LastConfirmedSeqNo < seqNo) {
+ coordinator.LastConfirmedSeqNo = seqNo;
+
+ const ui64 lastReadStep = record.GetLastAcquireStep();
+ for (auto it = coordinator.SubscribeRequests.begin(); it != coordinator.SubscribeRequests.end();) {
+ const ui64 waitSeqNo = it->first.first;
+ if (seqNo < waitSeqNo) {
+ break;
+ }
+ const TActorId subscriberId = it->first.second;
+ const ui64 cookie = it->second;
+ if (coordinator.Subscribers.contains(subscriberId)) {
+ ctx.Send(subscriberId,
+ new TEvMediatorTimecast::TEvSubscribeReadStepResult(
+ coordinatorId,
+ lastReadStep,
+ coordinator.ReadStep),
+ 0, cookie);
+ }
+ it = coordinator.SubscribeRequests.erase(it);
+ }
+ }
+
+ if (updated) {
+ NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx);
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr &ev, const TActorContext &ctx) {
+ const auto &record = ev->Get()->Record;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
+ << " HANDLE TEvSubscribeReadStepUpdate " << record.ShortDebugString());
+
+ const ui64 coordinatorId = record.GetCoordinatorID();
+ auto itCoordinator = Coordinators.find(coordinatorId);
+ if (itCoordinator == Coordinators.end()) {
+ return;
+ }
+ auto &coordinator = itCoordinator->second;
+
+ const ui64 nextReadStep = record.GetNextAcquireStep();
+ if (coordinator.LastObservedReadStep < nextReadStep) {
+ coordinator.LastObservedReadStep = nextReadStep;
+ coordinator.ReadStep->Update(nextReadStep);
+
+ NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx);
+ }
+
+ // Unsubscribe from idle coordinators
+ if (coordinator.Subscribers.empty() && (ctx.Monotonic() - coordinator.IdleStart) >= MaxIdleCoordinatorSubscriptionTime) {
+ if (coordinator.PipeClient) {
+ NTabletPipe::CloseClient(ctx, coordinator.PipeClient);
+ coordinator.PipeClient = TActorId();
+ }
+
+ Coordinators.erase(itCoordinator);
+ }
+}
+
+void TMediatorTimecastProxy::NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {
+ const ui64 step = coordinator.LastObservedReadStep;
+ for (auto it = coordinator.WaitRequests.begin(); it != coordinator.WaitRequests.end();) {
+ const ui64 waitStep = it->first.first;
+ if (step < waitStep) {
+ break;
+ }
+ const TActorId subscriberId = it->first.second;
+ const ui64 cookie = it->second;
+ if (coordinator.Subscribers.contains(subscriberId)) {
+ ctx.Send(subscriberId,
+ new TEvMediatorTimecast::TEvNotifyReadStep(coordinatorId, step),
+ 0, cookie);
+ }
+ it = coordinator.WaitRequests.erase(it);
+ }
+}
+
+
+
IActor* CreateMediatorTimecastProxy() {
return new TMediatorTimecastProxy();
}
diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h
index 1202d5a799..9771a462c6 100644
--- a/ydb/core/tx/time_cast/time_cast.h
+++ b/ydb/core/tx/time_cast/time_cast.h
@@ -21,15 +21,41 @@ public:
void Update(ui64 step, ui64 *exemption, ui64 exsz);
};
+class TMediatorTimecastReadStep : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<TMediatorTimecastReadStep>;
+ using TCPtr = TIntrusiveConstPtr<TMediatorTimecastReadStep>;
+
+ TMediatorTimecastReadStep(ui64 nextReadStep = 0)
+ : NextReadStep{ nextReadStep }
+ { }
+
+ ui64 Get() const {
+ return NextReadStep.load();
+ }
+
+ void Update(ui64 nextReadStep) {
+ NextReadStep.store(nextReadStep);
+ }
+
+private:
+ std::atomic<ui64> NextReadStep;
+};
+
struct TEvMediatorTimecast {
enum EEv {
// local part
EvRegisterTablet = EventSpaceBegin(TKikimrEvents::ES_TX_MEDIATORTIMECAST),
EvUnregisterTablet,
EvWaitPlanStep,
+ EvSubscribeReadStep,
+ EvUnsubscribeReadStep,
+ EvWaitReadStep,
EvRegisterTabletResult = EvRegisterTablet + 1 * 512,
EvNotifyPlanStep,
+ EvSubscribeReadStepResult,
+ EvNotifyReadStep,
// mediator part
EvWatch = EvRegisterTablet + 2 * 512,
@@ -138,6 +164,100 @@ struct TEvMediatorTimecast {
}
};
+ struct TEvSubscribeReadStep : public TEventLocal<TEvSubscribeReadStep, EvSubscribeReadStep> {
+ const ui64 CoordinatorId;
+
+ explicit TEvSubscribeReadStep(ui64 coordinatorId)
+ : CoordinatorId(coordinatorId)
+ {
+ Y_VERIFY(coordinatorId != 0);
+ }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " }";
+ }
+ };
+
+ struct TEvUnsubscribeReadStep : public TEventLocal<TEvUnsubscribeReadStep, EvUnsubscribeReadStep> {
+ const ui64 CoordinatorId;
+
+ explicit TEvUnsubscribeReadStep(ui64 coordinatorId = 0)
+ : CoordinatorId(coordinatorId)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " }";
+ }
+ };
+
+ struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> {
+ const ui64 CoordinatorId;
+ const ui64 LastReadStep;
+ const TMediatorTimecastReadStep::TCPtr ReadStep;
+
+ TEvSubscribeReadStepResult(
+ ui64 coordinatorId,
+ ui64 lastReadStep,
+ TMediatorTimecastReadStep::TCPtr readStep)
+ : CoordinatorId(coordinatorId)
+ , LastReadStep(lastReadStep)
+ , ReadStep(std::move(readStep))
+ {
+ Y_VERIFY(ReadStep);
+ }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " LastReadStep# " << LastReadStep
+ << " ReadStep# " << ReadStep->Get()
+ << " }";
+ }
+ };
+
+ struct TEvWaitReadStep : public TEventLocal<TEvWaitReadStep, EvWaitReadStep> {
+ const ui64 CoordinatorId;
+ const ui64 ReadStep;
+
+ TEvWaitReadStep(ui64 coordinatorId, ui64 readStep)
+ : CoordinatorId(coordinatorId)
+ , ReadStep(readStep)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " ReadStep# " << ReadStep
+ << " }";
+ }
+ };
+
+ struct TEvNotifyReadStep : public TEventLocal<TEvNotifyReadStep, EvNotifyReadStep> {
+ const ui64 CoordinatorId;
+ const ui64 ReadStep;
+
+ TEvNotifyReadStep(ui64 coordinatorId, ui64 readStep)
+ : CoordinatorId(coordinatorId)
+ , ReadStep(readStep)
+ { }
+
+ TString ToString() const {
+ return TStringBuilder()
+ << ToStringHeader() << "{"
+ << " CoordinatorId# " << CoordinatorId
+ << " ReadStep# " << ReadStep
+ << " }";
+ }
+ };
+
struct TEvWatch : public TEventPB<TEvWatch, NKikimrTxMediatorTimecast::TEvWatch, EvWatch> {
TEvWatch()
{}
diff --git a/ydb/core/tx/time_cast/time_cast_ut.cpp b/ydb/core/tx/time_cast/time_cast_ut.cpp
new file mode 100644
index 0000000000..0acb2ee128
--- /dev/null
+++ b/ydb/core/tx/time_cast/time_cast_ut.cpp
@@ -0,0 +1,84 @@
+#include "time_cast.h"
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/testlib/test_client.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NMediatorTimeCastTest {
+
+ using namespace Tests;
+
+ Y_UNIT_TEST_SUITE(MediatorTimeCast) {
+
+ void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
+ auto sender = runtime.AllocateEdgeActor();
+ runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration);
+ runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
+ }
+
+ void SendSubscribeRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 cookie = 0) {
+ auto request = MakeHolder<TEvMediatorTimecast::TEvSubscribeReadStep>(coordinatorId);
+ runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true);
+ }
+
+ TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr WaitSubscribeResult(TTestActorRuntime& runtime, const TActorId& sender) {
+ return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvSubscribeReadStepResult>(sender);
+ }
+
+ void SendWaitRequest(TTestActorRuntime& runtime, const TActorId& sender, ui64 coordinatorId, ui64 readStep, ui64 cookie = 0) {
+ auto request = MakeHolder<TEvMediatorTimecast::TEvWaitReadStep>(coordinatorId, readStep);
+ runtime.Send(new IEventHandle(MakeMediatorTimecastProxyID(), sender, request.Release(), 0, cookie), 0, true);
+ }
+
+ TEvMediatorTimecast::TEvNotifyReadStep::TPtr WaitNotifyResult(TTestActorRuntime& runtime, const TActorId& sender) {
+ return runtime.GrabEdgeEventRethrow<TEvMediatorTimecast::TEvNotifyReadStep>(sender);
+ }
+
+ Y_UNIT_TEST(ReadStepSubscribe) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+
+ auto &runtime = *server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NActors::NLog::PRI_DEBUG);
+
+ auto sender = runtime.AllocateEdgeActor();
+ ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+
+ SendSubscribeRequest(runtime, sender, coordinatorId);
+ auto result = WaitSubscribeResult(runtime, sender);
+ auto stepPtr = result->Get()->ReadStep;
+
+ ui64 readStep1 = stepPtr->Get();
+ UNIT_ASSERT_GE(readStep1, result->Get()->LastReadStep);
+
+ SimulateSleep(runtime, TDuration::Seconds(5));
+
+ ui64 readStep2 = stepPtr->Get();
+ UNIT_ASSERT_GT(readStep2, readStep1);
+
+ ui64 waitStep = readStep2 + 2000;
+ SendWaitRequest(runtime, sender, coordinatorId, waitStep);
+
+ {
+ auto notify = WaitNotifyResult(runtime, sender);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get());
+ }
+
+ ui64 waitStep2 = stepPtr->Get() + 5000;
+ RebootTablet(runtime, coordinatorId, sender);
+ SendWaitRequest(runtime, sender, coordinatorId, waitStep2);
+
+ {
+ auto notify = WaitNotifyResult(runtime, sender);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, waitStep2);
+ UNIT_ASSERT_GE(notify->Get()->ReadStep, stepPtr->Get());
+ }
+ }
+
+ } // Y_UNIT_TEST_SUITE(MediatorTimeCast)
+
+} // namespace NKikimr::NMediatorTimeCastTest
diff --git a/ydb/core/tx/time_cast/ut/ya.make b/ydb/core/tx/time_cast/ut/ya.make
new file mode 100644
index 0000000000..1f055a3e2e
--- /dev/null
+++ b/ydb/core/tx/time_cast/ut/ya.make
@@ -0,0 +1,26 @@
+UNITTEST_FOR(ydb/core/tx/time_cast)
+
+OWNER(g:kikimr)
+
+IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ REQUIREMENTS(ram:16)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+PEERDIR(
+ ydb/core/testlib
+ ydb/core/tx
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ time_cast_ut.cpp
+)
+
+END()
diff --git a/ydb/core/tx/time_cast/ya.make b/ydb/core/tx/time_cast/ya.make
index 9c059ac2b9..e8c260833e 100644
--- a/ydb/core/tx/time_cast/ya.make
+++ b/ydb/core/tx/time_cast/ya.make
@@ -18,3 +18,7 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)