diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
commit | 330c83f8c116bd45316397b179275e9d87007e7d (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/testlib | |
parent | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff) | |
download | ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 134 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 148 |
2 files changed, 141 insertions, 141 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index cf93742d21..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -58,7 +58,7 @@ namespace NActors { TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { ActorSystemTimestamp = nullptr; - ActorSystemMonotonic = nullptr; + ActorSystemMonotonic = nullptr; } void TTestActorRuntimeBase::TNodeDataBase::Stop() { @@ -210,10 +210,10 @@ namespace NActors { return Scheduled.empty(); } - TInstant TEventMailBox::GetFirstScheduleDeadline() const { - return Scheduled.begin()->Deadline; - } - + TInstant TEventMailBox::GetFirstScheduleDeadline() const { + return Scheduled.begin()->Deadline; + } + ui64 TEventMailBox::GetSentEventCount() const { return Sent.size(); } @@ -245,7 +245,7 @@ namespace NActors { void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); Node->ActorSystemTimestamp = currentTimestamp; - Node->ActorSystemMonotonic = currentMonotonic; + Node->ActorSystemMonotonic = currentMonotonic; } void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override { @@ -299,8 +299,8 @@ namespace NActors { void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId); - } - + } + void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; DoSchedule(deadline, ev, cookie, workerId); @@ -467,7 +467,7 @@ namespace NActors { , LocalId(0) , DispatchCyclesCount(0) , DispatchedEventsCount(0) - , NeedMonitoring(false) + , NeedMonitoring(false) , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) , TimeProvider(new TTimeProvider(*this)) , ShouldContinue() @@ -490,7 +490,7 @@ namespace NActors { node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); - node->LogSettings->SetThrottleDelay(TDuration::Zero()); + node->LogSettings->SetThrottleDelay(TDuration::Zero()); node->DynamicCounters = new NMonitoring::TDynamicCounters; InitNodeImpl(node, nodeIndex); @@ -572,9 +572,9 @@ namespace NActors { bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { Y_UNUSED(runtime); - Y_UNUSED(delay); + Y_UNUSED(delay); Y_UNUSED(event); - Y_UNUSED(deadline); + Y_UNUSED(deadline); return true; } @@ -843,14 +843,14 @@ namespace NActors { } ui32 TTestActorRuntimeBase::InterconnectPoolId() const { - if (UseRealThreads && NSan::TSanIsOn()) { - // Interconnect coroutines may move across threads - // Use a special single-threaded pool to avoid that - return 4; - } - return 0; - } - + if (UseRealThreads && NSan::TSanIsOn()) { + // Interconnect coroutines may move across threads + // Use a special single-threaded pool to avoid that + return 4; + } + return 0; + } + TString TTestActorRuntimeBase::GetTempDir() { if (!TmpDir) TmpDir.Reset(new TTempDir()); @@ -967,7 +967,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); EdgeActors.insert(edgeActor); - EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; + EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; return edgeActor; } @@ -1161,15 +1161,15 @@ namespace NActors { if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { bool isEdgeMailbox = false; - if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) { - isEdgeMailbox = true; - TEventsList events; - mbox.second->Capture(events); - for (auto& ev : events) { - TInverseGuard<TMutex> inverseGuard(Mutex); - ObserverFunc(*this, ev); + if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) { + isEdgeMailbox = true; + TEventsList events; + mbox.second->Capture(events); + for (auto& ev : events) { + TInverseGuard<TMutex> inverseGuard(Mutex); + ObserverFunc(*this, ev); } - mbox.second->PushFront(events); + mbox.second->PushFront(events); } if (!isEdgeMailbox) { @@ -1246,8 +1246,8 @@ namespace NActors { } } - if (localContext.FinalEventFound) { - return true; + if (localContext.FinalEventFound) { + return true; } if (!localContext.FoundNonEmptyMailboxes.empty()) @@ -1287,8 +1287,8 @@ namespace NActors { inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; bool isEmpty = true; TMaybe<TInstant> nearestMailboxDeadline; - TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes; - TMaybe<TInstant> nextScheduleDeadline; + TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes; + TMaybe<TInstant> nextScheduleDeadline; for (auto& mbox : currentMailboxes) { if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { @@ -1301,29 +1301,29 @@ namespace NActors { if (mbox.second->IsScheduledEmpty()) continue; - auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline(); - if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) { - nextScheduleMboxes.clear(); - nextScheduleMboxes.emplace_back(mbox.second); - nextScheduleDeadline = firstScheduleDeadline; - } else if (firstScheduleDeadline == *nextScheduleDeadline) { - nextScheduleMboxes.emplace_back(mbox.second); - } - } - - for (const auto& nextScheduleMbox : nextScheduleMboxes) { + auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline(); + if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) { + nextScheduleMboxes.clear(); + nextScheduleMboxes.emplace_back(mbox.second); + nextScheduleDeadline = firstScheduleDeadline; + } else if (firstScheduleDeadline == *nextScheduleDeadline) { + nextScheduleMboxes.emplace_back(mbox.second); + } + } + + for (const auto& nextScheduleMbox : nextScheduleMboxes) { TEventsList selectedEvents; TScheduledEventsList capturedScheduledEvents; - nextScheduleMbox->CaptureScheduled(capturedScheduledEvents); + nextScheduleMbox->CaptureScheduled(capturedScheduledEvents); ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); - nextScheduleMbox->PushScheduled(capturedScheduledEvents); + nextScheduleMbox->PushScheduled(capturedScheduledEvents); for (auto& event : selectedEvents) { if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(event, this); } - nextScheduleMbox->Send(event); + nextScheduleMbox->Send(event); isEmpty = false; } } @@ -1371,10 +1371,10 @@ namespace NActors { while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { if (finalEvent.EventCheck(ev)) { - auto& freq = context->FinalEventFrequency[&finalEvent]; - if (++freq >= finalEvent.RequiredCount) { - context->FinalEventFound = true; - } + auto& freq = context->FinalEventFrequency[&finalEvent]; + if (++freq >= finalEvent.RequiredCount) { + context->FinalEventFound = true; + } } } @@ -1424,15 +1424,15 @@ namespace NActors { void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { TGuard<TMutex> guard(Mutex); ui32 dispatchCount = 0; - if (!edgeFilter.empty()) { - for (auto edgeActor : edgeFilter) { + if (!edgeFilter.empty()) { + for (auto edgeActor : edgeFilter) { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); - } - } + } + } const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; for (;;) { - for (auto edgeActor : edgeActors) { + for (auto edgeActor : edgeActors) { TEventsList events; auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); bool foundEvent = false; @@ -1451,7 +1451,7 @@ namespace NActors { ++dispatchCount; { - if (!DispatchEventsInternal(TDispatchOptions(), deadline)) { + if (!DispatchEventsInternal(TDispatchOptions(), deadline)) { return; // Timed out; event was not found } } @@ -1522,8 +1522,8 @@ namespace NActors { return node->DynamicCounters; } - void TTestActorRuntimeBase::SetupMonitoring() { - NeedMonitoring = true; + void TTestActorRuntimeBase::SetupMonitoring() { + NeedMonitoring = true; } void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { @@ -1612,13 +1612,13 @@ namespace NActors { setup->NodeId = FirstNodeId + nodeIndex; if (UseRealThreads) { - setup->ExecutorsCount = 5; - setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); + setup->ExecutorsCount = 5; + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); - setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20)); + setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20)); setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100))); } else { setup->ExecutorsCount = 1; @@ -1652,14 +1652,14 @@ namespace NActors { common->MonCounters = interconnectCounters; common->TechnicalSelfHostName = "::1"; - if (!UseRealThreads) { + if (!UseRealThreads) { common->Settings.DeadPeer = TDuration::Max(); common->Settings.CloseOnIdle = TDuration::Max(); common->Settings.PingPeriod = TDuration::Max(); common->Settings.ForceConfirmPeriod = TDuration::Max(); common->Settings.Handshake = TDuration::Max(); - } - + } + common->ClusterUUID = ClusterUUID; common->AcceptUUID = {ClusterUUID}; @@ -1677,7 +1677,7 @@ namespace NActors { } setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock); - + if (UseRealInterconnect) { setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(), NActors::TMailboxType::Simple, InterconnectPoolId())); diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 387b386df3..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -16,7 +16,7 @@ #include <util/datetime/base.h> #include <util/folder/tempdir.h> #include <util/generic/deque.h> -#include <util/generic/hash.h> +#include <util/generic/hash.h> #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> #include <util/generic/queue.h> @@ -62,12 +62,12 @@ namespace NActors { return (NodeId == other.NodeId) && (Hint == other.Hint); } - struct THash { - ui64 operator()(const TEventMailboxId& mboxId) const noexcept { - return mboxId.NodeId * 31ULL + mboxId.Hint; - } - }; - + struct THash { + ui64 operator()(const TEventMailboxId& mboxId) const noexcept { + return mboxId.NodeId * 31ULL + mboxId.Hint; + } + }; + ui32 NodeId; ui32 Hint; }; @@ -150,7 +150,7 @@ namespace NActors { TInstant GetInactiveUntil() const; void Schedule(const TScheduledEventQueueItem& item); bool IsScheduledEmpty() const; - TInstant GetFirstScheduleDeadline() const; + TInstant GetFirstScheduleDeadline() const; ui64 GetSentEventCount() const; private: @@ -164,7 +164,7 @@ namespace NActors { #endif }; - typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList; + typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList; class TEmptyEventQueueException : public yexception { public: @@ -237,7 +237,7 @@ namespace NActors { ui32 GetNodeId(ui32 index = 0) const; ui32 GetNodeCount() const; ui64 AllocateLocalId(); - ui32 InterconnectPoolId() const; + ui32 InterconnectPoolId() const; TString GetTempDir(); TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, @@ -268,7 +268,7 @@ namespace NActors { void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); - void SetupMonitoring(); + void SetupMonitoring(); template<typename T> void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { @@ -303,7 +303,7 @@ namespace NActors { } return false; - }, {}, simTimeout); + }, {}, simTimeout); if (simTimeout == TDuration::Max()) Y_VERIFY(handle); @@ -315,44 +315,44 @@ namespace NActors { } } - template<class TEvent> - typename TEvent::TPtr GrabEdgeEventIf( + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventIf( const TSet<TActorId>& edgeFilter, - const std::function<bool(const typename TEvent::TPtr&)>& predicate, - TDuration simTimeout = TDuration::Max()) - { - typename TEvent::TPtr handle; - const ui32 eventType = TEvent::EventType; + const std::function<bool(const typename TEvent::TPtr&)>& predicate, + TDuration simTimeout = TDuration::Max()) + { + typename TEvent::TPtr handle; + const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { - Y_UNUSED(runtime); - if (event->GetTypeRewrite() != eventType) - return false; - - typename TEvent::TPtr* typedEvent = reinterpret_cast<typename TEvent::TPtr*>(&event); - if (predicate(*typedEvent)) { - handle = *typedEvent; - return true; - } - - return false; - }, edgeFilter, simTimeout); - - if (simTimeout == TDuration::Max()) - Y_VERIFY(handle); - - return handle; - } - - template<class TEvent> - typename TEvent::TPtr GrabEdgeEventIf( + Y_UNUSED(runtime); + if (event->GetTypeRewrite() != eventType) + return false; + + typename TEvent::TPtr* typedEvent = reinterpret_cast<typename TEvent::TPtr*>(&event); + if (predicate(*typedEvent)) { + handle = *typedEvent; + return true; + } + + return false; + }, edgeFilter, simTimeout); + + if (simTimeout == TDuration::Max()) + Y_VERIFY(handle); + + return handle; + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventIf( const TActorId& edgeActor, - const std::function<bool(const typename TEvent::TPtr&)>& predicate, - TDuration simTimeout = TDuration::Max()) - { + const std::function<bool(const typename TEvent::TPtr&)>& predicate, + TDuration simTimeout = TDuration::Max()) + { TSet<TActorId> edgeFilter{edgeActor}; - return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); - } - + return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); + } + template <typename TEvent> TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; @@ -367,17 +367,17 @@ namespace NActors { return THolder(handle ? handle->Release<TEvent>().Release() : nullptr); } - template<class TEvent> + template<class TEvent> typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { - return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout); - } - - template<class TEvent> + return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout); + } + + template<class TEvent> typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { TSet<TActorId> edgeFilter{edgeActor}; - return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); - } - + return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); + } + // replace with std::variant<> template <typename... TEvents> std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { @@ -388,7 +388,7 @@ namespace NActors { return false; handle = event; return true; - }, {}, simTimeout); + }, {}, simTimeout); if (simTimeout == TDuration::Max()) Y_VERIFY(handle); if (handle) { @@ -408,24 +408,24 @@ namespace NActors { } } - template<class TEvent> + template<class TEvent> typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { - try { - return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); - } catch (...) { - ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); - } - } - - template<class TEvent> + try { + return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); + } + } + + template<class TEvent> typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { - try { - return GrabEdgeEvent<TEvent>(edgeActor, simTimeout); - } catch (...) { - ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); - } - } - + try { + return GrabEdgeEvent<TEvent>(edgeActor, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); + } + } + template <typename... TEvents> static TString TypeNames() { static TString names[] = { TypeName<TEvents>()... }; @@ -530,7 +530,7 @@ namespace NActors { ui64 DispatcherRandomSeed; TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; TAutoPtr<TLogBackend> LogBackend; - bool NeedMonitoring; + bool NeedMonitoring; TIntrusivePtr<IRandomProvider> RandomProvider; TIntrusivePtr<ITimeProvider> TimeProvider; @@ -558,7 +558,7 @@ namespace NActors { TIntrusivePtr<NActors::NLog::TSettings> LogSettings; TIntrusivePtr<NInterconnect::TPollerThreads> Poller; volatile ui64* ActorSystemTimestamp; - volatile ui64* ActorSystemMonotonic; + volatile ui64* ActorSystemMonotonic; TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; TMap<TActorId, IActor*> LocalServicesActors; TMap<IActor*, TActorId> ActorToActorId; @@ -607,7 +607,7 @@ namespace NActors { TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency; TSet<TEventMailboxId> FoundNonEmptyMailboxes; - bool FinalEventFound = false; + bool FinalEventFound = false; }; TProgramShouldContinue ShouldContinue; |