aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authormsherbakov <msherbakov@yandex-team.ru>2022-02-10 16:49:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:17 +0300
commita0ffafe83b7d6229709a32fa942c71d672ac989c (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors
parentc224a621661ddd69699f9476922eb316607ef57e (diff)
downloadydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor.h2
-rw-r--r--library/cpp/actors/core/log.cpp80
-rw-r--r--library/cpp/actors/core/log.h76
-rw-r--r--library/cpp/actors/core/log_ut.cpp250
-rw-r--r--library/cpp/actors/core/mon.h6
-rw-r--r--library/cpp/actors/core/process_stats.cpp156
-rw-r--r--library/cpp/actors/core/process_stats.h6
-rw-r--r--library/cpp/actors/core/ut/ya.make2
-rw-r--r--library/cpp/actors/http/http_cache.cpp4
-rw-r--r--library/cpp/actors/http/http_static.cpp4
-rw-r--r--library/cpp/actors/http/http_ut.cpp4
-rw-r--r--library/cpp/actors/http/ut/ya.make6
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp326
-rw-r--r--library/cpp/actors/testlib/test_runtime.h184
-rw-r--r--library/cpp/actors/ya.make4
15 files changed, 555 insertions, 555 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index c30ac17946..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -234,7 +234,7 @@ namespace NActors {
INTERCONNECT_SESSION_TCP = 13,
INTERCONNECT_COMMON = 171,
SELF_PING_ACTOR = 207,
- TEST_ACTOR_RUNTIME = 283,
+ TEST_ACTOR_RUNTIME = 283,
INTERCONNECT_HANDSHAKE = 284,
INTERCONNECT_POLLER = 285,
INTERCONNECT_SESSION_KILLER = 286,
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 46af66104d..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -217,8 +217,8 @@ namespace NActors {
TString formatted;
vsprintf(formatted, c, params);
- auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted);
- Y_UNUSED(ok);
+ auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted);
+ Y_UNUSED(ok);
va_end(params);
}
}
@@ -230,9 +230,9 @@ namespace NActors {
void TLoggerActor::LogIgnoredCount(TInstant now) {
TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount);
- if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) {
- BecomeDefunct();
- }
+ if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) {
+ BecomeDefunct();
+ }
}
void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) {
@@ -242,14 +242,14 @@ namespace NActors {
PassedCount = 0;
}
- void TLoggerActor::HandleIgnoredEventDrop() {
- // logger backend is unavailable, just ignore
- }
-
- void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) {
+ void TLoggerActor::HandleIgnoredEventDrop() {
+ // logger backend is unavailable, just ignore
+ }
+
+ void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) {
Metrics->IncActorMsgs();
- const auto prio = ev.Level.ToPrio();
+ const auto prio = ev.Level.ToPrio();
switch (prio) {
case ::NActors::NLog::EPrio::Alert:
@@ -262,11 +262,11 @@ namespace NActors {
break;
}
- }
-
- void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
- i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds();
- WriteMessageStat(*ev->Get());
+ }
+
+ void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
+ i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds();
+ WriteMessageStat(*ev->Get());
if (Settings->AllowDrop) {
// Disable throttling if it was enabled previously
if (AtomicGet(IsOverflow))
@@ -291,17 +291,17 @@ namespace NActors {
AtomicSet(IsOverflow, 0);
}
- const auto prio = ev->Get()->Level.ToPrio();
- if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) {
- BecomeDefunct();
- }
+ const auto prio = ev->Get()->Level.ToPrio();
+ if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) {
+ BecomeDefunct();
+ }
+ }
+
+ void TLoggerActor::BecomeDefunct() {
+ Become(&TThis::StateDefunct);
+ Schedule(WakeupInterval, new TEvents::TEvWakeup);
}
- void TLoggerActor::BecomeDefunct() {
- Become(&TThis::StateDefunct);
- Schedule(WakeupInterval, new TEvents::TEvWakeup);
- }
-
void TLoggerActor::HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const NActors::TActorContext& ctx) {
Metrics->IncLevelRequests();
TString explanation;
@@ -367,7 +367,7 @@ namespace NActors {
* 4. Log level changes (last N changes)
*/
void TLoggerActor::HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
- const auto& params = ev->Get()->Request.GetParams();
+ const auto& params = ev->Get()->Request.GetParams();
NLog::EComponent component = NLog::InvalidComponent;
NLog::EPriority priority = NLog::PRI_DEBUG;
NLog::EPriority samplingPriority = NLog::PRI_DEBUG;
@@ -574,8 +574,8 @@ namespace NActors {
constexpr size_t TimeBufSize = 512;
- bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component,
- const TString& formatted) noexcept try {
+ bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component,
+ const TString& formatted) noexcept try {
const auto logPrio = ::ELogPriority(ui16(priority));
char buf[TimeBufSize];
@@ -634,21 +634,21 @@ namespace NActors {
TLogRecord(logPrio, logRecord.data(), logRecord.size()));
} break;
}
-
- return true;
- } catch (...) {
- return false;
+
+ return true;
+ } catch (...) {
+ return false;
}
- void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) {
- WriteMessageStat(*ev->Get());
+ void TLoggerActor::HandleLogEventDrop(const NLog::TEvLog::TPtr& ev) {
+ WriteMessageStat(*ev->Get());
Metrics->IncDroppedMsgs();
- }
-
- void TLoggerActor::HandleWakeup() {
- Become(&TThis::StateFunc);
- }
-
+ }
+
+ void TLoggerActor::HandleWakeup() {
+ Become(&TThis::StateFunc);
+ }
+
const char* TLoggerActor::FormatLocalTimestamp(TInstant time, char* buf) {
struct tm localTime;
time.LocalTime(&localTime);
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 0bd6e41729..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -219,16 +219,16 @@ namespace NActors {
}
}
- STFUNC(StateDefunct) {
- switch (ev->GetTypeRewrite()) {
- cFunc(TLogIgnored::EventType, HandleIgnoredEventDrop);
- hFunc(NLog::TEvLog, HandleLogEventDrop);
- HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest);
- HFunc(NMon::TEvHttpInfo, HandleMonInfo);
- cFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
- }
- }
-
+ STFUNC(StateDefunct) {
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TLogIgnored::EventType, HandleIgnoredEventDrop);
+ hFunc(NLog::TEvLog, HandleLogEventDrop);
+ HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest);
+ HFunc(NMon::TEvHttpInfo, HandleMonInfo);
+ cFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
+ }
+ }
+
// Directly call logger instead of sending a message
void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...);
@@ -240,21 +240,21 @@ namespace NActors {
ui64 IgnoredCount = 0;
ui64 PassedCount = 0;
static TAtomic IsOverflow;
- TDuration WakeupInterval{TDuration::Seconds(5)};
+ TDuration WakeupInterval{TDuration::Seconds(5)};
std::unique_ptr<ILoggerMetrics> Metrics;
- void BecomeDefunct();
+ void BecomeDefunct();
void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx);
- void HandleIgnoredEventDrop();
+ void HandleIgnoredEventDrop();
void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx);
- void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev);
+ void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev);
void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx);
void HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx);
- void HandleWakeup();
- [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept;
+ void HandleWakeup();
+ [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept;
void RenderComponentPriorities(IOutputStream& str);
void LogIgnoredCount(TInstant now);
- void WriteMessageStat(const NLog::TEvLog& ev);
+ void WriteMessageStat(const NLog::TEvLog& ev);
static const char* FormatLocalTimestamp(TInstant time, char* buf);
};
@@ -305,19 +305,19 @@ namespace NActors {
// Logging adaptors for memory log and logging into filesystem
/////////////////////////////////////////////////////////////////////
- namespace NDetail {
- inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) {
- va_list params;
- va_start(params, format);
- vsprintf(dst, format, params);
- va_end(params);
- }
-
- inline void PrintfV(TString& dst, const char* format, va_list params) {
- vsprintf(dst, format, params);
- }
- } // namespace NDetail
-
+ namespace NDetail {
+ inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) {
+ va_list params;
+ va_start(params, format);
+ vsprintf(dst, format, params);
+ va_end(params);
+ }
+
+ inline void PrintfV(TString& dst, const char* format, va_list params) {
+ vsprintf(dst, format, params);
+ }
+ } // namespace NDetail
+
template <typename TCtx>
inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str)
{
@@ -326,21 +326,21 @@ namespace NActors {
ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
}
- template <typename TCtx, typename... TArgs>
+ template <typename TCtx, typename... TArgs>
inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
NLog::EPriority mPriority,
NLog::EComponent mComponent,
- const char* format, TArgs&&... params) {
+ const char* format, TArgs&&... params) {
TString Formatted;
- if constexpr (sizeof... (params) > 0) {
- NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...);
- } else {
- NDetail::PrintfV(Formatted, "%s", format);
- }
-
+ if constexpr (sizeof... (params) > 0) {
+ NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...);
+ } else {
+ NDetail::PrintfV(Formatted, "%s", format);
+ }
+
MemLogWrite(Formatted.data(), Formatted.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted));
}
diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp
index f5396cc8b1..09b5f88ea2 100644
--- a/library/cpp/actors/core/log_ut.cpp
+++ b/library/cpp/actors/core/log_ut.cpp
@@ -1,27 +1,27 @@
-#include "log.h"
-
+#include "log.h"
+
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/actors/testlib/test_runtime.h>
-
-using namespace NMonitoring;
-using namespace NActors;
-using namespace NActors::NLog;
-
-namespace {
- const TString& ServiceToString(int) {
- static const TString FAKE{"FAKE"};
- return FAKE;
- }
-
- TIntrusivePtr<TSettings> DefaultSettings() {
+
+using namespace NMonitoring;
+using namespace NActors;
+using namespace NActors::NLog;
+
+namespace {
+ const TString& ServiceToString(int) {
+ static const TString FAKE{"FAKE"};
+ return FAKE;
+ }
+
+ TIntrusivePtr<TSettings> DefaultSettings() {
auto loggerId = TActorId{0, "Logger"};
- auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE);
- s->SetAllowDrop(false);
- s->Append(0, 1, ServiceToString);
- return s;
- }
-
+ auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE);
+ s->SetAllowDrop(false);
+ s->Append(0, 1, ServiceToString);
+ return s;
+ }
+
TIntrusivePtr<TSettings> DroppingSettings(ui64 timeThresholdMs) {
auto loggerId = TActorId{0, "Logger"};
auto s = MakeIntrusive<TSettings>(
@@ -35,123 +35,123 @@ namespace {
return s;
}
- class TMockBackend: public TLogBackend {
- public:
- using TWriteImpl = std::function<void(const TLogRecord&)>;
- using TReopenImpl = std::function<void()>;
-
- static void REOPEN_NOP() { }
-
- TMockBackend(TWriteImpl writeImpl, TReopenImpl reopenImpl = REOPEN_NOP)
- : WriteImpl_{writeImpl}
- , ReopenImpl_{reopenImpl}
- {
- }
-
- void WriteData(const TLogRecord& r) override {
- WriteImpl_(r);
- }
-
- void ReopenLog() override { }
-
- void SetWriteImpl(TWriteImpl writeImpl) {
- WriteImpl_ = writeImpl;
- }
-
- private:
- TWriteImpl WriteImpl_;
- TReopenImpl ReopenImpl_;
- };
-
- void ThrowAlways(const TLogRecord&) {
- ythrow yexception();
- };
-
- struct TFixture {
+ class TMockBackend: public TLogBackend {
+ public:
+ using TWriteImpl = std::function<void(const TLogRecord&)>;
+ using TReopenImpl = std::function<void()>;
+
+ static void REOPEN_NOP() { }
+
+ TMockBackend(TWriteImpl writeImpl, TReopenImpl reopenImpl = REOPEN_NOP)
+ : WriteImpl_{writeImpl}
+ , ReopenImpl_{reopenImpl}
+ {
+ }
+
+ void WriteData(const TLogRecord& r) override {
+ WriteImpl_(r);
+ }
+
+ void ReopenLog() override { }
+
+ void SetWriteImpl(TWriteImpl writeImpl) {
+ WriteImpl_ = writeImpl;
+ }
+
+ private:
+ TWriteImpl WriteImpl_;
+ TReopenImpl ReopenImpl_;
+ };
+
+ void ThrowAlways(const TLogRecord&) {
+ ythrow yexception();
+ };
+
+ struct TFixture {
TFixture(
TIntrusivePtr<TSettings> settings,
TMockBackend::TWriteImpl writeImpl = ThrowAlways)
{
- Runtime.Initialize();
- LogBackend.reset(new TMockBackend{writeImpl});
+ Runtime.Initialize();
+ LogBackend.reset(new TMockBackend{writeImpl});
LoggerActor = Runtime.Register(new TLoggerActor{settings, LogBackend, Counters});
- Runtime.SetScheduledEventFilter([] (auto&&, auto&&, auto&&, auto) {
- return false;
- });
- }
-
+ Runtime.SetScheduledEventFilter([] (auto&&, auto&&, auto&&, auto) {
+ return false;
+ });
+ }
+
TFixture(TMockBackend::TWriteImpl writeImpl = ThrowAlways)
: TFixture(DefaultSettings(), writeImpl)
{}
- void WriteLog() {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
- }
-
+ void WriteLog() {
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
+ }
+
void WriteLog(TInstant ts) {
Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{EPrio::Emerg}, 0, "foo")});
}
- void Wakeup() {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup});
- }
-
- TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()};
- std::shared_ptr<TMockBackend> LogBackend;
+ void Wakeup() {
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup});
+ }
+
+ TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()};
+ std::shared_ptr<TMockBackend> LogBackend;
TActorId LoggerActor;
- TTestActorRuntimeBase Runtime;
- };
-}
-
-
-Y_UNIT_TEST_SUITE(TLoggerActorTest) {
- Y_UNIT_TEST(NoCrashOnWriteFailure) {
- TFixture test;
- test.WriteLog();
- // everything is okay as long as we get here
- }
-
- Y_UNIT_TEST(SubsequentWritesAreIgnored) {
- size_t count{0};
- auto countWrites = [&count] (auto&& r) {
- count++;
- ThrowAlways(r);
- };
-
- TFixture test{countWrites};
- test.WriteLog();
- UNIT_ASSERT_VALUES_EQUAL(count, 1);
-
- // at this point we should have started dropping messages
- for (auto i = 0; i < 5; ++i) {
- test.WriteLog();
- }
-
- UNIT_ASSERT_VALUES_EQUAL(count, 1);
- }
-
- Y_UNIT_TEST(LoggerCanRecover) {
- TFixture test;
- test.WriteLog();
-
- TVector<TString> messages;
- auto acceptWrites = [&] (const TLogRecord& r) {
- messages.emplace_back(r.Data, r.Len);
- };
-
- auto scheduled = test.Runtime.CaptureScheduledEvents();
- UNIT_ASSERT_VALUES_EQUAL(scheduled.size(), 1);
-
- test.LogBackend->SetWriteImpl(acceptWrites);
- test.Wakeup();
-
- const auto COUNT = 10;
- for (auto i = 0; i < COUNT; ++i) {
- test.WriteLog();
- }
-
- UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT);
- }
+ TTestActorRuntimeBase Runtime;
+ };
+}
+
+
+Y_UNIT_TEST_SUITE(TLoggerActorTest) {
+ Y_UNIT_TEST(NoCrashOnWriteFailure) {
+ TFixture test;
+ test.WriteLog();
+ // everything is okay as long as we get here
+ }
+
+ Y_UNIT_TEST(SubsequentWritesAreIgnored) {
+ size_t count{0};
+ auto countWrites = [&count] (auto&& r) {
+ count++;
+ ThrowAlways(r);
+ };
+
+ TFixture test{countWrites};
+ test.WriteLog();
+ UNIT_ASSERT_VALUES_EQUAL(count, 1);
+
+ // at this point we should have started dropping messages
+ for (auto i = 0; i < 5; ++i) {
+ test.WriteLog();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(count, 1);
+ }
+
+ Y_UNIT_TEST(LoggerCanRecover) {
+ TFixture test;
+ test.WriteLog();
+
+ TVector<TString> messages;
+ auto acceptWrites = [&] (const TLogRecord& r) {
+ messages.emplace_back(r.Data, r.Len);
+ };
+
+ auto scheduled = test.Runtime.CaptureScheduledEvents();
+ UNIT_ASSERT_VALUES_EQUAL(scheduled.size(), 1);
+
+ test.LogBackend->SetWriteImpl(acceptWrites);
+ test.Wakeup();
+
+ const auto COUNT = 10;
+ for (auto i = 0; i < COUNT; ++i) {
+ test.WriteLog();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT);
+ }
Y_UNIT_TEST(ShouldObeyTimeThresholdMsWhenOverloaded) {
TFixture test{DroppingSettings(5000)};
@@ -182,4 +182,4 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) {
UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT + 1);
}
-}
+}
diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h
index 666303dca7..c450f2338e 100644
--- a/library/cpp/actors/core/mon.h
+++ b/library/cpp/actors/core/mon.h
@@ -21,20 +21,20 @@ namespace NActors {
// request info from an actor in HTML format
struct TEvHttpInfo: public NActors::TEventLocal<TEvHttpInfo, HttpInfo> {
- TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, int subReqId = 0)
+ TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, int subReqId = 0)
: Request(request)
, SubRequestId(subReqId)
{
}
- TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, const TString& userToken)
+ TEvHttpInfo(const NMonitoring::IMonHttpRequest& request, const TString& userToken)
: Request(request)
, UserToken(userToken)
, SubRequestId(0)
{
}
- const NMonitoring::IMonHttpRequest& Request;
+ const NMonitoring::IMonHttpRequest& Request;
TString UserToken; // built and serialized
// SubRequestId != 0 means that we assemble reply from multiple parts and SubRequestId contains this part id
int SubRequestId;
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp
index cfc95034c9..0e1dbd0031 100644
--- a/library/cpp/actors/core/process_stats.cpp
+++ b/library/cpp/actors/core/process_stats.cpp
@@ -136,23 +136,23 @@ namespace NActors {
#endif
-namespace {
+namespace {
// Periodically collects process stats and exposes them as mon counters
- template <typename TDerived>
- class TProcStatCollectingActor: public TActorBootstrapped<TProcStatCollectingActor<TDerived>> {
+ template <typename TDerived>
+ class TProcStatCollectingActor: public TActorBootstrapped<TProcStatCollectingActor<TDerived>> {
public:
static constexpr IActor::EActivityType ActorActivityType() {
return IActor::ACTORLIB_STATS;
}
- TProcStatCollectingActor(TDuration interval)
- : Interval(interval)
+ TProcStatCollectingActor(TDuration interval)
+ : Interval(interval)
{
}
void Bootstrap(const TActorContext& ctx) {
- ctx.Schedule(Interval, new TEvents::TEvWakeup());
- Self()->Become(&TDerived::StateWork);
+ ctx.Schedule(Interval, new TEvents::TEvWakeup());
+ Self()->Become(&TDerived::StateWork);
}
STFUNC(StateWork) {
@@ -163,85 +163,85 @@ namespace {
private:
void Wakeup(const TActorContext& ctx) {
- Self()->UpdateCounters(ProcStat);
- ctx.Schedule(Interval, new TEvents::TEvWakeup());
+ Self()->UpdateCounters(ProcStat);
+ ctx.Schedule(Interval, new TEvents::TEvWakeup());
}
- TDerived* Self() {
+ TDerived* Self() {
ProcStat.Fill(getpid());
- return static_cast<TDerived*>(this);
+ return static_cast<TDerived*>(this);
}
-
- private:
- const TDuration Interval;
- TProcStat ProcStat;
+
+ private:
+ const TDuration Interval;
+ TProcStat ProcStat;
};
- // Periodically collects process stats and exposes them as mon counters
- class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> {
- using TBase = TProcStatCollectingActor<TDynamicCounterCollector>;
- public:
+ // Periodically collects process stats and exposes them as mon counters
+ class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> {
+ using TBase = TProcStatCollectingActor<TDynamicCounterCollector>;
+ public:
TDynamicCounterCollector(
ui32 intervalSeconds,
NMonitoring::TDynamicCounterPtr counters)
- : TBase{TDuration::Seconds(intervalSeconds)}
- {
- ProcStatGroup = counters->GetSubgroup("counters", "utils");
-
- VmSize = ProcStatGroup->GetCounter("Process/VmSize", false);
- AnonRssSize = ProcStatGroup->GetCounter("Process/AnonRssSize", false);
- FileRssSize = ProcStatGroup->GetCounter("Process/FileRssSize", false);
+ : TBase{TDuration::Seconds(intervalSeconds)}
+ {
+ ProcStatGroup = counters->GetSubgroup("counters", "utils");
+
+ VmSize = ProcStatGroup->GetCounter("Process/VmSize", false);
+ AnonRssSize = ProcStatGroup->GetCounter("Process/AnonRssSize", false);
+ FileRssSize = ProcStatGroup->GetCounter("Process/FileRssSize", false);
CGroupMemLimit = ProcStatGroup->GetCounter("Process/CGroupMemLimit", false);
- UserTime = ProcStatGroup->GetCounter("Process/UserTime", true);
- SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true);
- MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true);
- MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true);
+ UserTime = ProcStatGroup->GetCounter("Process/UserTime", true);
+ SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true);
+ MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true);
+ MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true);
UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false);
NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false);
SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false);
- }
-
- void UpdateCounters(const TProcStat& procStat) {
- *VmSize = procStat.Vsize;
- *AnonRssSize = procStat.AnonRss;
- *FileRssSize = procStat.FileRss;
+ }
+
+ void UpdateCounters(const TProcStat& procStat) {
+ *VmSize = procStat.Vsize;
+ *AnonRssSize = procStat.AnonRss;
+ *FileRssSize = procStat.FileRss;
if (procStat.CGroupMemLim) {
*CGroupMemLimit = procStat.CGroupMemLim;
}
- *UserTime = procStat.Utime;
- *SysTime = procStat.Stime;
- *MinorPageFaults = procStat.MinFlt;
- *MajorPageFaults = procStat.MajFlt;
+ *UserTime = procStat.Utime;
+ *SysTime = procStat.Stime;
+ *MinorPageFaults = procStat.MinFlt;
+ *MajorPageFaults = procStat.MajFlt;
*UptimeSeconds = procStat.Uptime.Seconds();
*NumThreads = procStat.NumThreads;
*SystemUptimeSeconds = procStat.Uptime.Seconds();
- }
-
- private:
+ }
+
+ private:
NMonitoring::TDynamicCounterPtr ProcStatGroup;
- NMonitoring::TDynamicCounters::TCounterPtr VmSize;
- NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize;
- NMonitoring::TDynamicCounters::TCounterPtr FileRssSize;
+ NMonitoring::TDynamicCounters::TCounterPtr VmSize;
+ NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize;
+ NMonitoring::TDynamicCounters::TCounterPtr FileRssSize;
NMonitoring::TDynamicCounters::TCounterPtr CGroupMemLimit;
- NMonitoring::TDynamicCounters::TCounterPtr UserTime;
- NMonitoring::TDynamicCounters::TCounterPtr SysTime;
- NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults;
- NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults;
+ NMonitoring::TDynamicCounters::TCounterPtr UserTime;
+ NMonitoring::TDynamicCounters::TCounterPtr SysTime;
+ NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults;
+ NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults;
NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds;
NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds;
- };
-
-
- class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> {
- using TBase = TProcStatCollectingActor<TRegistryCollector>;
- public:
+ };
+
+
+ class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> {
+ using TBase = TProcStatCollectingActor<TRegistryCollector>;
+ public:
TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry)
- : TBase{interval}
- {
- VmSize = registry.IntGauge({{"sensor", "process.VmSize"}});
- AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}});
- FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}});
+ : TBase{interval}
+ {
+ VmSize = registry.IntGauge({{"sensor", "process.VmSize"}});
+ AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}});
+ FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}});
CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}});
UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}});
NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}});
@@ -251,12 +251,12 @@ namespace {
SysTime = registry.Rate({{"sensor", "process.SystemTime"}});
MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}});
MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}});
- }
-
- void UpdateCounters(const TProcStat& procStat) {
- VmSize->Set(procStat.Vsize);
- AnonRssSize->Set(procStat.AnonRss);
- FileRssSize->Set(procStat.FileRss);
+ }
+
+ void UpdateCounters(const TProcStat& procStat) {
+ VmSize->Set(procStat.Vsize);
+ AnonRssSize->Set(procStat.AnonRss);
+ FileRssSize->Set(procStat.FileRss);
CGroupMemLimit->Set(procStat.CGroupMemLim);
UptimeSeconds->Set(procStat.Uptime.Seconds());
NumThreads->Set(procStat.NumThreads);
@@ -276,12 +276,12 @@ namespace {
MajorPageFaults->Reset();
MajorPageFaults->Add(procStat.MajFlt);
- }
-
- private:
- NMonitoring::TIntGauge* VmSize;
- NMonitoring::TIntGauge* AnonRssSize;
- NMonitoring::TIntGauge* FileRssSize;
+ }
+
+ private:
+ NMonitoring::TIntGauge* VmSize;
+ NMonitoring::TIntGauge* AnonRssSize;
+ NMonitoring::TIntGauge* FileRssSize;
NMonitoring::TIntGauge* CGroupMemLimit;
NMonitoring::TRate* UserTime;
NMonitoring::TRate* SysTime;
@@ -290,14 +290,14 @@ namespace {
NMonitoring::TIntGauge* UptimeSeconds;
NMonitoring::TIntGauge* NumThreads;
NMonitoring::TIntGauge* SystemUptimeSeconds;
- };
-} // namespace
-
+ };
+} // namespace
+
IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) {
return new TDynamicCounterCollector(intervalSec, counters);
}
IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) {
- return new TRegistryCollector(interval, registry);
- }
+ return new TRegistryCollector(interval, registry);
+ }
}
diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h
index e2f3a8276d..66346d0b5a 100644
--- a/library/cpp/actors/core/process_stats.h
+++ b/library/cpp/actors/core/process_stats.h
@@ -5,10 +5,10 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
-namespace NMonitoring {
+namespace NMonitoring {
class TMetricRegistry;
-}
-
+}
+
namespace NActors {
struct TProcStat {
ui64 Rss;
diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make
index 918ed3f656..3ee28d5850 100644
--- a/library/cpp/actors/core/ut/ya.make
+++ b/library/cpp/actors/core/ut/ya.make
@@ -38,7 +38,7 @@ SRCS(
event_pb_ut.cpp
executor_pool_basic_ut.cpp
executor_pool_united_ut.cpp
- log_ut.cpp
+ log_ut.cpp
memory_tracker_ut.cpp
scheduler_actor_ut.cpp
)
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
index 99fdb7ebb6..27c4eeb6f3 100644
--- a/library/cpp/actors/http/http_cache.cpp
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -1,6 +1,6 @@
#include "http.h"
-#include "http_proxy.h"
-#include "http_cache.h"
+#include "http_proxy.h"
+#include "http_cache.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/log.h>
diff --git a/library/cpp/actors/http/http_static.cpp b/library/cpp/actors/http/http_static.cpp
index 8e1e649dae..c075c5f693 100644
--- a/library/cpp/actors/http/http_static.cpp
+++ b/library/cpp/actors/http/http_static.cpp
@@ -1,5 +1,5 @@
-#include "http_proxy.h"
-#include "http_static.h"
+#include "http_proxy.h"
+#include "http_static.h"
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/scheduler_basic.h>
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 735fe66380..4c922f8d0f 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -175,11 +175,11 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
}
Y_UNIT_TEST(BasicRunning) {
- NActors::TTestActorRuntimeBase actorSystem;
+ NActors::TTestActorRuntimeBase actorSystem;
TPortManager portManager;
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
- actorSystem.Initialize();
+ actorSystem.Initialize();
NMonitoring::TMetricRegistry sensors;
NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
diff --git a/library/cpp/actors/http/ut/ya.make b/library/cpp/actors/http/ut/ya.make
index 8d83a1ea91..8b4c04c4d3 100644
--- a/library/cpp/actors/http/ut/ya.make
+++ b/library/cpp/actors/http/ut/ya.make
@@ -8,11 +8,11 @@ PEERDIR(
library/cpp/actors/testlib
)
-IF (NOT OS_WINDOWS)
+IF (NOT OS_WINDOWS)
SRCS(
http_ut.cpp
)
-ELSE()
-ENDIF()
+ELSE()
+ENDIF()
END()
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 567df9e141..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -1,5 +1,5 @@
#include "test_runtime.h"
-
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/callstack.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
@@ -35,7 +35,7 @@ namespace {
namespace NActors {
ui64 TScheduledEventQueueItem::NextUniqueId = 0;
- void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
+ void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
<< ", from " << ev->Sender.LocalId();
TString name = runtime->GetActorName(ev->Sender);
@@ -56,7 +56,7 @@ namespace NActors {
Cerr << "\n";
}
- TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
+ TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
ActorSystemTimestamp = nullptr;
ActorSystemMonotonic = nullptr;
}
@@ -82,13 +82,13 @@ namespace NActors {
}
- class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
+ class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
public:
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
}
- TEdgeActor(TTestActorRuntimeBase* runtime)
+ TEdgeActor(TTestActorRuntimeBase* runtime)
: TActor(&TEdgeActor::StateFunc)
, Runtime(runtime)
{
@@ -123,7 +123,7 @@ namespace NActors {
}
private:
- TTestActorRuntimeBase* Runtime;
+ TTestActorRuntimeBase* Runtime;
};
void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
@@ -218,9 +218,9 @@ namespace NActors {
return Sent.size();
}
- class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
+ class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
public:
- TTimeProvider(TTestActorRuntimeBase& runtime)
+ TTimeProvider(TTestActorRuntimeBase& runtime)
: Runtime(runtime)
{
}
@@ -230,12 +230,12 @@ namespace NActors {
}
private:
- TTestActorRuntimeBase& Runtime;
+ TTestActorRuntimeBase& Runtime;
};
- class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
+ class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
public:
- TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
+ TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
: Runtime(runtime)
, Node(node)
{
@@ -263,13 +263,13 @@ namespace NActors {
}
private:
- TTestActorRuntimeBase* Runtime;
- TTestActorRuntimeBase::TNodeDataBase* Node;
+ TTestActorRuntimeBase* Runtime;
+ TTestActorRuntimeBase::TNodeDataBase* Node;
};
- class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
+ class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
public:
- TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
+ TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
: IExecutorPool(poolId)
, Runtime(runtime)
, NodeIndex(nodeIndex)
@@ -277,7 +277,7 @@ namespace NActors {
{
}
- TTestActorRuntimeBase* GetRuntime() {
+ TTestActorRuntimeBase* GetRuntime() {
return Runtime;
}
@@ -437,26 +437,26 @@ namespace NActors {
}
private:
- TTestActorRuntimeBase* const Runtime;
+ TTestActorRuntimeBase* const Runtime;
const ui32 NodeIndex;
- TTestActorRuntimeBase::TNodeDataBase* const Node;
+ TTestActorRuntimeBase::TNodeDataBase* const Node;
};
- IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
- return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
- }
+ IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
+ return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
+ }
+
-
- ui32 TTestActorRuntimeBase::NextNodeId = 1;
-
- TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
- : TTestActorRuntimeBase(1, 1, false)
+ ui32 TTestActorRuntimeBase::NextNodeId = 1;
+
+ TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
+ : TTestActorRuntimeBase(1, 1, false)
{
SingleSysEnv = true;
}
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
- : ScheduledCount(0)
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
+ : ScheduledCount(0)
, ScheduledLimit(100000)
, MainThreadId(TThread::CurrentThreadId())
, ClusterUUID(MakeClusterId())
@@ -468,53 +468,53 @@ namespace NActors {
, DispatchCyclesCount(0)
, DispatchedEventsCount(0)
, NeedMonitoring(false)
- , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
- , TimeProvider(new TTimeProvider(*this))
- , ShouldContinue()
+ , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
+ , TimeProvider(new TTimeProvider(*this))
+ , ShouldContinue()
, CurrentTimestamp(0)
, DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
, ReschedulingDelay(TDuration::MicroSeconds(0))
- , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
+ , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
, ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
- , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
- , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
- , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
+ , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
+ , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
+ , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
, CurrentDispatchContext(nullptr)
{
SetDispatcherRandomSeed(TInstant::Now(), 0);
- EnableActorCallstack();
- }
+ EnableActorCallstack();
+ }
- void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
+ void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
- NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
+ NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
node->LogSettings->SetAllowDrop(false);
node->LogSettings->SetThrottleDelay(TDuration::Zero());
- node->DynamicCounters = new NMonitoring::TDynamicCounters;
+ node->DynamicCounters = new NMonitoring::TDynamicCounters;
- InitNodeImpl(node, nodeIndex);
+ InitNodeImpl(node, nodeIndex);
}
- void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
+ void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
node->LogSettings->Append(
NActorsServices::EServiceCommon_MIN,
NActorsServices::EServiceCommon_MAX,
NActorsServices::EServiceCommon_Name
);
-
- if (!UseRealThreads) {
- node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
- node->MailboxTable.Reset(new TMailboxTable());
- node->ActorSystem = MakeActorSystem(nodeIndex, node);
+
+ if (!UseRealThreads) {
+ node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
+ node->MailboxTable.Reset(new TMailboxTable());
+ node->ActorSystem = MakeActorSystem(nodeIndex, node);
node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
- } else {
- node->ActorSystem = MakeActorSystem(nodeIndex, node);
- }
-
- node->ActorSystem->Start();
- }
-
+ } else {
+ node->ActorSystem = MakeActorSystem(nodeIndex, node);
+ }
+
+ node->ActorSystem->Start();
+ }
+
bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
ui64 senderLocalId = ev->Sender.LocalId();
ui64 senderMailboxHint = ev->Sender.Hint();
@@ -527,16 +527,16 @@ namespace NActors {
return true;
}
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
- : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
+ : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
}
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
- : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
+ : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
}
- TTestActorRuntimeBase::~TTestActorRuntimeBase() {
- CleanupNodes();
+ TTestActorRuntimeBase::~TTestActorRuntimeBase() {
+ CleanupNodes();
Cerr.Flush();
Cerr.Flush();
Clog.Flush();
@@ -544,41 +544,41 @@ namespace NActors {
DisableActorCallstack();
}
- void TTestActorRuntimeBase::CleanupNodes() {
- Nodes.clear();
- }
-
- bool TTestActorRuntimeBase::IsRealThreads() const {
+ void TTestActorRuntimeBase::CleanupNodes() {
+ Nodes.clear();
+ }
+
+ bool TTestActorRuntimeBase::IsRealThreads() const {
return UseRealThreads;
}
- TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
return EEventAction::PROCESS;
}
- void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
+ void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
Y_UNUSED(runtime);
Y_UNUSED(queue);
scheduledEvents.clear();
}
- bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
return false;
}
- bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
- Y_UNUSED(runtime);
+ bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
+ Y_UNUSED(runtime);
Y_UNUSED(delay);
- Y_UNUSED(event);
+ Y_UNUSED(event);
Y_UNUSED(deadline);
return true;
}
-
+
void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
runtime.ScheduleWhiteList.insert(actorId);
@@ -634,7 +634,7 @@ namespace NActors {
}
};
- void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
+ void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
if (scheduledEvents.empty())
return;
@@ -682,46 +682,46 @@ namespace NActors {
runtime.UpdateCurrentTime(time);
}
- TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
+ TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
TGuard<TMutex> guard(Mutex);
auto result = ObserverFunc;
ObserverFunc = observerFunc;
return result;
}
- TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
+ TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventsSelectorFunc;
ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
return result;
}
- TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
+ TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
TGuard<TMutex> guard(Mutex);
auto result = EventFilterFunc;
EventFilterFunc = filterFunc;
return result;
}
- TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
+ TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventFilterFunc;
ScheduledEventFilterFunc = filterFunc;
return result;
}
- TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
+ TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
TGuard<TMutex> guard(Mutex);
auto result = RegistrationObserver;
RegistrationObserver = observerFunc;
return result;
}
- bool TTestActorRuntimeBase::IsVerbose() {
+ bool TTestActorRuntimeBase::IsVerbose() {
return VERBOSE;
}
- void TTestActorRuntimeBase::SetVerbose(bool verbose) {
+ void TTestActorRuntimeBase::SetVerbose(bool verbose) {
VERBOSE = verbose;
}
@@ -730,7 +730,7 @@ namespace NActors {
Y_VERIFY(nodeIndex < NodeCount);
auto node = Nodes[nodeIndex + FirstNodeId];
if (!node) {
- node = GetNodeFactory().CreateNode();
+ node = GetNodeFactory().CreateNode();
Nodes[nodeIndex + FirstNodeId] = node;
}
@@ -738,51 +738,51 @@ namespace NActors {
node->LocalServices.push_back(std::make_pair(actorId, cmd));
}
- void TTestActorRuntimeBase::InitNodes() {
- NextNodeId += NodeCount;
- Y_VERIFY(NodeCount > 0);
+ void TTestActorRuntimeBase::InitNodes() {
+ NextNodeId += NodeCount;
+ Y_VERIFY(NodeCount > 0);
- for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
- auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
- TNodeDataBase* node = nodeIt->second.Get();
- InitNode(node, nodeIndex);
+ for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
+ auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
+ TNodeDataBase* node = nodeIt->second.Get();
+ InitNode(node, nodeIndex);
}
- }
+ }
- void TTestActorRuntimeBase::Initialize() {
- InitNodes();
- IsInitialized = true;
+ void TTestActorRuntimeBase::Initialize() {
+ InitNodes();
+ IsInitialized = true;
}
void SetupCrossDC() {
}
- TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
+ TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
TGuard<TMutex> guard(Mutex);
TDuration oldTimeout = DispatchTimeout;
DispatchTimeout = timeout;
return oldTimeout;
}
- TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
+ TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
TGuard<TMutex> guard(Mutex);
TDuration oldDelay = ReschedulingDelay;
ReschedulingDelay = delay;
return oldDelay;
}
- void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
+ void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
Y_VERIFY(!IsInitialized);
TGuard<TMutex> guard(Mutex);
LogBackend = logBackend;
}
- void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
+ void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
TGuard<TMutex> guard(Mutex);
for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
TString explanation;
auto status = node->LogSettings->SetLevel(priority, component, explanation);
if (status) {
@@ -791,13 +791,13 @@ namespace NActors {
}
}
- TInstant TTestActorRuntimeBase::GetCurrentTime() const {
+ TInstant TTestActorRuntimeBase::GetCurrentTime() const {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(!UseRealThreads);
return TInstant::MicroSeconds(CurrentTimestamp);
}
- void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
+ void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
static int counter = 0;
++counter;
if (VERBOSE) {
@@ -814,25 +814,25 @@ namespace NActors {
}
}
- void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
+ void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
UpdateCurrentTime(GetCurrentTime() + duration);
}
- TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
+ TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
Y_VERIFY(!UseRealThreads);
return TimeProvider;
}
- ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
+ ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
Y_VERIFY(index < NodeCount);
return FirstNodeId + index;
}
- ui32 TTestActorRuntimeBase::GetNodeCount() const {
+ ui32 TTestActorRuntimeBase::GetNodeCount() const {
return NodeCount;
}
- ui64 TTestActorRuntimeBase::AllocateLocalId() {
+ ui64 TTestActorRuntimeBase::AllocateLocalId() {
TGuard<TMutex> guard(Mutex);
ui64 nextId = ++LocalId;
if (VERBOSE) {
@@ -842,7 +842,7 @@ namespace NActors {
return nextId;
}
- ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
+ ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
if (UseRealThreads && NSan::TSanIsOn()) {
// Interconnect coroutines may move across threads
// Use a special single-threaded pool to avoid that
@@ -851,7 +851,7 @@ namespace NActors {
return 0;
}
- TString TTestActorRuntimeBase::GetTempDir() {
+ TString TTestActorRuntimeBase::GetTempDir() {
if (!TmpDir)
TmpDir.Reset(new TTempDir());
return (*TmpDir)();
@@ -861,7 +861,7 @@ namespace NActors {
ui64 revolvingCounter, const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
TGuard<TMutex> guard(Mutex);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
@@ -929,7 +929,7 @@ namespace NActors {
const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
TGuard<TMutex> guard(Mutex);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
@@ -952,7 +952,7 @@ namespace NActors {
TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
if (!UseRealThreads) {
IActor* actor = FindActor(actorId, node);
node->LocalServicesActors[serviceId] = actor;
@@ -971,7 +971,7 @@ namespace NActors {
return edgeActor;
}
- TEventsList TTestActorRuntimeBase::CaptureEvents() {
+ TEventsList TTestActorRuntimeBase::CaptureEvents() {
TGuard<TMutex> guard(Mutex);
TEventsList result;
for (auto& mbox : Mailboxes) {
@@ -981,7 +981,7 @@ namespace NActors {
return result;
}
- TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
+ TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
TEventsList result;
@@ -989,14 +989,14 @@ namespace NActors {
return result;
}
- void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
+ void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
TGuard<TMutex> guard(Mutex);
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
}
- void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
+ void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
TGuard<TMutex> guard(Mutex);
for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
if (*rit) {
@@ -1010,7 +1010,7 @@ namespace NActors {
events.clear();
}
- void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
+ void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
TEventsList result;
@@ -1018,7 +1018,7 @@ namespace NActors {
events.clear();
}
- TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
+ TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
TGuard<TMutex> guard(Mutex);
TScheduledEventsList result;
for (auto& mbox : Mailboxes) {
@@ -1028,28 +1028,28 @@ namespace NActors {
return result;
}
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
return DispatchEvents(options, TInstant::Max());
}
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
}
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
TGuard<TMutex> guard(Mutex);
return DispatchEventsInternal(options, simDeadline);
}
// Mutex must be locked by caller!
- bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
+ bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
TDispatchContext localContext;
localContext.Options = &options;
localContext.PrevContext = nullptr;
bool verbose = !options.Quiet && VERBOSE;
struct TDispatchContextSetter {
- TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
+ TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
: Runtime(runtime)
{
lastContext.PrevContext = Runtime.CurrentDispatchContext;
@@ -1060,7 +1060,7 @@ namespace NActors {
Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
}
- TTestActorRuntimeBase& Runtime;
+ TTestActorRuntimeBase& Runtime;
} DispatchContextSetter(*this, localContext);
TInstant dispatchTime = TInstant::MicroSeconds(0);
@@ -1072,7 +1072,7 @@ namespace NActors {
}
struct TTempEdgeEventsCaptor {
- TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
+ TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
: Runtime(runtime)
, HasEvents(false)
{
@@ -1103,7 +1103,7 @@ namespace NActors {
}
}
- TTestActorRuntimeBase& Runtime;
+ TTestActorRuntimeBase& Runtime;
TEventMailBoxList Store;
bool HasEvents;
};
@@ -1354,7 +1354,7 @@ namespace NActors {
return false;
}
- void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
+ void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
TDispatchContext* context = CurrentDispatchContext;
while (context) {
const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
@@ -1366,7 +1366,7 @@ namespace NActors {
}
}
- void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
+ void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
TDispatchContext* context = CurrentDispatchContext;
while (context) {
for (const auto& finalEvent : context->Options->FinalEvents) {
@@ -1382,14 +1382,14 @@ namespace NActors {
}
}
- void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
+ void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
senderNodeIndex, NodeCount);
SendInternal(ev, senderNodeIndex, viaActorSystem);
}
- void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
+ void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
ui32 nodeId = FirstNodeId + nodeIndex;
@@ -1400,12 +1400,12 @@ namespace NActors {
Cerr << "Event was added to scheduled queue\n";
}
- void TTestActorRuntimeBase::ClearCounters() {
+ void TTestActorRuntimeBase::ClearCounters() {
TGuard<TMutex> guard(Mutex);
EvCounters.clear();
}
- ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
+ ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
TGuard<TMutex> guard(Mutex);
auto it = EvCounters.find(evType);
if (it == EvCounters.end())
@@ -1417,7 +1417,7 @@ namespace NActors {
TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
return node->ActorSystem->LookupLocalService(serviceId);
}
@@ -1465,7 +1465,7 @@ namespace NActors {
Y_VERIFY(nodeIndexFrom < NodeCount);
Y_VERIFY(nodeIndexTo < NodeCount);
Y_VERIFY(nodeIndexFrom != nodeIndexTo);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
}
@@ -1474,7 +1474,7 @@ namespace NActors {
BlockedOutput.insert(actorId);
}
- void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
+ void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
ui64 days = (time.Hours() / 24);
DispatcherRandomSeed = (days << 32) ^ iteration;
DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
@@ -1490,7 +1490,7 @@ namespace NActors {
Y_VERIFY(nodeIndex < NodeCount);
auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
Y_VERIFY(nodeIt != Nodes.end());
- TNodeDataBase* node = nodeIt->second.Get();
+ TNodeDataBase* node = nodeIt->second.Get();
return FindActor(actorId, node);
}
@@ -1514,11 +1514,11 @@ namespace NActors {
return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
}
- TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
+ TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
ui32 nodeId = FirstNodeId + nodeIndex;
- TNodeDataBase* node = Nodes[nodeId].Get();
+ TNodeDataBase* node = Nodes[nodeId].Get();
return node->DynamicCounters;
}
@@ -1526,10 +1526,10 @@ namespace NActors {
NeedMonitoring = true;
}
- void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
+ void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
Y_VERIFY(nodeIndex < NodeCount);
ui32 nodeId = FirstNodeId + nodeIndex;
- TNodeDataBase* node = Nodes[nodeId].Get();
+ TNodeDataBase* node = Nodes[nodeId].Get();
ui32 targetNode = ev->GetRecipientRewrite().NodeId();
ui32 targetNodeIndex;
if (targetNode == 0) {
@@ -1607,10 +1607,10 @@ namespace NActors {
return actor;
}
- THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
+ THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
THolder<TActorSystemSetup> setup(new TActorSystemSetup);
setup->NodeId = FirstNodeId + nodeIndex;
-
+
if (UseRealThreads) {
setup->ExecutorsCount = 5;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
@@ -1627,20 +1627,20 @@ namespace NActors {
setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
}
- InitActorSystemSetup(*setup);
-
- return setup;
- }
-
- THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
- auto setup = MakeActorSystemSetup(nodeIndex, node);
-
+ InitActorSystemSetup(*setup);
+
+ return setup;
+ }
+
+ THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
+ auto setup = MakeActorSystemSetup(nodeIndex, node);
+
node->ExecutorPools.resize(setup->ExecutorsCount);
for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
node->ExecutorPools[i] = setup->Executors[i].Get();
}
- const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
+ const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
setup->LocalServices = node->LocalServices;
setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
@@ -1686,22 +1686,22 @@ namespace NActors {
if (!SingleSysEnv) { // Single system env should do this self
TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
- logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
- NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
+ logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
+ NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
setup->LocalServices.push_back(loggerActorPair);
}
- return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
+ return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
}
- TActorSystem* TTestActorRuntimeBase::SingleSys() const {
+ TActorSystem* TTestActorRuntimeBase::SingleSys() const {
Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
return Nodes.begin()->second->ActorSystem.Get();
}
- TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
+ TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
for (auto& x : Nodes) {
return x.second->ActorSystem.Get();
}
@@ -1715,7 +1715,7 @@ namespace NActors {
}
- TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
+ TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
TGuard<TMutex> guard(Mutex);
auto mboxId = TEventMailboxId(nodeId, hint);
auto it = Mailboxes.find(mboxId);
@@ -1726,7 +1726,7 @@ namespace NActors {
return *it->second;
}
- void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
+ void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
TGuard<TMutex> guard(Mutex);
auto mboxId = TEventMailboxId(nodeId, hint);
Mailboxes.erase(mboxId);
@@ -1753,8 +1753,8 @@ namespace NActors {
public:
class TReplyActor : public TActor<TReplyActor> {
public:
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
}
TReplyActor(TStrandingActorDecorator* owner)
@@ -1769,8 +1769,8 @@ namespace NActors {
TStrandingActorDecorator* const Owner;
};
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
}
TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
@@ -1864,7 +1864,7 @@ namespace NActors {
TActorId ReplyId;
bool HasReply;
TDispatchOptions DelegateeOptions;
- TTestActorRuntimeBase* Runtime;
+ TTestActorRuntimeBase* Runtime;
THolder<IReplyChecker> ReplyChecker;
};
@@ -1889,7 +1889,7 @@ namespace NActors {
private:
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
- TTestActorRuntimeBase* Runtime;
+ TTestActorRuntimeBase* Runtime;
TReplyCheckerCreator CreateReplyChecker;
};
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 6baeebcb37..26e3b45c98 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -1,5 +1,5 @@
#pragma once
-
+
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/log.h>
@@ -182,7 +182,7 @@ namespace NActors {
}
};
- class TTestActorRuntimeBase: public TNonCopyable {
+ class TTestActorRuntimeBase: public TNonCopyable {
public:
class TEdgeActor;
class TSchedulerThreadStub;
@@ -195,24 +195,24 @@ namespace NActors {
RESCHEDULE
};
- typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver;
- typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
- typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
- typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
+ typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver;
+ typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
+ typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
+ typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
- TTestActorRuntimeBase(THeSingleSystemEnv);
- TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
- TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount);
- TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false);
- virtual ~TTestActorRuntimeBase();
+ TTestActorRuntimeBase(THeSingleSystemEnv);
+ TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
+ TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount);
+ TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false);
+ virtual ~TTestActorRuntimeBase();
bool IsRealThreads() const;
- static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
- static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
- static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
- static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
- static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
+ static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
+ static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
+ static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
+ static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
+ static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
TEventObserver SetObserverFunc(TEventObserver observerFunc);
TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
@@ -233,7 +233,7 @@ namespace NActors {
void UpdateCurrentTime(TInstant newTime);
void AdvanceCurrentTime(TDuration duration);
void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
- virtual void Initialize();
+ virtual void Initialize();
ui32 GetNodeId(ui32 index = 0) const;
ui32 GetNodeCount() const;
ui64 AllocateLocalId();
@@ -291,7 +291,7 @@ namespace NActors {
TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) {
handle.Destroy();
const ui32 eventType = TEvent::EventType;
- WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
if (event->GetTypeRewrite() != eventType)
return false;
@@ -323,7 +323,7 @@ namespace NActors {
{
typename TEvent::TPtr handle;
const ui32 eventType = TEvent::EventType;
- WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
if (event->GetTypeRewrite() != eventType)
return false;
@@ -383,7 +383,7 @@ namespace NActors {
std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
handle.Destroy();
auto eventTypes = { TEvents::EventType... };
- WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes))
return false;
handle = event;
@@ -472,26 +472,26 @@ namespace NActors {
}
protected:
- struct TNodeDataBase;
- TNodeDataBase* GetRawNode(ui32 node) const {
+ struct TNodeDataBase;
+ TNodeDataBase* GetRawNode(ui32 node) const {
return Nodes.at(FirstNodeId + node).Get();
}
- static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId);
- virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) {
- Y_UNUSED(counters);
- Y_UNUSED(component);
-
- // do nothing, just return the existing counters
- return counters;
- }
-
- THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node);
- THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node);
- virtual void InitActorSystemSetup(TActorSystemSetup& setup) {
- Y_UNUSED(setup);
- }
-
+ static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId);
+ virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) {
+ Y_UNUSED(counters);
+ Y_UNUSED(component);
+
+ // do nothing, just return the existing counters
+ return counters;
+ }
+
+ THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node);
+ THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node);
+ virtual void InitActorSystemSetup(TActorSystemSetup& setup) {
+ Y_UNUSED(setup);
+ }
+
private:
IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
@@ -506,18 +506,18 @@ namespace NActors {
ui64 ScheduledLimit;
THolder<TTempDir> TmpDir;
const TThread::TId MainThreadId;
-
+
protected:
bool UseRealInterconnect = false;
TInterconnectMock InterconnectMock;
- bool IsInitialized = false;
- bool SingleSysEnv = false;
+ bool IsInitialized = false;
+ bool SingleSysEnv = false;
const TString ClusterUUID;
const ui32 FirstNodeId;
const ui32 NodeCount;
const ui32 DataCenterCount;
const bool UseRealThreads;
-
+
ui64 LocalId;
TMutex Mutex;
TCondVar MailboxesHasEvents;
@@ -532,28 +532,28 @@ namespace NActors {
TAutoPtr<TLogBackend> LogBackend;
bool NeedMonitoring;
- TIntrusivePtr<IRandomProvider> RandomProvider;
- TIntrusivePtr<ITimeProvider> TimeProvider;
-
+ TIntrusivePtr<IRandomProvider> RandomProvider;
+ TIntrusivePtr<ITimeProvider> TimeProvider;
+
protected:
- struct TNodeDataBase: public TThrRefBase {
- TNodeDataBase();
+ struct TNodeDataBase: public TThrRefBase {
+ TNodeDataBase();
void Stop();
- virtual ~TNodeDataBase();
- virtual ui64 GetLoggerPoolId() const {
- return 0;
- }
-
- template <typename T = void>
- T* GetAppData() {
- return static_cast<T*>(AppData0.get());
- }
-
- template <typename T = void>
- const T* GetAppData() const {
- return static_cast<T*>(AppData0.get());
- }
-
+ virtual ~TNodeDataBase();
+ virtual ui64 GetLoggerPoolId() const {
+ return 0;
+ }
+
+ template <typename T = void>
+ T* GetAppData() {
+ return static_cast<T*>(AppData0.get());
+ }
+
+ template <typename T = void>
+ const T* GetAppData() const {
+ return static_cast<T*>(AppData0.get());
+ }
+
TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
@@ -563,44 +563,44 @@ namespace NActors {
TMap<TActorId, IActor*> LocalServicesActors;
TMap<IActor*, TActorId> ActorToActorId;
THolder<TMailboxTable> MailboxTable;
- std::shared_ptr<void> AppData0;
+ std::shared_ptr<void> AppData0;
THolder<TActorSystem> ActorSystem;
- THolder<IExecutorPool> SchedulerPool;
+ THolder<IExecutorPool> SchedulerPool;
TVector<IExecutorPool*> ExecutorPools;
THolder<TExecutorThread> ExecutorThread;
- };
+ };
- struct INodeFactory {
- virtual ~INodeFactory() = default;
- virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0;
+ struct INodeFactory {
+ virtual ~INodeFactory() = default;
+ virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0;
};
- struct TDefaultNodeFactory final: INodeFactory {
- virtual TIntrusivePtr<TNodeDataBase> CreateNode() override {
- return new TNodeDataBase();
- }
- };
-
- INodeFactory& GetNodeFactory() {
- return *NodeFactory;
- }
-
- virtual TNodeDataBase* GetNodeById(size_t idx) {
- return Nodes[idx].Get();
- }
-
- void InitNodes();
- void CleanupNodes();
- virtual void InitNodeImpl(TNodeDataBase*, size_t);
-
+ struct TDefaultNodeFactory final: INodeFactory {
+ virtual TIntrusivePtr<TNodeDataBase> CreateNode() override {
+ return new TNodeDataBase();
+ }
+ };
+
+ INodeFactory& GetNodeFactory() {
+ return *NodeFactory;
+ }
+
+ virtual TNodeDataBase* GetNodeById(size_t idx) {
+ return Nodes[idx].Get();
+ }
+
+ void InitNodes();
+ void CleanupNodes();
+ virtual void InitNodeImpl(TNodeDataBase*, size_t);
+
static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev);
- protected:
- THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
-
+ protected:
+ THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
+
private:
- void InitNode(TNodeDataBase* node, size_t idx);
-
+ void InitNode(TNodeDataBase* node, size_t idx);
+
struct TDispatchContext {
const TDispatchOptions* Options;
TDispatchContext* PrevContext;
@@ -610,8 +610,8 @@ namespace NActors {
bool FinalEventFound = false;
};
- TProgramShouldContinue ShouldContinue;
- TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
+ TProgramShouldContinue ShouldContinue;
+ TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
ui64 CurrentTimestamp;
TSet<TActorId> EdgeActors;
THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
diff --git a/library/cpp/actors/ya.make b/library/cpp/actors/ya.make
index b50699b855..737c7fbc18 100644
--- a/library/cpp/actors/ya.make
+++ b/library/cpp/actors/ya.make
@@ -11,6 +11,6 @@ RECURSE(
protos
util
wilson
- testlib
- http
+ testlib
+ http
)