diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-02-10 16:46:37 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:37 +0300 |
commit | a693106aae8a3a3c7236a4ae953058a9611d7a92 (patch) | |
tree | 49e222ea1c5804306084bb3ae065bb702625360f /library | |
parent | ad94e93a059747f4fc3d7add88d1a83daf40b733 (diff) | |
download | ydb-a693106aae8a3a3c7236a4ae953058a9611d7a92.tar.gz |
Restoring authorship annotation for <vvvv@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
30 files changed, 1646 insertions, 1646 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 7c91642087..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -3,7 +3,7 @@ #include "event.h" #include "monotonic.h" #include <util/system/tls.h> -#include <library/cpp/actors/util/local_process_key.h> +#include <library/cpp/actors/util/local_process_key.h> namespace NActors { class TActorSystem; @@ -383,16 +383,16 @@ namespace NActors { } }; - struct TActorActivityTag {}; - - inline size_t GetActivityTypeCount() { - return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount(); - } - - inline TStringBuf GetActivityTypeName(size_t index) { - return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index); - } - + struct TActorActivityTag {}; + + inline size_t GetActivityTypeCount() { + return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount(); + } + + inline TStringBuf GetActivityTypeName(size_t index) { + return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index); + } + template <typename TDerived> class TActor: public IActor { private: @@ -415,8 +415,8 @@ namespace NActors { // static_cast<IActor::EActorActivity>(TDerived::ActorActivityType())); //} } - } - + } + protected: //* Comment this function to find unmarked activities static constexpr IActor::EActivityType ActorActivityType() { diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index ca0f4e5c00..e1b765ec72 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -513,7 +513,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) { , Counter(counter) { } - + bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { *Counter += 1; if (ev->Type == TEvents::THelloWorld::Ping) { @@ -527,16 +527,16 @@ Y_UNIT_TEST_SUITE(TestDecorator) { struct TTestActor : TActorBootstrapped<TTestActor> { static constexpr char ActorName[] = "TestActor"; - + void Bootstrap() { const auto& activityTypeIndex = GetActivityType(); - Y_ENSURE(activityTypeIndex < GetActivityTypeCount()); - Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor"); + Y_ENSURE(activityTypeIndex < GetActivityTypeCount()); + Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor"); PassAway(); } }; - + Y_UNIT_TEST(Basic) { THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>(); setup->NodeId = 0; diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h index 4c9b04c181..d972b1a0ff 100644 --- a/library/cpp/actors/core/actorid.h +++ b/library/cpp/actors/core/actorid.h @@ -2,7 +2,7 @@ #include "defs.h" #include <util/stream/output.h> // for IOutputStream -#include <util/generic/hash.h> +#include <util/generic/hash.h> namespace NActors { // used as global uniq address of actor @@ -187,10 +187,10 @@ template <> inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) { return x.Out(o); } - -template <> + +template <> struct THash<NActors::TActorId> { inline ui64 operator()(const NActors::TActorId& x) const { - return x.Hash(); - } -}; + return x.Hash(); + } +}; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 582582cfb0..6ff02aaf94 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -268,14 +268,14 @@ namespace NActors { return Event.Get(); } - + TAutoPtr<IEventBase> ReleaseBase() { TAutoPtr<IEventBase> x = GetBase(); Y_UNUSED(Event.Release()); Buffer.Reset(); return x; } - + TAutoPtr<IEventHandle> Forward(const TActorId& dest) { if (Event) return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 62c8376d5e..c3b9999168 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -70,39 +70,39 @@ namespace NActors { // first step - find good enough mailbox ui32 hint = 0; TMailboxHeader* mailbox = nullptr; - + if (revolvingWriteCounter == 0) revolvingWriteCounter = AtomicIncrement(RegisterRevolvingCounter); { ui32 hintBackoff = 0; - + while (hint == 0) { hint = MailboxTable->AllocateMailbox(mailboxType, ++revolvingWriteCounter); mailbox = MailboxTable->Get(hint); - + if (!mailbox->LockFromFree()) { MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingWriteCounter); hintBackoff = hint; hint = 0; } - } + } MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingWriteCounter); - } - + } + const ui64 localActorId = AllocateID(); - + // ok, got mailbox mailbox->AttachActor(localActorId, actor); - + // do init const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); DoActorInit(ActorSystem, actor, actorId, parentId); - + // Once we unlock the mailbox the actor starts running and we cannot use the pointer any more actor = nullptr; - + switch (mailboxType) { case TMailboxType::Simple: UnlockFromExecution((TMailboxTable::TSimpleMailbox*)mailbox, this, false, hint, MaxWorkers, ++revolvingWriteCounter); @@ -127,7 +127,7 @@ namespace NActors { if (elapsed > 1000000) { LWPROBE(SlowRegisterNew, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0); } - + return actorId; } @@ -143,7 +143,7 @@ namespace NActors { const ui64 localActorId = AllocateID(); mailbox->AttachActor(localActorId, actor); - + const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); DoActorInit(ActorSystem, actor, actorId, parentId); NHPTimer::STime elapsed = GetCycleCountFast() - hpstart; @@ -157,7 +157,7 @@ namespace NActors { TAffinity* TExecutorPoolBase::Affinity() const { return ThreadsAffinity.Get(); } - + bool TExecutorPoolBaseMailboxed::Cleanup() { return MailboxTable->Cleanup(); } diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index e5e2cff39a..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -108,5 +108,5 @@ namespace NActors { } executorPool->ReclaimMailbox(TMailbox::MailboxType, hint, workerId, ++revolvingWriteCounter); } - } -} + } +} diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index cf80598fe8..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -213,13 +213,13 @@ namespace NActors { Metrics->IncDirectMsgs(); if (Settings && Settings->Satisfies(priority, component, 0ull)) { va_list params; - va_start(params, c); + va_start(params, c); TString formatted; vsprintf(formatted, c, params); auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted); Y_UNUSED(ok); - va_end(params); + va_end(params); } } @@ -334,11 +334,11 @@ namespace NActors { } TABLEBODY() { for (EComponent i = Settings->MinVal; i < Settings->MaxVal; i++) { - auto name = Settings->ComponentName(i); - if (!*name) - continue; + auto name = Settings->ComponentName(i); + if (!*name) + continue; NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i); - + TABLER() { TABLED() { str << "<a href='logger?c=" << i << "'>" << name << "</a>"; @@ -368,26 +368,26 @@ namespace NActors { */ void TLoggerActor::HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { const auto& params = ev->Get()->Request.GetParams(); - NLog::EComponent component = NLog::InvalidComponent; - NLog::EPriority priority = NLog::PRI_DEBUG; + NLog::EComponent component = NLog::InvalidComponent; + NLog::EPriority priority = NLog::PRI_DEBUG; NLog::EPriority samplingPriority = NLog::PRI_DEBUG; ui32 samplingRate = 0; - bool hasComponent = false; - bool hasPriority = false; + bool hasComponent = false; + bool hasPriority = false; bool hasSamplingPriority = false; bool hasSamplingRate = false; bool hasAllowDrop = false; int allowDrop = 0; - if (params.Has("c")) { - if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) { - hasComponent = true; - if (params.Has("p")) { - int rawPriority; - if (TryFromString(params.Get("p"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) { - priority = (NLog::EPriority)rawPriority; - hasPriority = true; - } - } + if (params.Has("c")) { + if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) { + hasComponent = true; + if (params.Has("p")) { + int rawPriority; + if (TryFromString(params.Get("p"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) { + priority = (NLog::EPriority)rawPriority; + hasPriority = true; + } + } if (params.Has("sp")) { int rawPriority; if (TryFromString(params.Get("sp"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) { @@ -400,14 +400,14 @@ namespace NActors { hasSamplingRate = true; } } - } - } + } + } if (params.Has("allowdrop")) { if (TryFromString(params.Get("allowdrop"), allowDrop)) { hasAllowDrop = true; } } - + TStringStream str; if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) { NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component); @@ -434,7 +434,7 @@ namespace NActors { } } } - + DIV_CLASS("row") { DIV_CLASS("col-md-12") { H4() { @@ -473,12 +473,12 @@ namespace NActors { } } } - - } else { + + } else { TString explanation; - if (hasComponent && hasPriority) { + if (hasComponent && hasPriority) { Settings->SetLevel(priority, component, explanation); - } + } if (hasComponent && hasSamplingPriority) { Settings->SetSamplingLevel(samplingPriority, component, explanation); } @@ -488,7 +488,7 @@ namespace NActors { if (hasAllowDrop) { Settings->SetAllowDrop(allowDrop); } - + HTML(str) { if (!explanation.empty()) { DIV_CLASS("row") { @@ -496,8 +496,8 @@ namespace NActors { str << explanation; } } - } - + } + DIV_CLASS("row") { DIV_CLASS("col-md-6") { RenderComponentPriorities(str); @@ -522,7 +522,7 @@ namespace NActors { << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; } } - } + } } } H4() { @@ -567,7 +567,7 @@ namespace NActors { } Metrics->GetOutputHtml(str); } - } + } ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } @@ -669,10 +669,10 @@ namespace NActors { } class TStderrBackend: public TLogBackend { - public: + public: TStderrBackend() { } - void WriteData(const TLogRecord& rec) override { + void WriteData(const TLogRecord& rec) override { #ifdef _MSC_VER if (IsDebuggerPresent()) { TString x; @@ -693,15 +693,15 @@ namespace NActors { Y_UNUSED(err); } } while (!isOk); - } - + } + void ReopenLog() override { - } - - private: + } + + private: const TString Indent; - }; - + }; + class TLineFileLogBackend: public TFileLogBackend { public: TLineFileLogBackend(const TString& path) @@ -735,10 +735,10 @@ namespace NActors { TVector<TAutoPtr<TLogBackend>> UnderlyingBackends; }; - TAutoPtr<TLogBackend> CreateStderrBackend() { - return new TStderrBackend(); - } - + TAutoPtr<TLogBackend> CreateStderrBackend() { + return new TStderrBackend(); + } + TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName) { return new TLineFileLogBackend(fileName); } diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index da518a2517..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -296,7 +296,7 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////// TAutoPtr<TLogBackend> CreateSysLogBackend(const TString& ident, bool logPError, bool logCons); - TAutoPtr<TLogBackend> CreateStderrBackend(); + TAutoPtr<TLogBackend> CreateStderrBackend(); TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName); TAutoPtr<TLogBackend> CreateNullBackend(); TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends); diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index 58344b6a72..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -112,12 +112,12 @@ namespace NActors { AtomicSet(ComponentInfo[i], settings.Raw.Data); } - TStringStream str; + TStringStream str; str << titleName << " for all components has been changed to " << PriorityToString(EPrio(priority)); - explanation = str.Str(); + explanation = str.Str(); return 0; } else { if (!IsValidComponent(component)) { @@ -213,18 +213,18 @@ namespace NActors { UseLocalTimestamps = value; } - EComponent TSettings::FindComponent(const TStringBuf& componentName) const { + EComponent TSettings::FindComponent(const TStringBuf& componentName) const { if (componentName.empty()) - return InvalidComponent; - - for (EComponent component = MinVal; component <= MaxVal; ++component) { - if (ComponentNames[component] == componentName) - return component; - } - - return InvalidComponent; - } - + return InvalidComponent; + + for (EComponent component = MinVal; component <= MaxVal; ++component) { + if (ComponentNames[component] == componentName) + return component; + } + + return InvalidComponent; + } + } } diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 94ed257cd2..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -157,7 +157,7 @@ namespace NActors { int SetLevel(EPriority priority, EComponent component, TString& explanation); int SetSamplingLevel(EPriority priority, EComponent component, TString& explanation); int SetSamplingRate(ui32 sampling, EComponent component, TString& explanation); - EComponent FindComponent(const TStringBuf& componentName) const; + EComponent FindComponent(const TStringBuf& componentName) const; static int PowerOf2Mask(int val); static bool IsValidPriority(EPriority priority); bool IsValidComponent(EComponent component); diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h index 4987b46edf..c450f2338e 100644 --- a/library/cpp/actors/core/mon.h +++ b/library/cpp/actors/core/mon.h @@ -10,8 +10,8 @@ namespace NActors { enum { HttpInfo = EventSpaceBegin(NActors::TEvents::ES_MON), HttpInfoRes, - RemoteHttpInfo, - RemoteHttpInfoRes, + RemoteHttpInfo, + RemoteHttpInfoRes, RemoteJsonInfoRes, RemoteBinaryInfoRes, End @@ -82,12 +82,12 @@ namespace NActors { struct TEvRemoteHttpInfo: public NActors::TEventBase<TEvRemoteHttpInfo, RemoteHttpInfo> { TEvRemoteHttpInfo() { } - + TEvRemoteHttpInfo(const TString& query) - : Query(query) + : Query(query) { } - + TEvRemoteHttpInfo(const TString& query, HTTP_METHOD method) : Query(query) , Method(method) @@ -96,25 +96,25 @@ namespace NActors { TString Query; HTTP_METHOD Method; - + TString PathInfo() const { - const size_t pos = Query.find('?'); + const size_t pos = Query.find('?'); return (pos == TString::npos) ? TString() : Query.substr(0, pos); - } - - TCgiParameters Cgi() const { - const size_t pos = Query.find('?'); + } + + TCgiParameters Cgi() const { + const size_t pos = Query.find('?'); return TCgiParameters((pos == TString::npos) ? TString() : Query.substr(pos + 1)); - } - + } + TString ToStringHeader() const override { - return "TEvRemoteHttpInfo"; - } - + return "TEvRemoteHttpInfo"; + } + bool SerializeToArcadiaStream(TChunkSerializer *serializer) const override { return serializer->WriteString(&Query); - } - + } + ui32 CalculateSerializedSize() const override { return Query.size(); } @@ -125,33 +125,33 @@ namespace NActors { static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfo(bufs->GetString()); - } + } HTTP_METHOD GetMethod() const { return Method; } - }; - + }; + struct TEvRemoteHttpInfoRes: public NActors::TEventBase<TEvRemoteHttpInfoRes, RemoteHttpInfoRes> { TEvRemoteHttpInfoRes() { } - + TEvRemoteHttpInfoRes(const TString& html) - : Html(html) + : Html(html) { } - + TString Html; - + TString ToStringHeader() const override { - return "TEvRemoteHttpInfoRes"; - } - + return "TEvRemoteHttpInfoRes"; + } + bool SerializeToArcadiaStream(TChunkSerializer *serializer) const override { return serializer->WriteString(&Html); - } - + } + ui32 CalculateSerializedSize() const override { return Html.size(); } @@ -162,9 +162,9 @@ namespace NActors { static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfoRes(bufs->GetString()); - } - }; - + } + }; + struct TEvRemoteJsonInfoRes: public NActors::TEventBase<TEvRemoteJsonInfoRes, RemoteJsonInfoRes> { TEvRemoteJsonInfoRes() { } diff --git a/library/cpp/actors/core/scheduler_cookie.h b/library/cpp/actors/core/scheduler_cookie.h index dc6933c7a2..2c20ca67f3 100644 --- a/library/cpp/actors/core/scheduler_cookie.h +++ b/library/cpp/actors/core/scheduler_cookie.h @@ -44,12 +44,12 @@ namespace NActors { return Cookie; } - ISchedulerCookie* Release() { - ISchedulerCookie* result = Cookie; - Cookie = nullptr; - return result; - } - + ISchedulerCookie* Release() { + ISchedulerCookie* result = Cookie; + Cookie = nullptr; + return result; + } + void Reset(ISchedulerCookie* cookie) { Detach(); Cookie = cookie; diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto index 5b18e8a53f..5fbd6d44ee 100644 --- a/library/cpp/actors/protos/actors.proto +++ b/library/cpp/actors/protos/actors.proto @@ -1,11 +1,11 @@ -package NActorsProto; -option java_package = "ru.yandex.kikimr.proto"; -option java_outer_classname = "NActorsBaseProto"; - +package NActorsProto; +option java_package = "ru.yandex.kikimr.proto"; +option java_outer_classname = "NActorsBaseProto"; + message TActorId { - required fixed64 RawX1 = 1; - required fixed64 RawX2 = 2; -} + required fixed64 RawX1 = 1; + required fixed64 RawX2 = 2; +} message TCallbackException { required TActorId ActorId = 1; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 6513a59259..2e3b0d0d15 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -1,19 +1,19 @@ import "library/cpp/actors/protos/actors.proto"; import "google/protobuf/descriptor.proto"; -package NActorsInterconnect; -option java_package = "ru.yandex.kikimr.proto"; - -message TEvResolveNode { - optional uint32 NodeId = 1; +package NActorsInterconnect; +option java_package = "ru.yandex.kikimr.proto"; + +message TEvResolveNode { + optional uint32 NodeId = 1; optional uint64 Deadline = 2; -} - -message TEvNodeInfo { - optional uint32 NodeId = 1; - optional string Address = 2; - optional uint32 Port = 3; -} +} + +message TEvNodeInfo { + optional uint32 NodeId = 1; + optional string Address = 2; + optional uint32 Port = 3; +} extend google.protobuf.FieldOptions { optional string PrintName = 50376; diff --git a/library/cpp/actors/protos/services_common.proto b/library/cpp/actors/protos/services_common.proto index 6b822a5e22..afa0ec0073 100644 --- a/library/cpp/actors/protos/services_common.proto +++ b/library/cpp/actors/protos/services_common.proto @@ -1,12 +1,12 @@ -package NActorsServices; -option java_package = "ru.yandex.kikimr.proto"; - -// 0-255 range -enum EServiceCommon { +package NActorsServices; +option java_package = "ru.yandex.kikimr.proto"; + +// 0-255 range +enum EServiceCommon { // WARN: This must be the smallest value in the enumeration - GLOBAL = 0; - INTERCONNECT = 1; + GLOBAL = 0; + INTERCONNECT = 1; TEST = 2; PROTOCOLS = 3; INTERCONNECT_SPEED_TEST = 4; @@ -18,4 +18,4 @@ enum EServiceCommon { // This value is reserved boundary. Is must not be aliased with any values // TODO: use reseved values upon protobuf update // COMMON_END = 256; -}; +}; diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index e39b7bc59b..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -8,20 +8,20 @@ #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/random_provider/random_provider.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> -#include <util/generic/maybe.h> +#include <util/generic/maybe.h> #include <util/generic/bt_exception.h> -#include <util/random/mersenne.h> -#include <util/string/printf.h> -#include <typeinfo> - +#include <util/random/mersenne.h> +#include <util/string/printf.h> +#include <typeinfo> + bool VERBOSE = false; -const bool PRINT_EVENT_BODY = false; - +const bool PRINT_EVENT_BODY = false; + namespace { TString MakeClusterId() { @@ -32,9 +32,9 @@ namespace { } } -namespace NActors { - ui64 TScheduledEventQueueItem::NextUniqueId = 0; - +namespace NActors { + ui64 TScheduledEventQueueItem::NextUniqueId = 0; + void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); @@ -46,21 +46,21 @@ namespace NActors { if (!name.empty()) Cerr << " \"" << name << "\""; Cerr << ", "; - if (ev->HasEvent()) + if (ev->HasEvent()) Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader()); - else if (ev->HasBuffer()) + else if (ev->HasBuffer()) Cerr << " : BUFFER"; - else + else Cerr << " : EMPTY"; - + Cerr << "\n"; - } - + } + TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { - ActorSystemTimestamp = nullptr; + ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; - } - + } + void TTestActorRuntimeBase::TNodeDataBase::Stop() { if (Poller) Poller->Stop(); @@ -70,146 +70,146 @@ namespace NActors { Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub"); } - if (ActorSystem) - ActorSystem->Stop(); + if (ActorSystem) + ActorSystem->Stop(); - ActorSystem.Destroy(); + ActorSystem.Destroy(); Poller.Reset(); - } - + } + TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() { Stop(); } class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { - public: + public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } TEdgeActor(TTestActorRuntimeBase* runtime) - : TActor(&TEdgeActor::StateFunc) - , Runtime(runtime) - { - } - - STFUNC(StateFunc) { + : TActor(&TEdgeActor::StateFunc) + , Runtime(runtime) + { + } + + STFUNC(StateFunc) { Y_UNUSED(ctx); - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } - - if (!Runtime->EventFilterFunc(*Runtime, ev)) { - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + } + + if (!Runtime->EventFilterFunc(*Runtime, ev)) { + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); - Runtime->MailboxesHasEvents.Signal(); - if (verbose) + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); + Runtime->MailboxesHasEvents.Signal(); + if (verbose) Cerr << "Event was added to sent queue\n"; - } - else { - if (verbose) + } + else { + if (verbose) Cerr << "Event was dropped\n"; - } - } - - private: + } + } + + private: TTestActorRuntimeBase* Runtime; - }; - - void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { - IEventHandle* ptr = ev.Get(); + }; + + void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { + IEventHandle* ptr = ev.Get(); Y_VERIFY(ptr); -#ifdef DEBUG_ORDER_EVENTS - ui64 counter = NextToSend++; - TrackSent[ptr] = counter; -#endif - Sent.push_back(ev); - } - - TAutoPtr<IEventHandle> TEventMailBox::Pop() { - TAutoPtr<IEventHandle> result = Sent.front(); - Sent.pop_front(); -#ifdef DEBUG_ORDER_EVENTS - auto it = TrackSent.find(result.Get()); - if (it != TrackSent.end()) { +#ifdef DEBUG_ORDER_EVENTS + ui64 counter = NextToSend++; + TrackSent[ptr] = counter; +#endif + Sent.push_back(ev); + } + + TAutoPtr<IEventHandle> TEventMailBox::Pop() { + TAutoPtr<IEventHandle> result = Sent.front(); + Sent.pop_front(); +#ifdef DEBUG_ORDER_EVENTS + auto it = TrackSent.find(result.Get()); + if (it != TrackSent.end()) { Y_VERIFY(ExpectedReceive == it->second); - TrackSent.erase(result.Get()); - ++ExpectedReceive; - } -#endif - return result; - } - - bool TEventMailBox::IsEmpty() const { - return Sent.empty(); - } - - void TEventMailBox::Capture(TEventsList& evList) { - evList.insert(evList.end(), Sent.begin(), Sent.end()); - Sent.clear(); - } - - void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) { - Sent.push_front(ev); - } - - void TEventMailBox::PushFront(TEventsList& evList) { - for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) { - if (*rit) { - Sent.push_front(*rit); - } - } - } - - void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) { - for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) { - evList.insert(*it); - } - - Scheduled.clear(); - } - - void TEventMailBox::PushScheduled(TScheduledEventsList& evList) { - for (auto it = evList.begin(); it != evList.end(); ++it) { - if (it->Event) { - Scheduled.insert(*it); - } - } - - evList.clear(); - } - - bool TEventMailBox::IsActive(const TInstant& currentTime) const { - return currentTime >= InactiveUntil; - } - - void TEventMailBox::Freeze(const TInstant& deadline) { - if (deadline > InactiveUntil) - InactiveUntil = deadline; - } - - TInstant TEventMailBox::GetInactiveUntil() const { - return InactiveUntil; - } - - void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) { - Scheduled.insert(item); - } - - bool TEventMailBox::IsScheduledEmpty() const { - return Scheduled.empty(); - } - + TrackSent.erase(result.Get()); + ++ExpectedReceive; + } +#endif + return result; + } + + bool TEventMailBox::IsEmpty() const { + return Sent.empty(); + } + + void TEventMailBox::Capture(TEventsList& evList) { + evList.insert(evList.end(), Sent.begin(), Sent.end()); + Sent.clear(); + } + + void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) { + Sent.push_front(ev); + } + + void TEventMailBox::PushFront(TEventsList& evList) { + for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) { + if (*rit) { + Sent.push_front(*rit); + } + } + } + + void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) { + for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) { + evList.insert(*it); + } + + Scheduled.clear(); + } + + void TEventMailBox::PushScheduled(TScheduledEventsList& evList) { + for (auto it = evList.begin(); it != evList.end(); ++it) { + if (it->Event) { + Scheduled.insert(*it); + } + } + + evList.clear(); + } + + bool TEventMailBox::IsActive(const TInstant& currentTime) const { + return currentTime >= InactiveUntil; + } + + void TEventMailBox::Freeze(const TInstant& deadline) { + if (deadline > InactiveUntil) + InactiveUntil = deadline; + } + + TInstant TEventMailBox::GetInactiveUntil() const { + return InactiveUntil; + } + + void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) { + Scheduled.insert(item); + } + + bool TEventMailBox::IsScheduledEmpty() const { + return Scheduled.empty(); + } + TInstant TEventMailBox::GetFirstScheduleDeadline() const { return Scheduled.begin()->Deadline; } @@ -219,80 +219,80 @@ namespace NActors { } class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { - public: + public: TTimeProvider(TTestActorRuntimeBase& runtime) - : Runtime(runtime) - { - } - - TInstant Now() override { - return Runtime.GetCurrentTime(); - } - - private: + : Runtime(runtime) + { + } + + TInstant Now() override { + return Runtime.GetCurrentTime(); + } + + private: TTestActorRuntimeBase& Runtime; - }; - + }; + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { - public: + public: TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) - : Runtime(runtime) - , Node(node) + : Runtime(runtime) + , Node(node) { Y_UNUSED(Runtime); } - + void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); - Node->ActorSystemTimestamp = currentTimestamp; + Node->ActorSystemTimestamp = currentTimestamp; Node->ActorSystemMonotonic = currentMonotonic; - } - + } + void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override { Y_UNUSED(readers); Y_UNUSED(scheduleReadersCount); - } - + } + void Start() override { - } - + } + void PrepareStop() override { - } - + } + void Stop() override { - } - - private: + } + + private: TTestActorRuntimeBase* Runtime; TTestActorRuntimeBase::TNodeDataBase* Node; - }; - + }; + class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { - public: + public: TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) - : IExecutorPool(poolId) - , Runtime(runtime) - , NodeIndex(nodeIndex) - , Node(node) - { - } - + : IExecutorPool(poolId) + , Runtime(runtime) + , NodeIndex(nodeIndex) + , Node(node) + { + } + TTestActorRuntimeBase* GetRuntime() { - return Runtime; - } - - // for threads + return Runtime; + } + + // for threads ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override { Y_UNUSED(wctx); Y_UNUSED(revolvingCounter); Y_FAIL(); - } - + } + void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override { Y_UNUSED(workerId); - Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter); - } - + Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter); + } + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(deadline, ev, cookie, workerId); } @@ -309,16 +309,16 @@ namespace NActors { void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) { Y_UNUSED(workerId); - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } + } auto now = Runtime->GetTimeProvider()->Now(); if (deadline < now) { @@ -327,36 +327,36 @@ namespace NActors { TDuration delay = (deadline - now); if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); - Runtime->MailboxesHasEvents.Signal(); - if (verbose) + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); + Runtime->MailboxesHasEvents.Signal(); + if (verbose) Cerr << "Event was added to scheduled queue\n"; - } else { + } else { if (cookie) { cookie->Detach(); } if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; } - } - } - - // for actorsystem + } + } + + // for actorsystem bool Send(TAutoPtr<IEventHandle>& ev) override { - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } - - if (!Runtime->EventFilterFunc(*Runtime, ev)) { - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + } + + if (!Runtime->EventFilterFunc(*Runtime, ev)) { + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); TNodeDataBase* node = Runtime->Nodes[nodeId].Get(); @@ -364,7 +364,7 @@ namespace NActors { return true; } - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); @@ -384,68 +384,68 @@ namespace NActors { Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); Runtime->MailboxesHasEvents.Signal(); } - if (verbose) + if (verbose) Cerr << "Event was added to sent queue\n"; - } else { - if (verbose) + } else { + if (verbose) Cerr << "Event was dropped\n"; - } - return true; - } - + } + return true; + } + void ScheduleActivation(ui32 activation) override { Y_UNUSED(activation); - } - + } + void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override { Y_UNUSED(activation); Y_UNUSED(revolvingCounter); - } - + } + TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) override { - return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); - } - + return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); + } + TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override { - return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId); - } - - // lifecycle stuff + return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId); + } + + // lifecycle stuff void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override { Y_UNUSED(actorSystem); Y_UNUSED(scheduleReaders); Y_UNUSED(scheduleSz); - } - + } + void Start() override { - } - + } + void PrepareStop() override { - } - + } + void Shutdown() override { - } - + } + bool Cleanup() override { return true; } - // generic + // generic TAffinity* Affinity() const override { Y_FAIL(); - } - - private: + } + + private: TTestActorRuntimeBase* const Runtime; - const ui32 NodeIndex; + const ui32 NodeIndex; TTestActorRuntimeBase::TNodeDataBase* const Node; - }; - + }; + IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; } - + ui32 TTestActorRuntimeBase::NextNodeId = 1; @@ -460,31 +460,31 @@ namespace NActors { , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) - , FirstNodeId(NextNodeId) - , NodeCount(nodeCount) + , FirstNodeId(NextNodeId) + , NodeCount(nodeCount) , DataCenterCount(dataCenterCount) - , UseRealThreads(useRealThreads) - , LocalId(0) - , DispatchCyclesCount(0) - , DispatchedEventsCount(0) + , UseRealThreads(useRealThreads) + , LocalId(0) + , DispatchCyclesCount(0) + , DispatchedEventsCount(0) , NeedMonitoring(false) , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) , TimeProvider(new TTimeProvider(*this)) , ShouldContinue() - , CurrentTimestamp(0) + , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) - , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) + , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) - { + { SetDispatcherRandomSeed(TInstant::Now(), 0); EnableActorCallstack(); } - + void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, @@ -492,10 +492,10 @@ namespace NActors { node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); node->DynamicCounters = new NMonitoring::TDynamicCounters; - + InitNodeImpl(node, nodeIndex); - } - + } + void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, @@ -537,55 +537,55 @@ namespace NActors { TTestActorRuntimeBase::~TTestActorRuntimeBase() { CleanupNodes(); - Cerr.Flush(); Cerr.Flush(); - Clog.Flush(); - + Cerr.Flush(); + Clog.Flush(); + DisableActorCallstack(); - } - + } + void TTestActorRuntimeBase::CleanupNodes() { Nodes.clear(); } bool TTestActorRuntimeBase::IsRealThreads() const { - return UseRealThreads; - } - + return UseRealThreads; + } + TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); - return EEventAction::PROCESS; - } - + return EEventAction::PROCESS; + } + void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); - scheduledEvents.clear(); - } - + scheduledEvents.clear(); + } + bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); - return false; - } - + return false; + } + bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { Y_UNUSED(runtime); Y_UNUSED(delay); Y_UNUSED(event); Y_UNUSED(deadline); - return true; - } - + return true; + } + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; } - } - + } + class TScheduledTreeItem { public: TString Name; @@ -635,11 +635,11 @@ namespace NActors { }; void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { - if (scheduledEvents.empty()) - return; - - TInstant time = scheduledEvents.begin()->Deadline; - while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { + if (scheduledEvents.empty()) + return; + + TInstant time = scheduledEvents.begin()->Deadline; + while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); @@ -669,58 +669,58 @@ namespace NActors { ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); } if (item.Cookie->Get()) { - if (item.Cookie->Detach()) { - queue.push_back(item.Event); - } - } else { - queue.push_back(item.Event); - } - - scheduledEvents.erase(scheduledEvents.begin()); - } - - runtime.UpdateCurrentTime(time); - } - + if (item.Cookie->Detach()) { + queue.push_back(item.Event); + } + } else { + queue.push_back(item.Event); + } + + scheduledEvents.erase(scheduledEvents.begin()); + } + + runtime.UpdateCurrentTime(time); + } + TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; - } - + } + TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; - } - + } + TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; - } - + } + TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; - } - + } + TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; - } - + } + bool TTestActorRuntimeBase::IsVerbose() { - return VERBOSE; - } - + return VERBOSE; + } + void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } @@ -728,16 +728,16 @@ namespace NActors { void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); - auto node = Nodes[nodeIndex + FirstNodeId]; - if (!node) { + auto node = Nodes[nodeIndex + FirstNodeId]; + if (!node) { node = GetNodeFactory().CreateNode(); - Nodes[nodeIndex + FirstNodeId] = node; - } - - node->LocalServicesActors[actorId] = cmd.Actor; + Nodes[nodeIndex + FirstNodeId] = node; + } + + node->LocalServicesActors[actorId] = cmd.Actor; node->LocalServices.push_back(std::make_pair(actorId, cmd)); - } - + } + void TTestActorRuntimeBase::InitNodes() { NextNodeId += NodeCount; Y_VERIFY(NodeCount > 0); @@ -746,33 +746,33 @@ namespace NActors { auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; TNodeDataBase* node = nodeIt->second.Get(); InitNode(node, nodeIndex); - } - + } + } - + void TTestActorRuntimeBase::Initialize() { InitNodes(); IsInitialized = true; - } - + } + void SetupCrossDC() { } TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { - TGuard<TMutex> guard(Mutex); - TDuration oldTimeout = DispatchTimeout; - DispatchTimeout = timeout; - return oldTimeout; - } - + TGuard<TMutex> guard(Mutex); + TDuration oldTimeout = DispatchTimeout; + DispatchTimeout = timeout; + return oldTimeout; + } + TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { - TGuard<TMutex> guard(Mutex); - TDuration oldDelay = ReschedulingDelay; - ReschedulingDelay = delay; - return oldDelay; - } - + TGuard<TMutex> guard(Mutex); + TDuration oldDelay = ReschedulingDelay; + ReschedulingDelay = delay; + return oldDelay; + } + void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); @@ -780,68 +780,68 @@ namespace NActors { } void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { - TGuard<TMutex> guard(Mutex); - for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { + TGuard<TMutex> guard(Mutex); + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { Y_FAIL("SetLogPriority failed: %s", explanation.c_str()); } - } - } - + } + } + TInstant TTestActorRuntimeBase::GetCurrentTime() const { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); - return TInstant::MicroSeconds(CurrentTimestamp); - } - + return TInstant::MicroSeconds(CurrentTimestamp); + } + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n"; } - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); - if (newTime.MicroSeconds() > CurrentTimestamp) { - CurrentTimestamp = newTime.MicroSeconds(); - for (auto& kv : Nodes) { + if (newTime.MicroSeconds() > CurrentTimestamp) { + CurrentTimestamp = newTime.MicroSeconds(); + for (auto& kv : Nodes) { AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); - } - } - } - + } + } + } + void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); - return TimeProvider; - } - + return TimeProvider; + } + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); - return FirstNodeId + index; - } - + return FirstNodeId + index; + } + ui32 TTestActorRuntimeBase::GetNodeCount() const { - return NodeCount; - } - + return NodeCount; + } + ui64 TTestActorRuntimeBase::AllocateLocalId() { - TGuard<TMutex> guard(Mutex); - ui64 nextId = ++LocalId; - if (VERBOSE) { + TGuard<TMutex> guard(Mutex); + ui64 nextId = ++LocalId; + if (VERBOSE) { Cerr << "Allocated id: " << nextId << "\n"; - } - - return nextId; - } - + } + + return nextId; + } + ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads @@ -852,63 +852,63 @@ namespace NActors { } TString TTestActorRuntimeBase::GetTempDir() { - if (!TmpDir) - TmpDir.Reset(new TTempDir()); - return (*TmpDir)(); - } - + if (!TmpDir) + TmpDir.Reset(new TTempDir()); + return (*TmpDir)(); + } + TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (UseRealThreads) { + if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); - return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); - } - - // first step - find good enough mailbox - ui32 hint = 0; + return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); + } + + // first step - find good enough mailbox + ui32 hint = 0; TMailboxHeader *mailbox = nullptr; - - { - ui32 hintBackoff = 0; - - while (hint == 0) { - hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter); - mailbox = node->MailboxTable->Get(hint); - - if (!mailbox->LockFromFree()) { - node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); - hintBackoff = hint; - hint = 0; - } - } - - node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); - } - - const ui64 localActorId = AllocateLocalId(); - if (VERBOSE) { + + { + ui32 hintBackoff = 0; + + while (hint == 0) { + hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter); + mailbox = node->MailboxTable->Get(hint); + + if (!mailbox->LockFromFree()) { + node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); + hintBackoff = hint; + hint = 0; + } + } + + node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); + } + + const ui64 localActorId = AllocateLocalId(); + if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n"; - } - - // ok, got mailbox - mailbox->AttachActor(localActorId, actor); - - // do init + } + + // ok, got mailbox + mailbox->AttachActor(localActorId, actor); + + // do init const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); - RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); + RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); - - switch (mailboxType) { - case TMailboxType::Simple: + + switch (mailboxType) { + case TMailboxType::Simple: UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; - case TMailboxType::Revolving: + break; + case TMailboxType::Revolving: UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; + break; case TMailboxType::HTSwap: UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; @@ -918,116 +918,116 @@ namespace NActors { case TMailboxType::TinyReadAsFilled: UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; - default: + default: Y_FAIL("Unsupported mailbox type"); - } - - return actorId; - } - + } + + return actorId; + } + TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (UseRealThreads) { + if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); - return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); - } - - const ui64 localActorId = AllocateLocalId(); - if (VERBOSE) { + return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); + } + + const ui64 localActorId = AllocateLocalId(); + if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n"; - } - - mailbox->AttachActor(localActorId, actor); + } + + mailbox->AttachActor(localActorId, actor); const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); - RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); + RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); - - return actorId; - } - + + return actorId; + } + TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (!UseRealThreads) { - IActor* actor = FindActor(actorId, node); - node->LocalServicesActors[serviceId] = actor; - node->ActorToActorId[actor] = actorId; - } - + if (!UseRealThreads) { + IActor* actor = FindActor(actorId, node); + node->LocalServicesActors[serviceId] = actor; + node->ActorToActorId[actor] = actorId; + } + return node->ActorSystem->RegisterLocalService(serviceId, actorId); - } - + } + TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); - EdgeActors.insert(edgeActor); + EdgeActors.insert(edgeActor); EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; - return edgeActor; - } - + return edgeActor; + } + TEventsList TTestActorRuntimeBase::CaptureEvents() { - TGuard<TMutex> guard(Mutex); - TEventsList result; - for (auto& mbox : Mailboxes) { - mbox.second->Capture(result); - } - - return result; - } - + TGuard<TMutex> guard(Mutex); + TEventsList result; + for (auto& mbox : Mailboxes) { + mbox.second->Capture(result); + } + + return result; + } + TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); - TEventsList result; - GetMailbox(nodeId, hint).Capture(result); - return result; - } - + TEventsList result; + GetMailbox(nodeId, hint).Capture(result); + return result; + } + void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { - TGuard<TMutex> guard(Mutex); - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + TGuard<TMutex> guard(Mutex); + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); - } - + GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); + } + void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { - TGuard<TMutex> guard(Mutex); - for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { - if (*rit) { - auto& ev = *rit; - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + TGuard<TMutex> guard(Mutex); + for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { + if (*rit) { + auto& ev = *rit; + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); - } - } - - events.clear(); - } - + GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); + } + } + + events.clear(); + } + void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); - TEventsList result; - GetMailbox(nodeId, hint).PushFront(events); - events.clear(); - } - + TEventsList result; + GetMailbox(nodeId, hint).PushFront(events); + events.clear(); + } + TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { - TGuard<TMutex> guard(Mutex); - TScheduledEventsList result; - for (auto& mbox : Mailboxes) { - mbox.second->CaptureScheduled(result); - } - - return result; - } - + TGuard<TMutex> guard(Mutex); + TScheduledEventsList result; + for (auto& mbox : Mailboxes) { + mbox.second->CaptureScheduled(result); + } + + return result; + } + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } @@ -1037,100 +1037,100 @@ namespace NActors { } bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { - TDispatchContext localContext; - localContext.Options = &options; - localContext.PrevContext = nullptr; - bool verbose = !options.Quiet && VERBOSE; - - struct TDispatchContextSetter { + TDispatchContext localContext; + localContext.Options = &options; + localContext.PrevContext = nullptr; + bool verbose = !options.Quiet && VERBOSE; + + struct TDispatchContextSetter { TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) - { + { lastContext.PrevContext = Runtime.CurrentDispatchContext; Runtime.CurrentDispatchContext = &lastContext; - } - - ~TDispatchContextSetter() { + } + + ~TDispatchContextSetter() { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; - } - + } + TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); - TInstant deadline = dispatchTime + DispatchTimeout; - const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10); - TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; - if (verbose) { + TInstant deadline = dispatchTime + DispatchTimeout; + const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10); + TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; + if (verbose) { Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n"; - } - - struct TTempEdgeEventsCaptor { + } + + struct TTempEdgeEventsCaptor { TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) - : Runtime(runtime) - , HasEvents(false) - { - for (auto edgeActor : Runtime.EdgeActors) { - TEventsList events; - Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events); - auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); - auto storeIt = Store.find(mboxId); + : Runtime(runtime) + , HasEvents(false) + { + for (auto edgeActor : Runtime.EdgeActors) { + TEventsList events; + Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events); + auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); + auto storeIt = Store.find(mboxId); Y_VERIFY(storeIt == Store.end()); storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first; - storeIt->second->PushFront(events); - if (!events.empty()) - HasEvents = true; - } - } - - ~TTempEdgeEventsCaptor() { - for (auto edgeActor : Runtime.EdgeActors) { - auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); - auto storeIt = Store.find(mboxId); - if (storeIt == Store.end()) { - continue; - } - - TEventsList events; - storeIt->second->Capture(events); - Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events); - } - } - + storeIt->second->PushFront(events); + if (!events.empty()) + HasEvents = true; + } + } + + ~TTempEdgeEventsCaptor() { + for (auto edgeActor : Runtime.EdgeActors) { + auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); + auto storeIt = Store.find(mboxId); + if (storeIt == Store.end()) { + continue; + } + + TEventsList events; + storeIt->second->Capture(events); + Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events); + } + } + TTestActorRuntimeBase& Runtime; - TEventMailBoxList Store; - bool HasEvents; - }; - - TEventMailBoxList restrictedMailboxes; - const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty(); - for (auto mailboxId : options.OnlyMailboxes) { - auto it = Mailboxes.find(mailboxId); - if (it == Mailboxes.end()) { + TEventMailBoxList Store; + bool HasEvents; + }; + + TEventMailBoxList restrictedMailboxes; + const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty(); + for (auto mailboxId : options.OnlyMailboxes) { + auto it = Mailboxes.find(mailboxId); + if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first; - } - + } + restrictedMailboxes.insert(std::make_pair(mailboxId, it->second)); - } - - TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor; - if (!restrictedMailboxes) { - tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this)); - } - - TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; + } + + TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor; + if (!restrictedMailboxes) { + tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this)); + } + + TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; while (!currentMailboxes.empty()) { - bool hasProgress = true; - while (hasProgress) { - ++DispatchCyclesCount; - hasProgress = false; - + bool hasProgress = true; + while (hasProgress) { + ++DispatchCyclesCount; + hasProgress = false; + ui64 eventsToDispatch = 0; for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { @@ -1143,7 +1143,7 @@ namespace NActors { bool isEmpty = false; while (!isEmpty && eventsDispatched < eventsToDispatch) { ui64 mailboxCount = currentMailboxes.size(); - ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; + ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; auto startWithMboxIt = currentMailboxes.begin(); for (ui64 i = 0; i < startWith; ++i) { ++startWithMboxIt; @@ -1170,7 +1170,7 @@ namespace NActors { ObserverFunc(*this, ev); } mbox.second->PushFront(events); - } + } if (!isEdgeMailbox) { isEmpty = false; @@ -1216,8 +1216,8 @@ namespace NActors { Y_FAIL("Unknown action"); } } - } - + } + } Y_VERIFY(mboxIt != currentMailboxes.end()); if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && @@ -1225,82 +1225,82 @@ namespace NActors { mboxIt->second->IsScheduledEmpty() && mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { suspectedBoxes.push_back(mboxIt->first); - } + } ++mboxIt; if (mboxIt == currentMailboxes.end()) { mboxIt = currentMailboxes.begin(); - } + } Y_VERIFY(endWithMboxIt != currentMailboxes.end()); if (mboxIt == endWithMboxIt) { - break; - } - } - + break; + } + } + for (auto id : suspectedBoxes) { auto it = currentMailboxes.find(id); if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { currentMailboxes.erase(it); } - } - } - } - + } + } + } + if (localContext.FinalEventFound) { return true; - } - - if (!localContext.FoundNonEmptyMailboxes.empty()) + } + + if (!localContext.FoundNonEmptyMailboxes.empty()) return true; - + if (options.CustomFinalCondition && options.CustomFinalCondition()) return true; - if (options.FinalEvents.empty()) { - for (auto& mbox : currentMailboxes) { - if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) - continue; - - if (!mbox.second->IsEmpty()) { - if (verbose) { + if (options.FinalEvents.empty()) { + for (auto& mbox : currentMailboxes) { + if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) + continue; + + if (!mbox.second->IsEmpty()) { + if (verbose) { Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + return true; - } - } - } - + } + } + } + if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) { return false; } - if (dispatchTime >= deadline) { - if (verbose) { + if (dispatchTime >= deadline) { + if (verbose) { Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + ythrow TWithBackTrace<TEmptyEventQueueException>(); - } - + } + if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { - inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; - bool isEmpty = true; - TMaybe<TInstant> nearestMailboxDeadline; + inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; + bool isEmpty = true; + TMaybe<TInstant> nearestMailboxDeadline; TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes; TMaybe<TInstant> nextScheduleDeadline; - for (auto& mbox : currentMailboxes) { - if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { - nearestMailboxDeadline = mbox.second->GetInactiveUntil(); - } - - continue; - } - - if (mbox.second->IsScheduledEmpty()) - continue; - + for (auto& mbox : currentMailboxes) { + if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { + nearestMailboxDeadline = mbox.second->GetInactiveUntil(); + } + + continue; + } + + if (mbox.second->IsScheduledEmpty()) + continue; + auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline(); if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) { nextScheduleMboxes.clear(); @@ -1312,118 +1312,118 @@ namespace NActors { } for (const auto& nextScheduleMbox : nextScheduleMboxes) { - TEventsList selectedEvents; - TScheduledEventsList capturedScheduledEvents; + TEventsList selectedEvents; + TScheduledEventsList capturedScheduledEvents; nextScheduleMbox->CaptureScheduled(capturedScheduledEvents); - ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); + ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); nextScheduleMbox->PushScheduled(capturedScheduledEvents); - for (auto& event : selectedEvents) { - if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { + for (auto& event : selectedEvents) { + if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(event, this); - } - + } + nextScheduleMbox->Send(event); - isEmpty = false; - } - } - - if (!isEmpty) { - if (verbose) { + isEmpty = false; + } + } + + if (!isEmpty) { + if (verbose) { Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + deadline = dispatchTime + DispatchTimeout; - continue; - } - - if (nearestMailboxDeadline.Defined()) { - if (verbose) { + continue; + } + + if (nearestMailboxDeadline.Defined()) { + if (verbose) { Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n"; - } - - UpdateCurrentTime(*nearestMailboxDeadline.Get()); - continue; - } - } - - TDuration waitDelay = TDuration::MilliSeconds(10); - dispatchTime += waitDelay; - MailboxesHasEvents.WaitT(Mutex, waitDelay); - } + } + + UpdateCurrentTime(*nearestMailboxDeadline.Get()); + continue; + } + } + + TDuration waitDelay = TDuration::MilliSeconds(10); + dispatchTime += waitDelay; + MailboxesHasEvents.WaitT(Mutex, waitDelay); + } return false; - } - + } + void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { - TDispatchContext* context = CurrentDispatchContext; - while (context) { - const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; - if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) { - context->FoundNonEmptyMailboxes.insert(mboxId); - } - - context = context->PrevContext; - } - } - + TDispatchContext* context = CurrentDispatchContext; + while (context) { + const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; + if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) { + context->FoundNonEmptyMailboxes.insert(mboxId); + } + + context = context->PrevContext; + } + } + void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { - TDispatchContext* context = CurrentDispatchContext; - while (context) { + TDispatchContext* context = CurrentDispatchContext; + while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { if (finalEvent.EventCheck(ev)) { auto& freq = context->FinalEventFrequency[&finalEvent]; if (++freq >= finalEvent.RequiredCount) { context->FinalEventFound = true; } - } - } - - context = context->PrevContext; - } - } - + } + } + + context = context->PrevContext; + } + } + void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); - SendInternal(ev, senderNodeIndex, viaActorSystem); - } - + SendInternal(ev, senderNodeIndex, viaActorSystem); + } + void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration; - GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr)); - if (VERBOSE) + ui32 nodeId = FirstNodeId + nodeIndex; + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration; + GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr)); + if (VERBOSE) Cerr << "Event was added to scheduled queue\n"; - } - + } + void TTestActorRuntimeBase::ClearCounters() { - TGuard<TMutex> guard(Mutex); - EvCounters.clear(); - } - + TGuard<TMutex> guard(Mutex); + EvCounters.clear(); + } + ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { - TGuard<TMutex> guard(Mutex); - auto it = EvCounters.find(evType); - if (it == EvCounters.end()) - return 0; - - return it->second; - } - + TGuard<TMutex> guard(Mutex); + auto it = EvCounters.find(evType); + if (it == EvCounters.end()) + return 0; + + return it->second; + } + TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - return node->ActorSystem->LookupLocalService(serviceId); - } - + return node->ActorSystem->LookupLocalService(serviceId); + } + void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { - TGuard<TMutex> guard(Mutex); - ui32 dispatchCount = 0; + TGuard<TMutex> guard(Mutex); + ui32 dispatchCount = 0; if (!edgeFilter.empty()) { for (auto edgeActor : edgeFilter) { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); @@ -1431,202 +1431,202 @@ namespace NActors { } const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; - for (;;) { + for (;;) { for (auto edgeActor : edgeActors) { - TEventsList events; - auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); - bool foundEvent = false; - mbox.Capture(events); - for (auto& ev : events) { - if (filter(*this, ev)) { - foundEvent = true; - break; - } - } - - mbox.PushFront(events); - if (foundEvent) - return; - } - - ++dispatchCount; - { + TEventsList events; + auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); + bool foundEvent = false; + mbox.Capture(events); + for (auto& ev : events) { + if (filter(*this, ev)) { + foundEvent = true; + break; + } + } + + mbox.PushFront(events); + if (foundEvent) + return; + } + + ++dispatchCount; + { if (!DispatchEventsInternal(TDispatchOptions(), deadline)) { return; // Timed out; event was not found } - } - + } + Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop"); - } - } - + } + } + TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); - return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); - } - + return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); + } + void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { - TGuard<TMutex> guard(Mutex); - BlockedOutput.insert(actorId); - } - + TGuard<TMutex> guard(Mutex); + BlockedOutput.insert(actorId); + } + void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; - DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); + DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); } IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { - TGuard<TMutex> guard(Mutex); - if (nodeIndex == Max<ui32>()) { + TGuard<TMutex> guard(Mutex); + if (nodeIndex == Max<ui32>()) { Y_VERIFY(actorId.NodeId()); - nodeIndex = actorId.NodeId() - FirstNodeId; - } - + nodeIndex = actorId.NodeId() - FirstNodeId; + } + Y_VERIFY(nodeIndex < NodeCount); - auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); + auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); TNodeDataBase* node = nodeIt->second.Get(); - return FindActor(actorId, node); - } - + return FindActor(actorId, node); + } + void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { - TGuard<TMutex> guard(Mutex); - if (allow) { + TGuard<TMutex> guard(Mutex); + if (allow) { if (VERBOSE) { Cerr << "Actor " << actorId << " added to schedule whitelist"; } - ScheduleWhiteList.insert(actorId); - } else { + ScheduleWhiteList.insert(actorId); + } else { if (VERBOSE) { Cerr << "Actor " << actorId << " removed from schedule whitelist"; } - ScheduleWhiteList.erase(actorId); - } - } - + ScheduleWhiteList.erase(actorId); + } + } + bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { - TGuard<TMutex> guard(Mutex); - return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); - } - + TGuard<TMutex> guard(Mutex); + return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); + } + TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; + ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); - return node->DynamicCounters; - } - + return node->DynamicCounters; + } + void TTestActorRuntimeBase::SetupMonitoring() { NeedMonitoring = true; - } - + } + void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; + ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); - ui32 targetNode = ev->GetRecipientRewrite().NodeId(); - ui32 targetNodeIndex; - if (targetNode == 0) { - targetNodeIndex = nodeIndex; - } else { - targetNodeIndex = targetNode - FirstNodeId; + ui32 targetNode = ev->GetRecipientRewrite().NodeId(); + ui32 targetNodeIndex; + if (targetNode == 0) { + targetNodeIndex = nodeIndex; + } else { + targetNodeIndex = targetNode - FirstNodeId; Y_VERIFY(targetNodeIndex < NodeCount); - } - - if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) { - node->ActorSystem->Send(ev); - return; - } - + } + + if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) { + node->ActorSystem->Send(ev); + return; + } + Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex)); - TAutoPtr<IEventHandle> evHolder(ev); - + TAutoPtr<IEventHandle> evHolder(ev); + if (!AllowSendFrom(node, evHolder)) { return; } - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); - if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - mbox.PushFront(evHolder); - return; - } - - ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId(); - if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) { + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); + if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + mbox.PushFront(evHolder); + return; + } + + ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId(); + if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) { Cerr << "Send event, "; PrintEvent(evHolder, this); - } - - EvCounters[ev->GetTypeRewrite()]++; - - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* recipientActor = mailbox->FindActor(recipientLocalId); - if (recipientActor) { + } + + EvCounters[ev->GetTypeRewrite()]++; + + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* recipientActor = mailbox->FindActor(recipientLocalId); + if (recipientActor) { // Save actorId by value in order to prevent ctx from being invalidated during another Send call. TActorId actorId = ev->GetRecipientRewrite(); - node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); + node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; - CurrentRecipient = actorId; - { - TInverseGuard<TMutex> inverseGuard(Mutex); + CurrentRecipient = actorId; + { + TInverseGuard<TMutex> inverseGuard(Mutex); #ifdef USE_ACTOR_CALLSTACK TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); #endif - recipientActor->Receive(evHolder, ctx); + recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); - } + } CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; - } else { - if (VERBOSE) { + } else { + if (VERBOSE) { Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n"; - } - - auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); - if (!!forwardedEv) { - node->ActorSystem->Send(forwardedEv); - } - } - } - + } + + auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); + if (!!forwardedEv) { + node->ActorSystem->Send(forwardedEv); + } + } + } + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { - ui32 mailboxHint = actorId.Hint(); - ui64 localId = actorId.LocalId(); - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* actor = mailbox->FindActor(localId); - return actor; - } - + ui32 mailboxHint = actorId.Hint(); + ui64 localId = actorId.LocalId(); + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* actor = mailbox->FindActor(localId); + return actor; + } + THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { - THolder<TActorSystemSetup> setup(new TActorSystemSetup); - setup->NodeId = FirstNodeId + nodeIndex; + THolder<TActorSystemSetup> setup(new TActorSystemSetup); + setup->NodeId = FirstNodeId + nodeIndex; - if (UseRealThreads) { + if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); - setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); - setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); - setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); - setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); + setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); + setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); + setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); + setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20)); setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100))); - } else { - setup->ExecutorsCount = 1; - setup->Scheduler.Reset(new TSchedulerThreadStub(this, node)); - setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); - setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); - } - + } else { + setup->ExecutorsCount = 1; + setup->Scheduler.Reset(new TSchedulerThreadStub(this, node)); + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); + setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); + } + InitActorSystemSetup(*setup); return setup; @@ -1635,15 +1635,15 @@ namespace NActors { THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); - node->ExecutorPools.resize(setup->ExecutorsCount); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - node->ExecutorPools[i] = setup->Executors[i].Get(); - } - + node->ExecutorPools.resize(setup->ExecutorsCount); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + node->ExecutorPools[i] = setup->Executors[i].Get(); + } + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); - setup->LocalServices = node->LocalServices; - setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); + setup->LocalServices = node->LocalServices; + setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); const TActorId nameserviceId = GetNameserviceActorId(); TIntrusivePtr<TInterconnectProxyCommon> common; @@ -1663,10 +1663,10 @@ namespace NActors { common->ClusterUUID = ClusterUUID; common->AcceptUUID = {ClusterUUID}; - for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) { - if (proxyNodeIndex == nodeIndex) - continue; - + for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) { + if (proxyNodeIndex == nodeIndex) + continue; + const ui32 peerNodeId = FirstNodeId + proxyNodeIndex; IActor *proxyActor = UseRealInterconnect @@ -1674,8 +1674,8 @@ namespace NActors { : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common); setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()}; - } - + } + setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock); if (UseRealInterconnect) { @@ -1691,10 +1691,10 @@ namespace NActors { std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } - + return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); - } - + } + TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); @@ -1716,16 +1716,16 @@ namespace NActors { TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { - TGuard<TMutex> guard(Mutex); - auto mboxId = TEventMailboxId(nodeId, hint); - auto it = Mailboxes.find(mboxId); - if (it == Mailboxes.end()) { + TGuard<TMutex> guard(Mutex); + auto mboxId = TEventMailboxId(nodeId, hint); + auto it = Mailboxes.find(mboxId); + if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first; - } - - return *it->second; - } - + } + + return *it->second; + } + void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); @@ -1739,36 +1739,36 @@ namespace NActors { return actorId.ToString(); } - struct TStrandingActorDecoratorContext : public TThrRefBase { - TStrandingActorDecoratorContext() + struct TStrandingActorDecoratorContext : public TThrRefBase { + TStrandingActorDecoratorContext() : Queue(new TQueueType) - { - } - + { + } + typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType; TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue; - }; - - class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> { - public: - class TReplyActor : public TActor<TReplyActor> { - public: + }; + + class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> { + public: + class TReplyActor : public TActor<TReplyActor> { + public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } - TReplyActor(TStrandingActorDecorator* owner) - : TActor(&TReplyActor::StateFunc) - , Owner(owner) - { - } - - STFUNC(StateFunc); - - private: - TStrandingActorDecorator* const Owner; - }; - + TReplyActor(TStrandingActorDecorator* owner) + : TActor(&TReplyActor::StateFunc) + , Owner(owner) + { + } + + STFUNC(StateFunc); + + private: + TStrandingActorDecorator* const Owner; + }; + static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } @@ -1776,41 +1776,41 @@ namespace NActors { TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) - : Delegatee(delegatee) - , IsSync(isSync) - , AdditionalActors(additionalActors) - , Context(context) - , HasReply(false) + : Delegatee(delegatee) + , IsSync(isSync) + , AdditionalActors(additionalActors) + , Context(context) + , HasReply(false) , Runtime(runtime) , ReplyChecker(createReplyChecker()) - { - if (IsSync) { + { + if (IsSync) { Y_VERIFY(!runtime->IsRealThreads()); - } - } - - void Bootstrap(const TActorContext& ctx) { - Become(&TStrandingActorDecorator::StateFunc); + } + } + + void Bootstrap(const TActorContext& ctx) { + Become(&TStrandingActorDecorator::StateFunc); ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this)); - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint())); - for (const auto& actor : AdditionalActors) { - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint())); - } - - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); - DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); - DelegateeOptions.Quiet = true; - } - - STFUNC(StateFunc) { - bool wasEmpty = !Context->Queue->Head(); - Context->Queue->Push(ev.Release()); - if (wasEmpty) { - SendHead(ctx); - } - } - - STFUNC(Reply) { + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint())); + for (const auto& actor : AdditionalActors) { + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint())); + } + + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); + DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); + DelegateeOptions.Quiet = true; + } + + STFUNC(StateFunc) { + bool wasEmpty = !Context->Queue->Head(); + Context->Queue->Push(ev.Release()); + if (wasEmpty) { + SendHead(ctx); + } + } + + STFUNC(Reply) { Y_VERIFY(!HasReply); IEventHandle *requestEv = Context->Queue->Head(); TActorId originalSender = requestEv->Sender; @@ -1818,85 +1818,85 @@ namespace NActors { if (HasReply) { delete Context->Queue->Pop(); } - ctx.ExecutorThread.Send(ev->Forward(originalSender)); - if (!IsSync && Context->Queue->Head()) { - SendHead(ctx); - } - } - - private: - void SendHead(const TActorContext& ctx) { - if (!IsSync) { - ctx.ExecutorThread.Send(GetForwardedEvent().Release()); - } else { - while (Context->Queue->Head()) { - HasReply = false; + ctx.ExecutorThread.Send(ev->Forward(originalSender)); + if (!IsSync && Context->Queue->Head()) { + SendHead(ctx); + } + } + + private: + void SendHead(const TActorContext& ctx) { + if (!IsSync) { + ctx.ExecutorThread.Send(GetForwardedEvent().Release()); + } else { + while (Context->Queue->Head()) { + HasReply = false; ctx.ExecutorThread.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { - try { + try { Runtime->DispatchEvents(DelegateeOptions); - } catch (TEmptyEventQueueException&) { + } catch (TEmptyEventQueueException&) { count--; - Cerr << "No reply" << Endl; - } - } - + Cerr << "No reply" << Endl; + } + } + Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000)); - } - } - } - - TAutoPtr<IEventHandle> GetForwardedEvent() { - IEventHandle* ev = Context->Queue->Head(); + } + } + } + + TAutoPtr<IEventHandle> GetForwardedEvent() { + IEventHandle* ev = Context->Queue->Head(); ReplyChecker->OnRequest(ev); - TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() - ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) + TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() + ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); - - return forwardedEv; - } - private: + + return forwardedEv; + } + private: const TActorId Delegatee; - const bool IsSync; + const bool IsSync; const TVector<TActorId> AdditionalActors; TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; TActorId ReplyId; - bool HasReply; - TDispatchOptions DelegateeOptions; + bool HasReply; + TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; THolder<IReplyChecker> ReplyChecker; - }; - - void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { - Owner->Reply(ev, ctx); - } - - class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { - public: + }; + + void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { + Owner->Reply(ev, ctx); + } + + class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { + public: TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) - : Context(new TStrandingActorDecoratorContext()) + : Context(new TStrandingActorDecoratorContext()) , Runtime(runtime) , CreateReplyChecker(createReplyChecker) - { - } - + { + } + IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker); - } - - private: + } + + private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; - }; - + }; + TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) { return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker)); - } - - ui64 DefaultRandomSeed = 9999; -} + } + + ui64 DefaultRandomSeed = 9999; +} diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 06eadb15de..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorsystem.h> @@ -9,27 +9,27 @@ #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/actors/interconnect/poller_tcp.h> #include <library/cpp/actors/interconnect/mock/ic_mock.h> -#include <library/cpp/random_provider/random_provider.h> -#include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/datetime/base.h> -#include <util/folder/tempdir.h> -#include <util/generic/deque.h> + +#include <util/datetime/base.h> +#include <util/folder/tempdir.h> +#include <util/generic/deque.h> #include <util/generic/hash.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/queue.h> -#include <util/generic/set.h> -#include <util/generic/vector.h> -#include <util/system/defaults.h> -#include <util/system/mutex.h> -#include <util/system/condvar.h> -#include <util/system/thread.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/queue.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/system/defaults.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> +#include <util/system/thread.h> #include <util/system/sanitizers.h> #include <util/system/valgrind.h> #include <utility> - + #include <functional> const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( @@ -38,141 +38,141 @@ const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( ); -namespace NActors { +namespace NActors { struct THeSingleSystemEnv { }; - struct TEventMailboxId { - TEventMailboxId() - : NodeId(0) - , Hint(0) - { - } - - TEventMailboxId(ui32 nodeId, ui32 hint) - : NodeId(nodeId) - , Hint(hint) - { - } - - bool operator<(const TEventMailboxId& other) const { - return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); - } - - bool operator==(const TEventMailboxId& other) const { - return (NodeId == other.NodeId) && (Hint == other.Hint); - } - + struct TEventMailboxId { + TEventMailboxId() + : NodeId(0) + , Hint(0) + { + } + + TEventMailboxId(ui32 nodeId, ui32 hint) + : NodeId(nodeId) + , Hint(hint) + { + } + + bool operator<(const TEventMailboxId& other) const { + return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); + } + + bool operator==(const TEventMailboxId& other) const { + return (NodeId == other.NodeId) && (Hint == other.Hint); + } + struct THash { ui64 operator()(const TEventMailboxId& mboxId) const noexcept { return mboxId.NodeId * 31ULL + mboxId.Hint; } }; - ui32 NodeId; - ui32 Hint; - }; - - struct TDispatchOptions { - struct TFinalEventCondition { + ui32 NodeId; + ui32 Hint; + }; + + struct TDispatchOptions { + struct TFinalEventCondition { std::function<bool(IEventHandle& ev)> EventCheck; - ui32 RequiredCount; - - TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) + ui32 RequiredCount; + + TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; }) - , RequiredCount(requiredCount) - { - } + , RequiredCount(requiredCount) + { + } TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1) : EventCheck(eventCheck) , RequiredCount(requiredCount) { } - }; - + }; + TVector<TFinalEventCondition> FinalEvents; TVector<TEventMailboxId> NonEmptyMailboxes; TVector<TEventMailboxId> OnlyMailboxes; std::function<bool()> CustomFinalCondition; bool Quiet = false; - }; - - struct TScheduledEventQueueItem { - TInstant Deadline; - TAutoPtr<IEventHandle> Event; + }; + + struct TScheduledEventQueueItem { + TInstant Deadline; + TAutoPtr<IEventHandle> Event; TAutoPtr<TSchedulerCookieHolder> Cookie; - ui64 UniqueId; - - TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) - : Deadline(deadline) - , Event(event) + ui64 UniqueId; + + TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) + : Deadline(deadline) + , Event(event) , Cookie(new TSchedulerCookieHolder(cookie)) - , UniqueId(++NextUniqueId) - {} - - bool operator<(const TScheduledEventQueueItem& other) const { - if (Deadline < other.Deadline) - return true; - - if (Deadline > other.Deadline) - return false; - - return UniqueId < other.UniqueId; - } - - static ui64 NextUniqueId; - }; - + , UniqueId(++NextUniqueId) + {} + + bool operator<(const TScheduledEventQueueItem& other) const { + if (Deadline < other.Deadline) + return true; + + if (Deadline > other.Deadline) + return false; + + return UniqueId < other.UniqueId; + } + + static ui64 NextUniqueId; + }; + typedef TDeque<TAutoPtr<IEventHandle>> TEventsList; typedef TSet<TScheduledEventQueueItem> TScheduledEventsList; - - class TEventMailBox : public TThrRefBase { - public: - TEventMailBox() + + class TEventMailBox : public TThrRefBase { + public: + TEventMailBox() : InactiveUntil(TInstant::MicroSeconds(0)) -#ifdef DEBUG_ORDER_EVENTS - , ExpectedReceive(0) - , NextToSend(0) -#endif - { - } - - void Send(TAutoPtr<IEventHandle> ev); - bool IsEmpty() const; - TAutoPtr<IEventHandle> Pop(); - void Capture(TEventsList& evList); - void PushFront(TAutoPtr<IEventHandle>& ev); - void PushFront(TEventsList& evList); - void CaptureScheduled(TScheduledEventsList& evList); - void PushScheduled(TScheduledEventsList& evList); - bool IsActive(const TInstant& currentTime) const; - void Freeze(const TInstant& deadline); - TInstant GetInactiveUntil() const; - void Schedule(const TScheduledEventQueueItem& item); - bool IsScheduledEmpty() const; +#ifdef DEBUG_ORDER_EVENTS + , ExpectedReceive(0) + , NextToSend(0) +#endif + { + } + + void Send(TAutoPtr<IEventHandle> ev); + bool IsEmpty() const; + TAutoPtr<IEventHandle> Pop(); + void Capture(TEventsList& evList); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushFront(TEventsList& evList); + void CaptureScheduled(TScheduledEventsList& evList); + void PushScheduled(TScheduledEventsList& evList); + bool IsActive(const TInstant& currentTime) const; + void Freeze(const TInstant& deadline); + TInstant GetInactiveUntil() const; + void Schedule(const TScheduledEventQueueItem& item); + bool IsScheduledEmpty() const; TInstant GetFirstScheduleDeadline() const; ui64 GetSentEventCount() const; - - private: - TScheduledEventsList Scheduled; - TInstant InactiveUntil; - TEventsList Sent; -#ifdef DEBUG_ORDER_EVENTS + + private: + TScheduledEventsList Scheduled; + TInstant InactiveUntil; + TEventsList Sent; +#ifdef DEBUG_ORDER_EVENTS TMap<IEventHandle*, ui64> TrackSent; - ui64 ExpectedReceive; - ui64 NextToSend; -#endif - }; - + ui64 ExpectedReceive; + ui64 NextToSend; +#endif + }; + typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList; - - class TEmptyEventQueueException : public yexception { - public: - TEmptyEventQueueException() { - Append("Event queue is still empty."); - } - }; - + + class TEmptyEventQueueException : public yexception { + public: + TEmptyEventQueueException() { + Append("Event queue is still empty."); + } + }; + class TSchedulingLimitReachedException : public yexception { public: TSchedulingLimitReachedException(ui64 limit) { @@ -183,83 +183,83 @@ namespace NActors { }; class TTestActorRuntimeBase: public TNonCopyable { - public: - class TEdgeActor; - class TSchedulerThreadStub; - class TExecutorPoolStub; - class TTimeProvider; - - enum class EEventAction { - PROCESS, - DROP, - RESCHEDULE - }; - + public: + class TEdgeActor; + class TSchedulerThreadStub; + class TExecutorPoolStub; + class TTimeProvider; + + enum class EEventAction { + PROCESS, + DROP, + RESCHEDULE + }; + typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; - + TTestActorRuntimeBase(THeSingleSystemEnv); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); virtual ~TTestActorRuntimeBase(); - bool IsRealThreads() const; + bool IsRealThreads() const; static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); - TEventObserver SetObserverFunc(TEventObserver observerFunc); - TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); - TEventFilter SetEventFilter(TEventFilter filterFunc); - TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); - TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); + TEventObserver SetObserverFunc(TEventObserver observerFunc); + TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); + TEventFilter SetEventFilter(TEventFilter filterFunc); + TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); + TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); static bool IsVerbose(); static void SetVerbose(bool verbose); - TDuration SetDispatchTimeout(TDuration timeout); + TDuration SetDispatchTimeout(TDuration timeout); void SetDispatchedEventsLimit(ui64 limit) { DispatchedEventsLimit = limit; } - TDuration SetReschedulingDelay(TDuration delay); + TDuration SetReschedulingDelay(TDuration delay); void SetLogBackend(const TAutoPtr<TLogBackend> logBackend); - void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); - TIntrusivePtr<ITimeProvider> GetTimeProvider(); - TInstant GetCurrentTime() const; - void UpdateCurrentTime(TInstant newTime); + void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); + TIntrusivePtr<ITimeProvider> GetTimeProvider(); + TInstant GetCurrentTime() const; + void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); virtual void Initialize(); - ui32 GetNodeId(ui32 index = 0) const; - ui32 GetNodeCount() const; - ui64 AllocateLocalId(); + ui32 GetNodeId(ui32 index = 0) const; + ui32 GetNodeCount() const; + ui64 AllocateLocalId(); ui32 InterconnectPoolId() const; TString GetTempDir(); TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, - TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, + TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, const TActorId& parentid = TActorId()); TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentid = TActorId()); TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); TActorId AllocateEdgeActor(ui32 nodeIndex = 0); - TEventsList CaptureEvents(); - TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); - TScheduledEventsList CaptureScheduledEvents(); - void PushFront(TAutoPtr<IEventHandle>& ev); - void PushEventsFront(TEventsList& events); - void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); - // doesn't dispatch events for edge actors + TEventsList CaptureEvents(); + TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); + TScheduledEventsList CaptureScheduledEvents(); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushEventsFront(TEventsList& events); + void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); + // doesn't dispatch events for edge actors bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions()); bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout); bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline); - void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); - void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); - void ClearCounters(); - ui64 GetCounter(ui32 evType) const; + void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); + void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); + void ClearCounters(); + ui64 GetCounter(ui32 evType) const; TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); @@ -267,7 +267,7 @@ namespace NActors { IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const; void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; - TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); + TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); template<typename T> @@ -287,24 +287,24 @@ namespace NActors { TActorSystem* SingleSys() const; TActorSystem* GetAnyNodeActorSystem(); TActorSystem* GetActorSystem(ui32 nodeId); - template <typename TEvent> + template <typename TEvent> TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) { - handle.Destroy(); - const ui32 eventType = TEvent::EventType; + handle.Destroy(); + const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); - if (event->GetTypeRewrite() != eventType) - return false; - - TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get(); - if (predicate(*typedEvent)) { - handle = event; - return true; - } - - return false; + if (event->GetTypeRewrite() != eventType) + return false; + + TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get(); + if (predicate(*typedEvent)) { + handle = event; + return true; + } + + return false; }, {}, simTimeout); - + if (simTimeout == TDuration::Max()) Y_VERIFY(handle); @@ -313,8 +313,8 @@ namespace NActors { } else { return nullptr; } - } - + } + template<class TEvent> typename TEvent::TPtr GrabEdgeEventIf( const TSet<TActorId>& edgeFilter, @@ -353,12 +353,12 @@ namespace NActors { return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); } - template <typename TEvent> + template <typename TEvent> TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; return GrabEdgeEventIf(handle, truth, simTimeout); - } - + } + template <typename TEvent> THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) { TAutoPtr<IEventHandle> handle; @@ -494,18 +494,18 @@ namespace NActors { private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; - void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); - TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); + void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); + TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); void ClearMailbox(ui32 nodeId, ui32 hint); - void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); - void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); + void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); + void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); - - private: + + private: ui64 ScheduledCount; ui64 ScheduledLimit; - THolder<TTempDir> TmpDir; - const TThread::TId MainThreadId; + THolder<TTempDir> TmpDir; + const TThread::TId MainThreadId; protected: bool UseRealInterconnect = false; @@ -513,25 +513,25 @@ namespace NActors { bool IsInitialized = false; bool SingleSysEnv = false; const TString ClusterUUID; - const ui32 FirstNodeId; - const ui32 NodeCount; + const ui32 FirstNodeId; + const ui32 NodeCount; const ui32 DataCenterCount; - const bool UseRealThreads; + const bool UseRealThreads; - ui64 LocalId; - TMutex Mutex; - TCondVar MailboxesHasEvents; - TEventMailBoxList Mailboxes; + ui64 LocalId; + TMutex Mutex; + TCondVar MailboxesHasEvents; + TEventMailBoxList Mailboxes; TMap<ui32, ui64> EvCounters; - ui64 DispatchCyclesCount; - ui64 DispatchedEventsCount; + ui64 DispatchCyclesCount; + ui64 DispatchedEventsCount; ui64 DispatchedEventsLimit = 2'500'000; TActorId CurrentRecipient; ui64 DispatcherRandomSeed; - TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; + TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; TAutoPtr<TLogBackend> LogBackend; bool NeedMonitoring; - + TIntrusivePtr<IRandomProvider> RandomProvider; TIntrusivePtr<ITimeProvider> TimeProvider; @@ -554,27 +554,27 @@ namespace NActors { return static_cast<T*>(AppData0.get()); } - TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; - TIntrusivePtr<NActors::NLog::TSettings> LogSettings; + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; + TIntrusivePtr<NActors::NLog::TSettings> LogSettings; TIntrusivePtr<NInterconnect::TPollerThreads> Poller; - volatile ui64* ActorSystemTimestamp; + volatile ui64* ActorSystemTimestamp; volatile ui64* ActorSystemMonotonic; TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; TMap<TActorId, IActor*> LocalServicesActors; TMap<IActor*, TActorId> ActorToActorId; - THolder<TMailboxTable> MailboxTable; + THolder<TMailboxTable> MailboxTable; std::shared_ptr<void> AppData0; - THolder<TActorSystem> ActorSystem; + THolder<TActorSystem> ActorSystem; THolder<IExecutorPool> SchedulerPool; TVector<IExecutorPool*> ExecutorPools; - THolder<TExecutorThread> ExecutorThread; + THolder<TExecutorThread> ExecutorThread; }; - + struct INodeFactory { virtual ~INodeFactory() = default; virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; - }; - + }; + struct TDefaultNodeFactory final: INodeFactory { virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { return new TNodeDataBase(); @@ -601,94 +601,94 @@ namespace NActors { private: void InitNode(TNodeDataBase* node, size_t idx); - struct TDispatchContext { - const TDispatchOptions* Options; - TDispatchContext* PrevContext; - + struct TDispatchContext { + const TDispatchOptions* Options; + TDispatchContext* PrevContext; + TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency; TSet<TEventMailboxId> FoundNonEmptyMailboxes; bool FinalEventFound = false; - }; - + }; + TProgramShouldContinue ShouldContinue; TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; - ui64 CurrentTimestamp; + ui64 CurrentTimestamp; TSet<TActorId> EdgeActors; THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; - TDuration DispatchTimeout; - TDuration ReschedulingDelay; - TEventObserver ObserverFunc; - TScheduledEventsSelector ScheduledEventsSelectorFunc; - TEventFilter EventFilterFunc; - TScheduledEventFilter ScheduledEventFilterFunc; - TRegistrationObserver RegistrationObserver; + TDuration DispatchTimeout; + TDuration ReschedulingDelay; + TEventObserver ObserverFunc; + TScheduledEventsSelector ScheduledEventsSelectorFunc; + TEventFilter EventFilterFunc; + TScheduledEventFilter ScheduledEventFilterFunc; + TRegistrationObserver RegistrationObserver; TSet<TActorId> BlockedOutput; TSet<TActorId> ScheduleWhiteList; THashMap<TActorId, TActorId> ScheduleWhiteListParent; THashMap<TActorId, TString> ActorNames; TDispatchContext* CurrentDispatchContext; TVector<ui64> TxAllocatorTabletIds; - - static ui32 NextNodeId; - }; - - template <typename TEvent> - TEvent* FindEvent(TEventsList& events) { - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - return static_cast<TEvent*>(event->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> + + static ui32 NextNodeId; + }; + + template <typename TEvent> + TEvent* FindEvent(TEventsList& events) { + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) { - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) { - return static_cast<TEvent*>(event->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> - TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) { - ev.Destroy(); - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - ev = event; - return static_cast<TEvent*>(ev->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> - TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev, + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) { + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev, const std::function<bool(const typename TEvent::TPtr&)>& predicate) { - ev.Destroy(); - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) { - ev = event; - return static_cast<TEvent*>(ev->GetBase()); - } - } - } - - return nullptr; - } - - class IStrandingDecoratorFactory { - public: - virtual ~IStrandingDecoratorFactory() {} + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + } + + return nullptr; + } + + class IStrandingDecoratorFactory { + public: + virtual ~IStrandingDecoratorFactory() {} virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; - }; - + }; + struct IReplyChecker { virtual ~IReplyChecker() {} virtual void OnRequest(IEventHandle *request) = 0; @@ -712,5 +712,5 @@ namespace NActors { TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); - extern ui64 DefaultRandomSeed; -} + extern ui64 DefaultRandomSeed; +} diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make index fbd9ec2dc9..1afb3f6059 100644 --- a/library/cpp/actors/testlib/ya.make +++ b/library/cpp/actors/testlib/ya.make @@ -12,8 +12,8 @@ PEERDIR( library/cpp/actors/core library/cpp/actors/interconnect/mock library/cpp/actors/protos - library/cpp/random_provider - library/cpp/time_provider + library/cpp/random_provider + library/cpp/time_provider ) IF (GCC) diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h index 29a81968a8..172f08fc73 100644 --- a/library/cpp/actors/util/local_process_key.h +++ b/library/cpp/actors/util/local_process_key.h @@ -1,30 +1,30 @@ -#pragma once - +#pragma once + #include <util/string/builder.h> -#include <util/generic/strbuf.h> -#include <util/generic/vector.h> -#include <util/generic/hash.h> -#include <util/generic/singleton.h> +#include <util/generic/strbuf.h> +#include <util/generic/vector.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> #include <util/generic/serialized_enum.h> - -template <typename T> -class TLocalProcessKeyState { - -template <typename U, const char* Name> -friend class TLocalProcessKey; + +template <typename T> +class TLocalProcessKeyState { + +template <typename U, const char* Name> +friend class TLocalProcessKey; template <typename U, typename EnumT> friend class TEnumProcessKey; - -public: - static TLocalProcessKeyState& GetInstance() { - return *Singleton<TLocalProcessKeyState<T>>(); - } - - size_t GetCount() const { + +public: + static TLocalProcessKeyState& GetInstance() { + return *Singleton<TLocalProcessKeyState<T>>(); + } + + size_t GetCount() const { return StartIndex + Names.size(); - } - - TStringBuf GetNameByIndex(size_t index) const { + } + + TStringBuf GetNameByIndex(size_t index) const { if (index < StartIndex) { return StaticNames[index]; } else { @@ -32,24 +32,24 @@ public: Y_ENSURE(index < Names.size()); return Names[index]; } - } - - size_t GetIndexByName(TStringBuf name) const { - auto it = Map.find(name); - Y_ENSURE(it != Map.end()); - return it->second; - } - -private: - size_t Register(TStringBuf name) { + } + + size_t GetIndexByName(TStringBuf name) const { + auto it = Map.find(name); + Y_ENSURE(it != Map.end()); + return it->second; + } + +private: + size_t Register(TStringBuf name) { auto x = Map.emplace(name, Names.size()+StartIndex); - if (x.second) { - Names.emplace_back(name); - } - - return x.first->second; - } - + if (x.second) { + Names.emplace_back(name); + } + + return x.first->second; + } + size_t Register(TStringBuf name, ui32 index) { Y_VERIFY(index < StartIndex); auto x = Map.emplace(name, index); @@ -58,7 +58,7 @@ private: return x.first->second; } -private: +private: static constexpr ui32 StartIndex = 2000; TVector<TString> FillStaticNames() { @@ -73,22 +73,22 @@ private: TVector<TString> StaticNames = FillStaticNames(); TVector<TString> Names; THashMap<TString, size_t> Map; -}; - -template <typename T, const char* Name> -class TLocalProcessKey { -public: - static TStringBuf GetName() { - return Name; - } - - static size_t GetIndex() { - return Index; - } - -private: - inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name); -}; +}; + +template <typename T, const char* Name> +class TLocalProcessKey { +public: + static TStringBuf GetName() { + return Name; + } + + static size_t GetIndex() { + return Index; + } + +private: + inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name); +}; template <typename T, typename EnumT> class TEnumProcessKey { diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp index ff1cd248d3..74069ff15b 100644 --- a/library/cpp/actors/util/threadparkpad.cpp +++ b/library/cpp/actors/util/threadparkpad.cpp @@ -43,7 +43,7 @@ namespace NActors { }; #elif defined _win32_ -#include <util/generic/bt_exception.h> +#include <util/generic/bt_exception.h> #include <util/generic/yexception.h> namespace NActors { diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make index b3aeabf6b6..37488c3962 100644 --- a/library/cpp/actors/util/ya.make +++ b/library/cpp/actors/util/ya.make @@ -14,7 +14,7 @@ SRCS( funnel_queue.h futex.h intrinsics.h - local_process_key.h + local_process_key.h named_tuple.h queue_chunk.h queue_oneone_inplace.h diff --git a/library/cpp/monlib/dynamic_counters/counters.h b/library/cpp/monlib/dynamic_counters/counters.h index fd027ddc99..dc178cfbe0 100644 --- a/library/cpp/monlib/dynamic_counters/counters.h +++ b/library/cpp/monlib/dynamic_counters/counters.h @@ -81,11 +81,11 @@ namespace NMonitoring { } }; -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable : 4522) // multiple assignment operators specified -#endif // _MSC_VER - +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4522) // multiple assignment operators specified +#endif // _MSC_VER + struct TCounterForPtr: public TDeprecatedCounter, public TCountableBase { TCounterForPtr(bool derivative = false, EVisibility vis = EVisibility::Public) : TDeprecatedCounter(0ULL, derivative) @@ -95,7 +95,7 @@ namespace NMonitoring { TCounterForPtr(const TCounterForPtr&) = delete; TCounterForPtr& operator=(const TCounterForPtr& other) = delete; - + void Accept( const TString& labelName, const TString& labelValue, ICountableConsumer& consumer) const override { @@ -181,10 +181,10 @@ namespace NMonitoring { using THistogramPtr = TIntrusivePtr<THistogramCounter>; -#ifdef _MSC_VER -#pragma warning(pop) -#endif - +#ifdef _MSC_VER +#pragma warning(pop) +#endif + struct TDynamicCounters; typedef TIntrusivePtr<TDynamicCounters> TDynamicCounterPtr; diff --git a/library/cpp/random_provider/random_provider.cpp b/library/cpp/random_provider/random_provider.cpp index 5f09d8358a..64cb48b8b7 100644 --- a/library/cpp/random_provider/random_provider.cpp +++ b/library/cpp/random_provider/random_provider.cpp @@ -1,75 +1,75 @@ -#include "random_provider.h" -#include <util/random/mersenne.h> -#include <util/random/random.h> +#include "random_provider.h" +#include <util/random/mersenne.h> +#include <util/random/random.h> #include <util/system/unaligned_mem.h> - -namespace { - void SetV4(TGUID& g) { - g.dw[1] &= 0x0fffffff; - g.dw[1] |= 0x40000000; - - g.dw[2] &= 0xffffff3f; - g.dw[2] |= 0x00000080; - } -} - -class TDefaultRandomProvider: public IRandomProvider { -public: - ui64 GenRand() noexcept override { - return RandomNumber<ui64>(); - } - - TGUID GenGuid() noexcept override { - TGUID ret; - CreateGuid(&ret); - return ret; - } - + +namespace { + void SetV4(TGUID& g) { + g.dw[1] &= 0x0fffffff; + g.dw[1] |= 0x40000000; + + g.dw[2] &= 0xffffff3f; + g.dw[2] |= 0x00000080; + } +} + +class TDefaultRandomProvider: public IRandomProvider { +public: + ui64 GenRand() noexcept override { + return RandomNumber<ui64>(); + } + + TGUID GenGuid() noexcept override { + TGUID ret; + CreateGuid(&ret); + return ret; + } + TGUID GenUuid4() noexcept override { - TGUID ret; + TGUID ret; WriteUnaligned<ui64>(ret.dw, RandomNumber<ui64>()); WriteUnaligned<ui64>(ret.dw + 2, RandomNumber<ui64>()); - SetV4(ret); - return ret; - } -}; - -class TDeterministicRandomProvider: public IRandomProvider { -public: - TDeterministicRandomProvider(ui64 seed) - : Gen(seed) - { - } - - ui64 GenRand() noexcept override { - return Gen.GenRand(); - } - - TGUID GenGuid() noexcept override { - TGUID ret; + SetV4(ret); + return ret; + } +}; + +class TDeterministicRandomProvider: public IRandomProvider { +public: + TDeterministicRandomProvider(ui64 seed) + : Gen(seed) + { + } + + ui64 GenRand() noexcept override { + return Gen.GenRand(); + } + + TGUID GenGuid() noexcept override { + TGUID ret; WriteUnaligned<ui64>(ret.dw, Gen.GenRand()); - ret.dw[2] = (ui32)Gen.GenRand(); - ret.dw[3] = ++GuidCount; - return ret; - } - + ret.dw[2] = (ui32)Gen.GenRand(); + ret.dw[3] = ++GuidCount; + return ret; + } + TGUID GenUuid4() noexcept override { - TGUID ret; + TGUID ret; WriteUnaligned<ui64>(ret.dw, Gen.GenRand()); WriteUnaligned<ui64>(ret.dw + 2, Gen.GenRand()); - SetV4(ret); - return ret; - } - -private: - TMersenne<ui64> Gen; - ui32 GuidCount = 0; -}; - -TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider() { - return TIntrusivePtr<IRandomProvider>(new TDefaultRandomProvider()); -} - -TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed) { - return TIntrusivePtr<IRandomProvider>(new TDeterministicRandomProvider(seed)); -} + SetV4(ret); + return ret; + } + +private: + TMersenne<ui64> Gen; + ui32 GuidCount = 0; +}; + +TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider() { + return TIntrusivePtr<IRandomProvider>(new TDefaultRandomProvider()); +} + +TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed) { + return TIntrusivePtr<IRandomProvider>(new TDeterministicRandomProvider(seed)); +} diff --git a/library/cpp/random_provider/random_provider.h b/library/cpp/random_provider/random_provider.h index 41526f00cf..6e7226866f 100644 --- a/library/cpp/random_provider/random_provider.h +++ b/library/cpp/random_provider/random_provider.h @@ -1,14 +1,14 @@ -#pragma once - -#include <util/generic/guid.h> -#include <util/random/common_ops.h> - -class IRandomProvider: public TThrRefBase, public TCommonRNG<ui64, IRandomProvider> { -public: - virtual TGUID GenGuid() noexcept = 0; - virtual TGUID GenUuid4() noexcept = 0; - virtual ui64 GenRand() noexcept = 0; // for TCommonRNG -}; - -TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider(); -TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed); +#pragma once + +#include <util/generic/guid.h> +#include <util/random/common_ops.h> + +class IRandomProvider: public TThrRefBase, public TCommonRNG<ui64, IRandomProvider> { +public: + virtual TGUID GenGuid() noexcept = 0; + virtual TGUID GenUuid4() noexcept = 0; + virtual ui64 GenRand() noexcept = 0; // for TCommonRNG +}; + +TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider(); +TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed); diff --git a/library/cpp/random_provider/ya.make b/library/cpp/random_provider/ya.make index 908a491f36..38d1f070e8 100644 --- a/library/cpp/random_provider/ya.make +++ b/library/cpp/random_provider/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - +LIBRARY() + OWNER( g:kikimr g:yql ) - -SRCS( - random_provider.cpp - random_provider.h -) - -END() + +SRCS( + random_provider.cpp + random_provider.h +) + +END() diff --git a/library/cpp/regex/pire/regexp.h b/library/cpp/regex/pire/regexp.h index 60f449b9af..94bba4064b 100644 --- a/library/cpp/regex/pire/regexp.h +++ b/library/cpp/regex/pire/regexp.h @@ -4,7 +4,7 @@ #include <library/cpp/charset/doccodes.h> #include <library/cpp/charset/recyr.hh> -#include <util/generic/maybe.h> +#include <util/generic/maybe.h> #include <util/generic/strbuf.h> #include <util/generic/string.h> #include <util/generic/vector.h> @@ -42,7 +42,7 @@ namespace NRegExp { bool CaseInsensitive = false; bool Surround = false; - TMaybe<size_t> CapturePos; + TMaybe<size_t> CapturePos; ECharset Charset = CODES_UNKNOWN; bool AndNotSupport = false; }; @@ -70,8 +70,8 @@ namespace NRegExp { lexer.AddFeature(NPire::NFeatures::CaseInsensitive()); } - if (opts.CapturePos) { - lexer.AddFeature(NPire::NFeatures::Capture(*opts.CapturePos)); + if (opts.CapturePos) { + lexer.AddFeature(NPire::NFeatures::Capture(*opts.CapturePos)); } if (opts.AndNotSupport) { diff --git a/library/cpp/testing/unittest/utmain.cpp b/library/cpp/testing/unittest/utmain.cpp index 33b0ccc63a..305bc6b40f 100644 --- a/library/cpp/testing/unittest/utmain.cpp +++ b/library/cpp/testing/unittest/utmain.cpp @@ -584,13 +584,13 @@ public: _CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_FILE | _CRTDBG_MODE_DEBUG); _CrtSetReportFile(_CRT_ERROR, _CRTDBG_FILE_STDERR); } - } - ~TWinEnvironment() { + } + ~TWinEnvironment() { if (!IsDebuggerPresent()) { _CrtSetReportMode(_CRT_WARN, _CRTDBG_MODE_FILE); _CrtSetReportFile(_CRT_WARN, _CRTDBG_FILE_STDERR); } - + SetConsoleOutputCP(OutputCP); // restore original output CP at program exit } diff --git a/library/cpp/time_provider/time_provider.cpp b/library/cpp/time_provider/time_provider.cpp index c3181856f8..6c1ba8e07c 100644 --- a/library/cpp/time_provider/time_provider.cpp +++ b/library/cpp/time_provider/time_provider.cpp @@ -1,30 +1,30 @@ -#include "time_provider.h" - -class TDefaultTimeProvider: public ITimeProvider { -public: - TInstant Now() override { - return TInstant::Now(); - } -}; - -class TDeterministicTimeProvider: public ITimeProvider { -public: - TDeterministicTimeProvider(ui64 seed) { - Value = TInstant::Seconds(seed); - } - - TInstant Now() override { - return Value; - } - -private: - TInstant Value; -}; - -TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider() { - return TIntrusivePtr<ITimeProvider>(new TDefaultTimeProvider()); -} - -TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed) { - return TIntrusivePtr<ITimeProvider>(new TDeterministicTimeProvider(seed)); -} +#include "time_provider.h" + +class TDefaultTimeProvider: public ITimeProvider { +public: + TInstant Now() override { + return TInstant::Now(); + } +}; + +class TDeterministicTimeProvider: public ITimeProvider { +public: + TDeterministicTimeProvider(ui64 seed) { + Value = TInstant::Seconds(seed); + } + + TInstant Now() override { + return Value; + } + +private: + TInstant Value; +}; + +TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider() { + return TIntrusivePtr<ITimeProvider>(new TDefaultTimeProvider()); +} + +TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed) { + return TIntrusivePtr<ITimeProvider>(new TDeterministicTimeProvider(seed)); +} diff --git a/library/cpp/time_provider/time_provider.h b/library/cpp/time_provider/time_provider.h index 86ff965099..46e0a885da 100644 --- a/library/cpp/time_provider/time_provider.h +++ b/library/cpp/time_provider/time_provider.h @@ -1,11 +1,11 @@ -#pragma once - -#include <util/datetime/base.h> - -class ITimeProvider: public TThrRefBase { -public: - virtual TInstant Now() = 0; -}; - -TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider(); -TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed); +#pragma once + +#include <util/datetime/base.h> + +class ITimeProvider: public TThrRefBase { +public: + virtual TInstant Now() = 0; +}; + +TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider(); +TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed); diff --git a/library/cpp/time_provider/ya.make b/library/cpp/time_provider/ya.make index a1bcdd2d96..cf3995b2a4 100644 --- a/library/cpp/time_provider/ya.make +++ b/library/cpp/time_provider/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - +LIBRARY() + OWNER( g:kikimr g:yql ) - -SRCS( - time_provider.cpp - time_provider.h -) - -END() + +SRCS( + time_provider.cpp + time_provider.h +) + +END() |