diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
commit | a0ffafe83b7d6229709a32fa942c71d672ac989c (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors | |
parent | c224a621661ddd69699f9476922eb316607ef57e (diff) | |
download | ydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r-- | library/cpp/actors/core/actor.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/log.cpp | 80 | ||||
-rw-r--r-- | library/cpp/actors/core/log.h | 76 | ||||
-rw-r--r-- | library/cpp/actors/core/log_ut.cpp | 250 | ||||
-rw-r--r-- | library/cpp/actors/core/mon.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/process_stats.cpp | 156 | ||||
-rw-r--r-- | library/cpp/actors/core/process_stats.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/ut/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/actors/http/http_cache.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/http/http_static.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/http/ut/ya.make | 6 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 326 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 184 | ||||
-rw-r--r-- | library/cpp/actors/ya.make | 4 |
15 files changed, 555 insertions, 555 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index c30ac17946..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -234,7 +234,7 @@ namespace NActors { INTERCONNECT_SESSION_TCP = 13, INTERCONNECT_COMMON = 171, SELF_PING_ACTOR = 207, - TEST_ACTOR_RUNTIME = 283, + TEST_ACTOR_RUNTIME = 283, INTERCONNECT_HANDSHAKE = 284, INTERCONNECT_POLLER = 285, INTERCONNECT_SESSION_KILLER = 286, diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 46af66104d..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -217,8 +217,8 @@ namespace NActors { TString formatted; vsprintf(formatted, c, params); - auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted); - Y_UNUSED(ok); + auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted); + Y_UNUSED(ok); va_end(params); } } @@ -230,9 +230,9 @@ namespace NActors { void TLoggerActor::LogIgnoredCount(TInstant now) { TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount); - if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { - BecomeDefunct(); - } + if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { + BecomeDefunct(); + } } void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) { @@ -242,14 +242,14 @@ namespace NActors { PassedCount = 0; } - void TLoggerActor::HandleIgnoredEventDrop() { - // logger backend is unavailable, just ignore - } - - void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) { + void TLoggerActor::HandleIgnoredEventDrop() { + // logger backend is unavailable, just ignore + } + + void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) { Metrics->IncActorMsgs(); - const auto prio = ev.Level.ToPrio(); + const auto prio = ev.Level.ToPrio(); switch (prio) { case ::NActors::NLog::EPrio::Alert: @@ -262,11 +262,11 @@ namespace NActors { break; } - } - - void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { - i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds(); - WriteMessageStat(*ev->Get()); + } + + void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { + i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds(); + WriteMessageStat(*ev->Get()); if (Settings->AllowDrop) { // Disable throttling if it was enabled previously if (AtomicGet(IsOverflow)) @@ -291,17 +291,17 @@ namespace NActors { AtomicSet(IsOverflow, 0); } - const auto prio = ev->Get()->Level.ToPrio(); - if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) { - BecomeDefunct(); - } + const auto prio = ev->Get()->Level.ToPrio(); + if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) { + BecomeDefunct(); + } + } + + void TLoggerActor::BecomeDefunct() { + Become(&TThis::StateDefunct); + Schedule(WakeupInterval, new TEvents::TEvWakeup); } - void TLoggerActor::BecomeDefunct() { - Become(&TThis::StateDefunct); - Schedule(WakeupInterval, new TEvents::TEvWakeup); - } - void TLoggerActor::HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const NActors::TActorContext& ctx) { Metrics->IncLevelRequests(); TString explanation; @@ -367,7 +367,7 @@ namespace NActors { * 4. Log level changes (last N changes) */ void TLoggerActor::HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { - const auto& params = ev->Get()->Request.GetParams(); + const auto& params = ev->Get()->Request.GetParams(); NLog::EComponent component = NLog::InvalidComponent; NLog::EPriority priority = NLog::PRI_DEBUG; NLog::EPriority samplingPriority = NLog::PRI_DEBUG; @@ -574,8 +574,8 @@ namespace NActors { constexpr size_t TimeBufSize = 512; - bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, - const TString& formatted) noexcept try { + bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, + const TString& formatted) noexcept try { const auto logPrio = ::ELogPriority(ui16(priority)); char buf[TimeBufSize]; @@ -634,21 +634,21 @@ namespace NActors { TLogRecord(logPrio, logRecord.data(), logRecord.size())); } break; } - - return true; - } catch (...) { - return false; + + return true; + } catch (...) { + return false; } - void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) { - WriteMessageStat(*ev->Get()); + void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) { + WriteMessageStat(*ev->Get()); Metrics->IncDroppedMsgs(); - } - - void TLoggerActor::HandleWakeup() { - Become(&TThis::StateFunc); - } - + } + + void TLoggerActor::HandleWakeup() { + Become(&TThis::StateFunc); + } + const char* TLoggerActor::FormatLocalTimestamp(TInstant time, char* buf) { struct tm localTime; time.LocalTime(&localTime); diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 0bd6e41729..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -219,16 +219,16 @@ namespace NActors { } } - STFUNC(StateDefunct) { - switch (ev->GetTypeRewrite()) { - cFunc(TLogIgnored::EventType, HandleIgnoredEventDrop); - hFunc(NLog::TEvLog, HandleLogEventDrop); - HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); - HFunc(NMon::TEvHttpInfo, HandleMonInfo); - cFunc(TEvents::TEvWakeup::EventType, HandleWakeup); - } - } - + STFUNC(StateDefunct) { + switch (ev->GetTypeRewrite()) { + cFunc(TLogIgnored::EventType, HandleIgnoredEventDrop); + hFunc(NLog::TEvLog, HandleLogEventDrop); + HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); + HFunc(NMon::TEvHttpInfo, HandleMonInfo); + cFunc(TEvents::TEvWakeup::EventType, HandleWakeup); + } + } + // Directly call logger instead of sending a message void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...); @@ -240,21 +240,21 @@ namespace NActors { ui64 IgnoredCount = 0; ui64 PassedCount = 0; static TAtomic IsOverflow; - TDuration WakeupInterval{TDuration::Seconds(5)}; + TDuration WakeupInterval{TDuration::Seconds(5)}; std::unique_ptr<ILoggerMetrics> Metrics; - void BecomeDefunct(); + void BecomeDefunct(); void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx); - void HandleIgnoredEventDrop(); + void HandleIgnoredEventDrop(); void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx); - void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev); + void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev); void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx); void HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx); - void HandleWakeup(); - [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept; + void HandleWakeup(); + [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept; void RenderComponentPriorities(IOutputStream& str); void LogIgnoredCount(TInstant now); - void WriteMessageStat(const NLog::TEvLog& ev); + void WriteMessageStat(const NLog::TEvLog& ev); static const char* FormatLocalTimestamp(TInstant time, char* buf); }; @@ -305,19 +305,19 @@ namespace NActors { // Logging adaptors for memory log and logging into filesystem ///////////////////////////////////////////////////////////////////// - namespace NDetail { - inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) { - va_list params; - va_start(params, format); - vsprintf(dst, format, params); - va_end(params); - } - - inline void PrintfV(TString& dst, const char* format, va_list params) { - vsprintf(dst, format, params); - } - } // namespace NDetail - + namespace NDetail { + inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) { + va_list params; + va_start(params, format); + vsprintf(dst, format, params); + va_end(params); + } + + inline void PrintfV(TString& dst, const char* format, va_list params) { + vsprintf(dst, format, params); + } + } // namespace NDetail + template <typename TCtx> inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str) { @@ -326,21 +326,21 @@ namespace NActors { ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); } - template <typename TCtx, typename... TArgs> + template <typename TCtx, typename... TArgs> inline void MemLogAdapter( TCtx& actorCtxOrSystem, NLog::EPriority mPriority, NLog::EComponent mComponent, - const char* format, TArgs&&... params) { + const char* format, TArgs&&... params) { TString Formatted; - if constexpr (sizeof... (params) > 0) { - NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...); - } else { - NDetail::PrintfV(Formatted, "%s", format); - } - + if constexpr (sizeof... (params) > 0) { + NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...); + } else { + NDetail::PrintfV(Formatted, "%s", format); + } + MemLogWrite(Formatted.data(), Formatted.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted)); } diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index f5396cc8b1..09b5f88ea2 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -1,27 +1,27 @@ -#include "log.h" - +#include "log.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/actors/testlib/test_runtime.h> - -using namespace NMonitoring; -using namespace NActors; -using namespace NActors::NLog; - -namespace { - const TString& ServiceToString(int) { - static const TString FAKE{"FAKE"}; - return FAKE; - } - - TIntrusivePtr<TSettings> DefaultSettings() { + +using namespace NMonitoring; +using namespace NActors; +using namespace NActors::NLog; + +namespace { + const TString& ServiceToString(int) { + static const TString FAKE{"FAKE"}; + return FAKE; + } + + TIntrusivePtr<TSettings> DefaultSettings() { auto loggerId = TActorId{0, "Logger"}; - auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE); - s->SetAllowDrop(false); - s->Append(0, 1, ServiceToString); - return s; - } - + auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE); + s->SetAllowDrop(false); + s->Append(0, 1, ServiceToString); + return s; + } + TIntrusivePtr<TSettings> DroppingSettings(ui64 timeThresholdMs) { auto loggerId = TActorId{0, "Logger"}; auto s = MakeIntrusive<TSettings>( @@ -35,123 +35,123 @@ namespace { return s; } - class TMockBackend: public TLogBackend { - public: - using TWriteImpl = std::function<void(const TLogRecord&)>; - using TReopenImpl = std::function<void()>; - - static void REOPEN_NOP() { } - - TMockBackend(TWriteImpl writeImpl, TReopenImpl reopenImpl = REOPEN_NOP) - : WriteImpl_{writeImpl} - , ReopenImpl_{reopenImpl} - { - } - - void WriteData(const TLogRecord& r) override { - WriteImpl_(r); - } - - void ReopenLog() override { } - - void SetWriteImpl(TWriteImpl writeImpl) { - WriteImpl_ = writeImpl; - } - - private: - TWriteImpl WriteImpl_; - TReopenImpl ReopenImpl_; - }; - - void ThrowAlways(const TLogRecord&) { - ythrow yexception(); - }; - - struct TFixture { + class TMockBackend: public TLogBackend { + public: + using TWriteImpl = std::function<void(const TLogRecord&)>; + using TReopenImpl = std::function<void()>; + + static void REOPEN_NOP() { } + + TMockBackend(TWriteImpl writeImpl, TReopenImpl reopenImpl = REOPEN_NOP) + : WriteImpl_{writeImpl} + , ReopenImpl_{reopenImpl} + { + } + + void WriteData(const TLogRecord& r) override { + WriteImpl_(r); + } + + void ReopenLog() override { } + + void SetWriteImpl(TWriteImpl writeImpl) { + WriteImpl_ = writeImpl; + } + + private: + TWriteImpl WriteImpl_; + TReopenImpl ReopenImpl_; + }; + + void ThrowAlways(const TLogRecord&) { + ythrow yexception(); + }; + + struct TFixture { TFixture( TIntrusivePtr<TSettings> settings, TMockBackend::TWriteImpl writeImpl = ThrowAlways) { - Runtime.Initialize(); - LogBackend.reset(new TMockBackend{writeImpl}); + Runtime.Initialize(); + LogBackend.reset(new TMockBackend{writeImpl}); LoggerActor = Runtime.Register(new TLoggerActor{settings, LogBackend, Counters}); - Runtime.SetScheduledEventFilter([] (auto&&, auto&&, auto&&, auto) { - return false; - }); - } - + Runtime.SetScheduledEventFilter([] (auto&&, auto&&, auto&&, auto) { + return false; + }); + } + TFixture(TMockBackend::TWriteImpl writeImpl = ThrowAlways) : TFixture(DefaultSettings(), writeImpl) {} - void WriteLog() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); - } - + void WriteLog() { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); + } + void WriteLog(TInstant ts) { Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{EPrio::Emerg}, 0, "foo")}); } - void Wakeup() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup}); - } - - TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()}; - std::shared_ptr<TMockBackend> LogBackend; + void Wakeup() { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup}); + } + + TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()}; + std::shared_ptr<TMockBackend> LogBackend; TActorId LoggerActor; - TTestActorRuntimeBase Runtime; - }; -} - - -Y_UNIT_TEST_SUITE(TLoggerActorTest) { - Y_UNIT_TEST(NoCrashOnWriteFailure) { - TFixture test; - test.WriteLog(); - // everything is okay as long as we get here - } - - Y_UNIT_TEST(SubsequentWritesAreIgnored) { - size_t count{0}; - auto countWrites = [&count] (auto&& r) { - count++; - ThrowAlways(r); - }; - - TFixture test{countWrites}; - test.WriteLog(); - UNIT_ASSERT_VALUES_EQUAL(count, 1); - - // at this point we should have started dropping messages - for (auto i = 0; i < 5; ++i) { - test.WriteLog(); - } - - UNIT_ASSERT_VALUES_EQUAL(count, 1); - } - - Y_UNIT_TEST(LoggerCanRecover) { - TFixture test; - test.WriteLog(); - - TVector<TString> messages; - auto acceptWrites = [&] (const TLogRecord& r) { - messages.emplace_back(r.Data, r.Len); - }; - - auto scheduled = test.Runtime.CaptureScheduledEvents(); - UNIT_ASSERT_VALUES_EQUAL(scheduled.size(), 1); - - test.LogBackend->SetWriteImpl(acceptWrites); - test.Wakeup(); - - const auto COUNT = 10; - for (auto i = 0; i < COUNT; ++i) { - test.WriteLog(); - } - - UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT); - } + TTestActorRuntimeBase Runtime; + }; +} + + +Y_UNIT_TEST_SUITE(TLoggerActorTest) { + Y_UNIT_TEST(NoCrashOnWriteFailure) { + TFixture test; + test.WriteLog(); + // everything is okay as long as we get here + } + + Y_UNIT_TEST(SubsequentWritesAreIgnored) { + size_t count{0}; + auto countWrites = [&count] (auto&& r) { + count++; + ThrowAlways(r); + }; + + TFixture test{countWrites}; + test.WriteLog(); + UNIT_ASSERT_VALUES_EQUAL(count, 1); + + // at this point we should have started dropping messages + for (auto i = 0; i < 5; ++i) { + test.WriteLog(); + } + + UNIT_ASSERT_VALUES_EQUAL(count, 1); + } + + Y_UNIT_TEST(LoggerCanRecover) { + TFixture test; + test.WriteLog(); + + TVector<TString> messages; + auto acceptWrites = [&] (const TLogRecord& r) { + messages.emplace_back(r.Data, r.Len); + }; + + auto scheduled = test.Runtime.CaptureScheduledEvents(); + UNIT_ASSERT_VALUES_EQUAL(scheduled.size(), 1); + + test.LogBackend->SetWriteImpl(acceptWrites); + test.Wakeup(); + + const auto COUNT = 10; + for (auto i = 0; i < COUNT; ++i) { + test.WriteLog(); + } + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT); + } Y_UNIT_TEST(ShouldObeyTimeThresholdMsWhenOverloaded) { TFixture test{DroppingSettings(5000)}; @@ -182,4 +182,4 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) { UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT + 1); } -} +} diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h index 666303dca7..c450f2338e 100644 --- a/library/cpp/actors/core/mon.h +++ b/library/cpp/actors/core/mon.h @@ -21,20 +21,20 @@ namespace NActors { // request info from an actor in HTML format struct TEvHttpInfo: public NActors::TEventLocal<TEvHttpInfo, HttpInfo> { - TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, int subReqId = 0) + TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, int subReqId = 0) : Request(request) , SubRequestId(subReqId) { } - TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, const TString& userToken) + TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, const TString& userToken) : Request(request) , UserToken(userToken) , SubRequestId(0) { } - const NMonitoring::IMonHttpRequest& Request; + const NMonitoring::IMonHttpRequest& Request; TString UserToken; // built and serialized // SubRequestId != 0 means that we assemble reply from multiple parts and SubRequestId contains this part id int SubRequestId; diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp index cfc95034c9..0e1dbd0031 100644 --- a/library/cpp/actors/core/process_stats.cpp +++ b/library/cpp/actors/core/process_stats.cpp @@ -136,23 +136,23 @@ namespace NActors { #endif -namespace { +namespace { // Periodically collects process stats and exposes them as mon counters - template <typename TDerived> - class TProcStatCollectingActor: public TActorBootstrapped<TProcStatCollectingActor<TDerived>> { + template <typename TDerived> + class TProcStatCollectingActor: public TActorBootstrapped<TProcStatCollectingActor<TDerived>> { public: static constexpr IActor::EActivityType ActorActivityType() { return IActor::ACTORLIB_STATS; } - TProcStatCollectingActor(TDuration interval) - : Interval(interval) + TProcStatCollectingActor(TDuration interval) + : Interval(interval) { } void Bootstrap(const TActorContext& ctx) { - ctx.Schedule(Interval, new TEvents::TEvWakeup()); - Self()->Become(&TDerived::StateWork); + ctx.Schedule(Interval, new TEvents::TEvWakeup()); + Self()->Become(&TDerived::StateWork); } STFUNC(StateWork) { @@ -163,85 +163,85 @@ namespace { private: void Wakeup(const TActorContext& ctx) { - Self()->UpdateCounters(ProcStat); - ctx.Schedule(Interval, new TEvents::TEvWakeup()); + Self()->UpdateCounters(ProcStat); + ctx.Schedule(Interval, new TEvents::TEvWakeup()); } - TDerived* Self() { + TDerived* Self() { ProcStat.Fill(getpid()); - return static_cast<TDerived*>(this); + return static_cast<TDerived*>(this); } - - private: - const TDuration Interval; - TProcStat ProcStat; + + private: + const TDuration Interval; + TProcStat ProcStat; }; - // Periodically collects process stats and exposes them as mon counters - class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> { - using TBase = TProcStatCollectingActor<TDynamicCounterCollector>; - public: + // Periodically collects process stats and exposes them as mon counters + class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> { + using TBase = TProcStatCollectingActor<TDynamicCounterCollector>; + public: TDynamicCounterCollector( ui32 intervalSeconds, NMonitoring::TDynamicCounterPtr counters) - : TBase{TDuration::Seconds(intervalSeconds)} - { - ProcStatGroup = counters->GetSubgroup("counters", "utils"); - - VmSize = ProcStatGroup->GetCounter("Process/VmSize", false); - AnonRssSize = ProcStatGroup->GetCounter("Process/AnonRssSize", false); - FileRssSize = ProcStatGroup->GetCounter("Process/FileRssSize", false); + : TBase{TDuration::Seconds(intervalSeconds)} + { + ProcStatGroup = counters->GetSubgroup("counters", "utils"); + + VmSize = ProcStatGroup->GetCounter("Process/VmSize", false); + AnonRssSize = ProcStatGroup->GetCounter("Process/AnonRssSize", false); + FileRssSize = ProcStatGroup->GetCounter("Process/FileRssSize", false); CGroupMemLimit = ProcStatGroup->GetCounter("Process/CGroupMemLimit", false); - UserTime = ProcStatGroup->GetCounter("Process/UserTime", true); - SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true); - MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true); - MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true); + UserTime = ProcStatGroup->GetCounter("Process/UserTime", true); + SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true); + MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true); + MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true); UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false); NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false); SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false); - } - - void UpdateCounters(const TProcStat& procStat) { - *VmSize = procStat.Vsize; - *AnonRssSize = procStat.AnonRss; - *FileRssSize = procStat.FileRss; + } + + void UpdateCounters(const TProcStat& procStat) { + *VmSize = procStat.Vsize; + *AnonRssSize = procStat.AnonRss; + *FileRssSize = procStat.FileRss; if (procStat.CGroupMemLim) { *CGroupMemLimit = procStat.CGroupMemLim; } - *UserTime = procStat.Utime; - *SysTime = procStat.Stime; - *MinorPageFaults = procStat.MinFlt; - *MajorPageFaults = procStat.MajFlt; + *UserTime = procStat.Utime; + *SysTime = procStat.Stime; + *MinorPageFaults = procStat.MinFlt; + *MajorPageFaults = procStat.MajFlt; *UptimeSeconds = procStat.Uptime.Seconds(); *NumThreads = procStat.NumThreads; *SystemUptimeSeconds = procStat.Uptime.Seconds(); - } - - private: + } + + private: NMonitoring::TDynamicCounterPtr ProcStatGroup; - NMonitoring::TDynamicCounters::TCounterPtr VmSize; - NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize; - NMonitoring::TDynamicCounters::TCounterPtr FileRssSize; + NMonitoring::TDynamicCounters::TCounterPtr VmSize; + NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize; + NMonitoring::TDynamicCounters::TCounterPtr FileRssSize; NMonitoring::TDynamicCounters::TCounterPtr CGroupMemLimit; - NMonitoring::TDynamicCounters::TCounterPtr UserTime; - NMonitoring::TDynamicCounters::TCounterPtr SysTime; - NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults; - NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults; + NMonitoring::TDynamicCounters::TCounterPtr UserTime; + NMonitoring::TDynamicCounters::TCounterPtr SysTime; + NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults; + NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults; NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds; NMonitoring::TDynamicCounters::TCounterPtr NumThreads; NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds; - }; - - - class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> { - using TBase = TProcStatCollectingActor<TRegistryCollector>; - public: + }; + + + class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> { + using TBase = TProcStatCollectingActor<TRegistryCollector>; + public: TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) - : TBase{interval} - { - VmSize = registry.IntGauge({{"sensor", "process.VmSize"}}); - AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}}); - FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}}); + : TBase{interval} + { + VmSize = registry.IntGauge({{"sensor", "process.VmSize"}}); + AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}}); + FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}}); CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}}); UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}}); NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}}); @@ -251,12 +251,12 @@ namespace { SysTime = registry.Rate({{"sensor", "process.SystemTime"}}); MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}}); MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}}); - } - - void UpdateCounters(const TProcStat& procStat) { - VmSize->Set(procStat.Vsize); - AnonRssSize->Set(procStat.AnonRss); - FileRssSize->Set(procStat.FileRss); + } + + void UpdateCounters(const TProcStat& procStat) { + VmSize->Set(procStat.Vsize); + AnonRssSize->Set(procStat.AnonRss); + FileRssSize->Set(procStat.FileRss); CGroupMemLimit->Set(procStat.CGroupMemLim); UptimeSeconds->Set(procStat.Uptime.Seconds()); NumThreads->Set(procStat.NumThreads); @@ -276,12 +276,12 @@ namespace { MajorPageFaults->Reset(); MajorPageFaults->Add(procStat.MajFlt); - } - - private: - NMonitoring::TIntGauge* VmSize; - NMonitoring::TIntGauge* AnonRssSize; - NMonitoring::TIntGauge* FileRssSize; + } + + private: + NMonitoring::TIntGauge* VmSize; + NMonitoring::TIntGauge* AnonRssSize; + NMonitoring::TIntGauge* FileRssSize; NMonitoring::TIntGauge* CGroupMemLimit; NMonitoring::TRate* UserTime; NMonitoring::TRate* SysTime; @@ -290,14 +290,14 @@ namespace { NMonitoring::TIntGauge* UptimeSeconds; NMonitoring::TIntGauge* NumThreads; NMonitoring::TIntGauge* SystemUptimeSeconds; - }; -} // namespace - + }; +} // namespace + IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) { return new TDynamicCounterCollector(intervalSec, counters); } IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) { - return new TRegistryCollector(interval, registry); - } + return new TRegistryCollector(interval, registry); + } } diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h index e2f3a8276d..66346d0b5a 100644 --- a/library/cpp/actors/core/process_stats.h +++ b/library/cpp/actors/core/process_stats.h @@ -5,10 +5,10 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> -namespace NMonitoring { +namespace NMonitoring { class TMetricRegistry; -} - +} + namespace NActors { struct TProcStat { ui64 Rss; diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make index 918ed3f656..3ee28d5850 100644 --- a/library/cpp/actors/core/ut/ya.make +++ b/library/cpp/actors/core/ut/ya.make @@ -38,7 +38,7 @@ SRCS( event_pb_ut.cpp executor_pool_basic_ut.cpp executor_pool_united_ut.cpp - log_ut.cpp + log_ut.cpp memory_tracker_ut.cpp scheduler_actor_ut.cpp ) diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp index 99fdb7ebb6..27c4eeb6f3 100644 --- a/library/cpp/actors/http/http_cache.cpp +++ b/library/cpp/actors/http/http_cache.cpp @@ -1,6 +1,6 @@ #include "http.h" -#include "http_proxy.h" -#include "http_cache.h" +#include "http_proxy.h" +#include "http_cache.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/executor_pool_basic.h> #include <library/cpp/actors/core/log.h> diff --git a/library/cpp/actors/http/http_static.cpp b/library/cpp/actors/http/http_static.cpp index 8e1e649dae..c075c5f693 100644 --- a/library/cpp/actors/http/http_static.cpp +++ b/library/cpp/actors/http/http_static.cpp @@ -1,5 +1,5 @@ -#include "http_proxy.h" -#include "http_static.h" +#include "http_proxy.h" +#include "http_static.h" #include <library/cpp/actors/core/executor_pool_basic.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/scheduler_basic.h> diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 735fe66380..4c922f8d0f 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -175,11 +175,11 @@ Y_UNIT_TEST_SUITE(HttpProxy) { } Y_UNIT_TEST(BasicRunning) { - NActors::TTestActorRuntimeBase actorSystem; + NActors::TTestActorRuntimeBase actorSystem; TPortManager portManager; TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; - actorSystem.Initialize(); + actorSystem.Initialize(); NMonitoring::TMetricRegistry sensors; NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); diff --git a/library/cpp/actors/http/ut/ya.make b/library/cpp/actors/http/ut/ya.make index 8d83a1ea91..8b4c04c4d3 100644 --- a/library/cpp/actors/http/ut/ya.make +++ b/library/cpp/actors/http/ut/ya.make @@ -8,11 +8,11 @@ PEERDIR( library/cpp/actors/testlib ) -IF (NOT OS_WINDOWS) +IF (NOT OS_WINDOWS) SRCS( http_ut.cpp ) -ELSE() -ENDIF() +ELSE() +ENDIF() END() diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 567df9e141..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1,5 +1,5 @@ #include "test_runtime.h" - + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/callstack.h> #include <library/cpp/actors/core/executor_pool_basic.h> @@ -35,7 +35,7 @@ namespace { namespace NActors { ui64 TScheduledEventQueueItem::NextUniqueId = 0; - void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { + void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); TString name = runtime->GetActorName(ev->Sender); @@ -56,7 +56,7 @@ namespace NActors { Cerr << "\n"; } - TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { + TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; } @@ -82,13 +82,13 @@ namespace NActors { } - class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { + class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } - TEdgeActor(TTestActorRuntimeBase* runtime) + TEdgeActor(TTestActorRuntimeBase* runtime) : TActor(&TEdgeActor::StateFunc) , Runtime(runtime) { @@ -123,7 +123,7 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; }; void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { @@ -218,9 +218,9 @@ namespace NActors { return Sent.size(); } - class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { + class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: - TTimeProvider(TTestActorRuntimeBase& runtime) + TTimeProvider(TTestActorRuntimeBase& runtime) : Runtime(runtime) { } @@ -230,12 +230,12 @@ namespace NActors { } private: - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; }; - class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { public: - TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) + TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) { @@ -263,13 +263,13 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; - TTestActorRuntimeBase::TNodeDataBase* Node; + TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase::TNodeDataBase* Node; }; - class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { + class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { public: - TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) + TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) : IExecutorPool(poolId) , Runtime(runtime) , NodeIndex(nodeIndex) @@ -277,7 +277,7 @@ namespace NActors { { } - TTestActorRuntimeBase* GetRuntime() { + TTestActorRuntimeBase* GetRuntime() { return Runtime; } @@ -437,26 +437,26 @@ namespace NActors { } private: - TTestActorRuntimeBase* const Runtime; + TTestActorRuntimeBase* const Runtime; const ui32 NodeIndex; - TTestActorRuntimeBase::TNodeDataBase* const Node; + TTestActorRuntimeBase::TNodeDataBase* const Node; }; - IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { - return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; - } + IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { + return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; + } + - - ui32 TTestActorRuntimeBase::NextNodeId = 1; - - TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) - : TTestActorRuntimeBase(1, 1, false) + ui32 TTestActorRuntimeBase::NextNodeId = 1; + + TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) + : TTestActorRuntimeBase(1, 1, false) { SingleSysEnv = true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) - : ScheduledCount(0) + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) + : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) @@ -468,53 +468,53 @@ namespace NActors { , DispatchCyclesCount(0) , DispatchedEventsCount(0) , NeedMonitoring(false) - , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) - , TimeProvider(new TTimeProvider(*this)) - , ShouldContinue() + , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) + , TimeProvider(new TTimeProvider(*this)) + , ShouldContinue() , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) - , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) + , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) - , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) - , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) - , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) + , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) + , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) + , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { SetDispatcherRandomSeed(TInstant::Now(), 0); - EnableActorCallstack(); - } + EnableActorCallstack(); + } - void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, - NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); + NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); - node->DynamicCounters = new NMonitoring::TDynamicCounters; + node->DynamicCounters = new NMonitoring::TDynamicCounters; - InitNodeImpl(node, nodeIndex); + InitNodeImpl(node, nodeIndex); } - void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, NActorsServices::EServiceCommon_MAX, NActorsServices::EServiceCommon_Name ); - - if (!UseRealThreads) { - node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); - node->MailboxTable.Reset(new TMailboxTable()); - node->ActorSystem = MakeActorSystem(nodeIndex, node); + + if (!UseRealThreads) { + node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); + node->MailboxTable.Reset(new TMailboxTable()); + node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); - } else { - node->ActorSystem = MakeActorSystem(nodeIndex, node); - } - - node->ActorSystem->Start(); - } - + } else { + node->ActorSystem = MakeActorSystem(nodeIndex, node); + } + + node->ActorSystem->Start(); + } + bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) { ui64 senderLocalId = ev->Sender.LocalId(); ui64 senderMailboxHint = ev->Sender.Hint(); @@ -527,16 +527,16 @@ namespace NActors { return true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) - : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) + : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) - : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) + : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { } - TTestActorRuntimeBase::~TTestActorRuntimeBase() { - CleanupNodes(); + TTestActorRuntimeBase::~TTestActorRuntimeBase() { + CleanupNodes(); Cerr.Flush(); Cerr.Flush(); Clog.Flush(); @@ -544,41 +544,41 @@ namespace NActors { DisableActorCallstack(); } - void TTestActorRuntimeBase::CleanupNodes() { - Nodes.clear(); - } - - bool TTestActorRuntimeBase::IsRealThreads() const { + void TTestActorRuntimeBase::CleanupNodes() { + Nodes.clear(); + } + + bool TTestActorRuntimeBase::IsRealThreads() const { return UseRealThreads; } - TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return EEventAction::PROCESS; } - void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); scheduledEvents.clear(); } - bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return false; } - bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { - Y_UNUSED(runtime); + bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { + Y_UNUSED(runtime); Y_UNUSED(delay); - Y_UNUSED(event); + Y_UNUSED(event); Y_UNUSED(deadline); return true; } - + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); @@ -634,7 +634,7 @@ namespace NActors { } }; - void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { if (scheduledEvents.empty()) return; @@ -682,46 +682,46 @@ namespace NActors { runtime.UpdateCurrentTime(time); } - TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { + TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; } - TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { + TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; } - TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { + TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { + TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { + TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; } - bool TTestActorRuntimeBase::IsVerbose() { + bool TTestActorRuntimeBase::IsVerbose() { return VERBOSE; } - void TTestActorRuntimeBase::SetVerbose(bool verbose) { + void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } @@ -730,7 +730,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; if (!node) { - node = GetNodeFactory().CreateNode(); + node = GetNodeFactory().CreateNode(); Nodes[nodeIndex + FirstNodeId] = node; } @@ -738,51 +738,51 @@ namespace NActors { node->LocalServices.push_back(std::make_pair(actorId, cmd)); } - void TTestActorRuntimeBase::InitNodes() { - NextNodeId += NodeCount; - Y_VERIFY(NodeCount > 0); + void TTestActorRuntimeBase::InitNodes() { + NextNodeId += NodeCount; + Y_VERIFY(NodeCount > 0); - for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; - TNodeDataBase* node = nodeIt->second.Get(); - InitNode(node, nodeIndex); + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { + auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; + TNodeDataBase* node = nodeIt->second.Get(); + InitNode(node, nodeIndex); } - } + } - void TTestActorRuntimeBase::Initialize() { - InitNodes(); - IsInitialized = true; + void TTestActorRuntimeBase::Initialize() { + InitNodes(); + IsInitialized = true; } void SetupCrossDC() { } - TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { + TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { TGuard<TMutex> guard(Mutex); TDuration oldTimeout = DispatchTimeout; DispatchTimeout = timeout; return oldTimeout; } - TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { + TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { TGuard<TMutex> guard(Mutex); TDuration oldDelay = ReschedulingDelay; ReschedulingDelay = delay; return oldDelay; } - void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { + void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); LogBackend = logBackend; } - void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { + void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { TGuard<TMutex> guard(Mutex); for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { @@ -791,13 +791,13 @@ namespace NActors { } } - TInstant TTestActorRuntimeBase::GetCurrentTime() const { + TInstant TTestActorRuntimeBase::GetCurrentTime() const { TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); return TInstant::MicroSeconds(CurrentTimestamp); } - void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { @@ -814,25 +814,25 @@ namespace NActors { } } - void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { + void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } - TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { + TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); return TimeProvider; } - ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); return FirstNodeId + index; } - ui32 TTestActorRuntimeBase::GetNodeCount() const { + ui32 TTestActorRuntimeBase::GetNodeCount() const { return NodeCount; } - ui64 TTestActorRuntimeBase::AllocateLocalId() { + ui64 TTestActorRuntimeBase::AllocateLocalId() { TGuard<TMutex> guard(Mutex); ui64 nextId = ++LocalId; if (VERBOSE) { @@ -842,7 +842,7 @@ namespace NActors { return nextId; } - ui32 TTestActorRuntimeBase::InterconnectPoolId() const { + ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads // Use a special single-threaded pool to avoid that @@ -851,7 +851,7 @@ namespace NActors { return 0; } - TString TTestActorRuntimeBase::GetTempDir() { + TString TTestActorRuntimeBase::GetTempDir() { if (!TmpDir) TmpDir.Reset(new TTempDir()); return (*TmpDir)(); @@ -861,7 +861,7 @@ namespace NActors { ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); @@ -929,7 +929,7 @@ namespace NActors { const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); @@ -952,7 +952,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (!UseRealThreads) { IActor* actor = FindActor(actorId, node); node->LocalServicesActors[serviceId] = actor; @@ -971,7 +971,7 @@ namespace NActors { return edgeActor; } - TEventsList TTestActorRuntimeBase::CaptureEvents() { + TEventsList TTestActorRuntimeBase::CaptureEvents() { TGuard<TMutex> guard(Mutex); TEventsList result; for (auto& mbox : Mailboxes) { @@ -981,7 +981,7 @@ namespace NActors { return result; } - TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { + TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -989,14 +989,14 @@ namespace NActors { return result; } - void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { + void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { TGuard<TMutex> guard(Mutex); ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); } - void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { + void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { TGuard<TMutex> guard(Mutex); for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { if (*rit) { @@ -1010,7 +1010,7 @@ namespace NActors { events.clear(); } - void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { + void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -1018,7 +1018,7 @@ namespace NActors { events.clear(); } - TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { + TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { TGuard<TMutex> guard(Mutex); TScheduledEventsList result; for (auto& mbox : Mailboxes) { @@ -1028,28 +1028,28 @@ namespace NActors { return result; } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { TGuard<TMutex> guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! - bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { TDispatchContext localContext; localContext.Options = &options; localContext.PrevContext = nullptr; bool verbose = !options.Quiet && VERBOSE; struct TDispatchContextSetter { - TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) + TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) { lastContext.PrevContext = Runtime.CurrentDispatchContext; @@ -1060,7 +1060,7 @@ namespace NActors { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); @@ -1072,7 +1072,7 @@ namespace NActors { } struct TTempEdgeEventsCaptor { - TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) + TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) : Runtime(runtime) , HasEvents(false) { @@ -1103,7 +1103,7 @@ namespace NActors { } } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; TEventMailBoxList Store; bool HasEvents; }; @@ -1354,7 +1354,7 @@ namespace NActors { return false; } - void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { + void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { TDispatchContext* context = CurrentDispatchContext; while (context) { const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; @@ -1366,7 +1366,7 @@ namespace NActors { } } - void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { + void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { TDispatchContext* context = CurrentDispatchContext; while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { @@ -1382,14 +1382,14 @@ namespace NActors { } } - void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } - void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { + void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; @@ -1400,12 +1400,12 @@ namespace NActors { Cerr << "Event was added to scheduled queue\n"; } - void TTestActorRuntimeBase::ClearCounters() { + void TTestActorRuntimeBase::ClearCounters() { TGuard<TMutex> guard(Mutex); EvCounters.clear(); } - ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { + ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { TGuard<TMutex> guard(Mutex); auto it = EvCounters.find(evType); if (it == EvCounters.end()) @@ -1417,7 +1417,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } @@ -1465,7 +1465,7 @@ namespace NActors { Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } @@ -1474,7 +1474,7 @@ namespace NActors { BlockedOutput.insert(actorId); } - void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { + void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); @@ -1490,7 +1490,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); - TNodeDataBase* node = nodeIt->second.Get(); + TNodeDataBase* node = nodeIt->second.Get(); return FindActor(actorId, node); } @@ -1514,11 +1514,11 @@ namespace NActors { return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } - TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { + TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); return node->DynamicCounters; } @@ -1526,10 +1526,10 @@ namespace NActors { NeedMonitoring = true; } - void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); ui32 targetNode = ev->GetRecipientRewrite().NodeId(); ui32 targetNodeIndex; if (targetNode == 0) { @@ -1607,10 +1607,10 @@ namespace NActors { return actor; } - THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { + THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { THolder<TActorSystemSetup> setup(new TActorSystemSetup); setup->NodeId = FirstNodeId + nodeIndex; - + if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); @@ -1627,20 +1627,20 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } - InitActorSystemSetup(*setup); - - return setup; - } - - THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { - auto setup = MakeActorSystemSetup(nodeIndex, node); - + InitActorSystemSetup(*setup); + + return setup; + } + + THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { + auto setup = MakeActorSystemSetup(nodeIndex, node); + node->ExecutorPools.resize(setup->ExecutorsCount); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { node->ExecutorPools[i] = setup->Executors[i].Get(); } - const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); @@ -1686,22 +1686,22 @@ namespace NActors { if (!SingleSysEnv) { // Single system env should do this self TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, - logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); - NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); + logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); + NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } - return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); } - TActorSystem* TTestActorRuntimeBase::SingleSys() const { + TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); return Nodes.begin()->second->ActorSystem.Get(); } - TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { + TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { for (auto& x : Nodes) { return x.second->ActorSystem.Get(); } @@ -1715,7 +1715,7 @@ namespace NActors { } - TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { + TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); auto it = Mailboxes.find(mboxId); @@ -1726,7 +1726,7 @@ namespace NActors { return *it->second; } - void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { + void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); Mailboxes.erase(mboxId); @@ -1753,8 +1753,8 @@ namespace NActors { public: class TReplyActor : public TActor<TReplyActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TReplyActor(TStrandingActorDecorator* owner) @@ -1769,8 +1769,8 @@ namespace NActors { TStrandingActorDecorator* const Owner; }; - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, @@ -1864,7 +1864,7 @@ namespace NActors { TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; THolder<IReplyChecker> ReplyChecker; }; @@ -1889,7 +1889,7 @@ namespace NActors { private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; }; diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 6baeebcb37..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -1,5 +1,5 @@ #pragma once - + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/log.h> @@ -182,7 +182,7 @@ namespace NActors { } }; - class TTestActorRuntimeBase: public TNonCopyable { + class TTestActorRuntimeBase: public TNonCopyable { public: class TEdgeActor; class TSchedulerThreadStub; @@ -195,24 +195,24 @@ namespace NActors { RESCHEDULE }; - typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; - typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; - typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; - typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; + typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; + typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; - TTestActorRuntimeBase(THeSingleSystemEnv); - TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); - TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); - TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); - virtual ~TTestActorRuntimeBase(); + TTestActorRuntimeBase(THeSingleSystemEnv); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); + TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); + virtual ~TTestActorRuntimeBase(); bool IsRealThreads() const; - static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); - static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); - static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); - static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); - static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); + static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); TEventObserver SetObserverFunc(TEventObserver observerFunc); TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); @@ -233,7 +233,7 @@ namespace NActors { void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); - virtual void Initialize(); + virtual void Initialize(); ui32 GetNodeId(ui32 index = 0) const; ui32 GetNodeCount() const; ui64 AllocateLocalId(); @@ -291,7 +291,7 @@ namespace NActors { TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); const ui32 eventType = TEvent::EventType; - WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; @@ -323,7 +323,7 @@ namespace NActors { { typename TEvent::TPtr handle; const ui32 eventType = TEvent::EventType; - WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; @@ -383,7 +383,7 @@ namespace NActors { std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); auto eventTypes = { TEvents::EventType... }; - WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes)) return false; handle = event; @@ -472,26 +472,26 @@ namespace NActors { } protected: - struct TNodeDataBase; - TNodeDataBase* GetRawNode(ui32 node) const { + struct TNodeDataBase; + TNodeDataBase* GetRawNode(ui32 node) const { return Nodes.at(FirstNodeId + node).Get(); } - static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); - virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { - Y_UNUSED(counters); - Y_UNUSED(component); - - // do nothing, just return the existing counters - return counters; - } - - THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); - THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); - virtual void InitActorSystemSetup(TActorSystemSetup& setup) { - Y_UNUSED(setup); - } - + static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); + virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { + Y_UNUSED(counters); + Y_UNUSED(component); + + // do nothing, just return the existing counters + return counters; + } + + THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); + THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); + virtual void InitActorSystemSetup(TActorSystemSetup& setup) { + Y_UNUSED(setup); + } + private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); @@ -506,18 +506,18 @@ namespace NActors { ui64 ScheduledLimit; THolder<TTempDir> TmpDir; const TThread::TId MainThreadId; - + protected: bool UseRealInterconnect = false; TInterconnectMock InterconnectMock; - bool IsInitialized = false; - bool SingleSysEnv = false; + bool IsInitialized = false; + bool SingleSysEnv = false; const TString ClusterUUID; const ui32 FirstNodeId; const ui32 NodeCount; const ui32 DataCenterCount; const bool UseRealThreads; - + ui64 LocalId; TMutex Mutex; TCondVar MailboxesHasEvents; @@ -532,28 +532,28 @@ namespace NActors { TAutoPtr<TLogBackend> LogBackend; bool NeedMonitoring; - TIntrusivePtr<IRandomProvider> RandomProvider; - TIntrusivePtr<ITimeProvider> TimeProvider; - + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; + protected: - struct TNodeDataBase: public TThrRefBase { - TNodeDataBase(); + struct TNodeDataBase: public TThrRefBase { + TNodeDataBase(); void Stop(); - virtual ~TNodeDataBase(); - virtual ui64 GetLoggerPoolId() const { - return 0; - } - - template <typename T = void> - T* GetAppData() { - return static_cast<T*>(AppData0.get()); - } - - template <typename T = void> - const T* GetAppData() const { - return static_cast<T*>(AppData0.get()); - } - + virtual ~TNodeDataBase(); + virtual ui64 GetLoggerPoolId() const { + return 0; + } + + template <typename T = void> + T* GetAppData() { + return static_cast<T*>(AppData0.get()); + } + + template <typename T = void> + const T* GetAppData() const { + return static_cast<T*>(AppData0.get()); + } + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; TIntrusivePtr<NActors::NLog::TSettings> LogSettings; TIntrusivePtr<NInterconnect::TPollerThreads> Poller; @@ -563,44 +563,44 @@ namespace NActors { TMap<TActorId, IActor*> LocalServicesActors; TMap<IActor*, TActorId> ActorToActorId; THolder<TMailboxTable> MailboxTable; - std::shared_ptr<void> AppData0; + std::shared_ptr<void> AppData0; THolder<TActorSystem> ActorSystem; - THolder<IExecutorPool> SchedulerPool; + THolder<IExecutorPool> SchedulerPool; TVector<IExecutorPool*> ExecutorPools; THolder<TExecutorThread> ExecutorThread; - }; + }; - struct INodeFactory { - virtual ~INodeFactory() = default; - virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; + struct INodeFactory { + virtual ~INodeFactory() = default; + virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; }; - struct TDefaultNodeFactory final: INodeFactory { - virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { - return new TNodeDataBase(); - } - }; - - INodeFactory& GetNodeFactory() { - return *NodeFactory; - } - - virtual TNodeDataBase* GetNodeById(size_t idx) { - return Nodes[idx].Get(); - } - - void InitNodes(); - void CleanupNodes(); - virtual void InitNodeImpl(TNodeDataBase*, size_t); - + struct TDefaultNodeFactory final: INodeFactory { + virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { + return new TNodeDataBase(); + } + }; + + INodeFactory& GetNodeFactory() { + return *NodeFactory; + } + + virtual TNodeDataBase* GetNodeById(size_t idx) { + return Nodes[idx].Get(); + } + + void InitNodes(); + void CleanupNodes(); + virtual void InitNodeImpl(TNodeDataBase*, size_t); + static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev); - protected: - THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; - + protected: + THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; + private: - void InitNode(TNodeDataBase* node, size_t idx); - + void InitNode(TNodeDataBase* node, size_t idx); + struct TDispatchContext { const TDispatchOptions* Options; TDispatchContext* PrevContext; @@ -610,8 +610,8 @@ namespace NActors { bool FinalEventFound = false; }; - TProgramShouldContinue ShouldContinue; - TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; + TProgramShouldContinue ShouldContinue; + TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; ui64 CurrentTimestamp; TSet<TActorId> EdgeActors; THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; diff --git a/library/cpp/actors/ya.make b/library/cpp/actors/ya.make index b50699b855..737c7fbc18 100644 --- a/library/cpp/actors/ya.make +++ b/library/cpp/actors/ya.make @@ -11,6 +11,6 @@ RECURSE( protos util wilson - testlib - http + testlib + http ) |