#include "io_dispatcher.h" #include "actor_bootstrapped.h" #include "hfunc.h" #include #include #include #include #include namespace NActors { class TIoDispatcherActor : public TActorBootstrapped { enum { EvNotifyThreadStopped = EventSpaceBegin(TEvents::ES_PRIVATE), }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // IO task queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TTask { TInstant Timestamp; std::function Callback; public: TTask(TInstant timestamp, TEvInvokeQuery *ev) : Timestamp(timestamp) , Callback(std::move(ev->Callback)) {} void Execute() { Callback(); } TInstant GetTimestamp() const { return Timestamp; } }; class TTaskQueue { std::list Tasks; TMutex Mutex; TCondVar CondVar; size_t NumThreadsToStop = 0; public: void Enqueue(TInstant timestamp, TEvInvokeQuery *ev) { std::list list; list.emplace_back(timestamp, ev); with_lock (Mutex) { Tasks.splice(Tasks.end(), std::move(list)); } CondVar.Signal(); } bool Dequeue(std::list& list, bool *sendNotify) { with_lock (Mutex) { CondVar.Wait(Mutex, [&] { return NumThreadsToStop || !Tasks.empty(); }); if (NumThreadsToStop) { *sendNotify = NumThreadsToStop != Max(); if (*sendNotify) { --NumThreadsToStop; } return false; } else { list.splice(list.end(), Tasks, Tasks.begin()); return true; } } } void Stop() { with_lock (Mutex) { NumThreadsToStop = Max(); } CondVar.BroadCast(); } void StopOne() { with_lock (Mutex) { ++NumThreadsToStop; Y_VERIFY(NumThreadsToStop); } CondVar.Signal(); } std::optional GetEarliestTaskTimestamp() { with_lock (Mutex) { return Tasks.empty() ? std::nullopt : std::make_optional(Tasks.front().GetTimestamp()); } } }; TTaskQueue TaskQueue; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // IO dispatcher threads //////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TThread : public ISimpleThread { TIoDispatcherActor& Actor; TActorSystem* const ActorSystem; public: TThread(TIoDispatcherActor& actor, TActorSystem *actorSystem) : Actor(actor) , ActorSystem(actorSystem) { Start(); } void *ThreadProc() override { SetCurrentThreadName("kikimr IO"); for (;;) { std::list tasks; bool sendNotify; if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) { if (sendNotify) { ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(), nullptr, TThread::CurrentThreadId())); } break; } for (TTask& task : tasks) { task.Execute(); ++*Actor.TasksCompleted; } } return nullptr; } }; static constexpr size_t MinThreadCount = 4; static constexpr size_t MaxThreadCount = 64; std::map> Threads; size_t NumRunningThreads = 0; void StartThread() { auto thread = std::make_unique(*this, TlsActivationContext->ExecutorThread.ActorSystem); const TThread::TId id = thread->Id(); Threads.emplace(id, std::move(thread)); *NumThreads = ++NumRunningThreads; ++*ThreadsStarted; } void StopThread() { Y_VERIFY(Threads.size()); TaskQueue.StopOne(); *NumThreads = --NumRunningThreads; ++*ThreadsStopped; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Counters //////////////////////////////////////////////////////////////////////////////////////////////////////////////// NMonitoring::TDynamicCounters::TCounterPtr NumThreads; NMonitoring::TDynamicCounters::TCounterPtr TasksAdded; NMonitoring::TDynamicCounters::TCounterPtr TasksCompleted; NMonitoring::TDynamicCounters::TCounterPtr ThreadsStarted; NMonitoring::TDynamicCounters::TCounterPtr ThreadsStopped; public: TIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters) : NumThreads(counters->GetCounter("NumThreads")) , TasksAdded(counters->GetCounter("TasksAdded", true)) , TasksCompleted(counters->GetCounter("TasksCompleted", true)) , ThreadsStarted(counters->GetCounter("ThreadsStarted", true)) , ThreadsStopped(counters->GetCounter("ThreadsStopped", true)) {} ~TIoDispatcherActor() override { TaskQueue.Stop(); } static constexpr char ActorName[] = "IO_DISPATCHER_ACTOR"; void Bootstrap() { while (NumRunningThreads < MinThreadCount) { StartThread(); } HandleWakeup(); Become(&TThis::StateFunc); } void HandleThreadStopped(TAutoPtr ev) { auto it = Threads.find(ev->Cookie); Y_VERIFY(it != Threads.end()); it->second->Join(); Threads.erase(it); } void Handle(TEvInvokeQuery::TPtr ev) { ++*TasksAdded; TaskQueue.Enqueue(TActivationContext::Now(), ev->Get()); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Thread usage counter logic //////////////////////////////////////////////////////////////////////////////////////////////////////////////// std::optional IdleTimestamp; static constexpr TDuration ThreadStartTime = TDuration::MilliSeconds(500); static constexpr TDuration ThreadStopTime = TDuration::MilliSeconds(500); void HandleWakeup() { const TInstant now = TActivationContext::Now(); std::optional earliest = TaskQueue.GetEarliestTaskTimestamp(); if (earliest) { if (now >= *earliest + ThreadStartTime && NumRunningThreads < MaxThreadCount) { StartThread(); } IdleTimestamp.reset(); } else if (!IdleTimestamp) { IdleTimestamp = now; } else if (now >= *IdleTimestamp + ThreadStopTime) { IdleTimestamp.reset(); if (NumRunningThreads > MinThreadCount) { StopThread(); } } Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup); } STRICT_STFUNC(StateFunc, { fFunc(EvNotifyThreadStopped, HandleThreadStopped); hFunc(TEvInvokeQuery, Handle); cFunc(TEvents::TSystem::Wakeup, HandleWakeup); cFunc(TEvents::TSystem::Poison, PassAway); }) }; IActor *CreateIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters) { return new TIoDispatcherActor(counters); } } // NActors