#include "local_executor.h"
#include <library/cpp/threading/future/future.h>
#include <util/generic/utility.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/event.h>
#include <util/system/thread.h>
#include <util/system/tls.h>
#include <util/system/yield.h>
#include <util/thread/lfqueue.h>
#include <utility>
#ifdef _win_
static void RegularYield() {
}
#else
// unix actually has cooperative multitasking! :)
// without this call the program runs slower and the system lags for some magic reason
static void RegularYield() {
SchedYield();
}
#endif
namespace {
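// Adapts a plain TLocallyExecutableFunction to the ILocallyExecutable interface.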
struct TFunctionWrapper : NPar::ILocallyExecutable {
NPar::TLocallyExecutableFunction Exec;
TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
: Exec(std::move(exec))
{
}
void LocalExec(int id) override {
Exec(id);
}
};
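// Wraps a function together with one promise per id in [FirstId, LastId).
// Used by ExecRangeWithFutures() so that callers can wait on individual ids
// and observe exceptions thrown by the function.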
class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
private:
NPar::TLocallyExecutableFunction Exec;
int FirstId, LastId;
TVector<NThreading::TPromise<void>> Promises;
public:
TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
: Exec(std::move(exec))
, FirstId(firstId)
, LastId(lastId)
{
Y_ASSERT(FirstId <= LastId);
const int rangeSize = LastId - FirstId;
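// resize() would copy-construct every slot from the same shared promise,
// so each slot is reassigned a fresh, independent promise below.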
Promises.resize(rangeSize, NThreading::NewPromise());
for (auto& promise : Promises) {
promise = NThreading::NewPromise();
}
}
void LocalExec(int id) override {
Y_ASSERT(FirstId <= id && id < LastId);
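// Runs Exec(id) and stores its completion (or the thrown exception) in the promise for this id.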
NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); });
}
TVector<NThreading::TFuture<void>> GetFutures() const {
TVector<NThreading::TFuture<void>> out;
out.reserve(Promises.ysize());
for (auto& promise : Promises) {
out.push_back(promise.GetFuture());
}
return out;
}
};
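// A unit of work in the job queues: the executable plus the id to pass to it.
// A job with a null Exec serves as the shutdown marker (see TImpl::~TImpl and HostWorkerThread).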
struct TSingleJob {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
int Id{0};
TSingleJob() = default;
TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
: Exec(std::move(exec))
, Id(id)
{
}
};
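// Distributes the ids [Counter, LastId) across all threads that execute it:
// each thread repeatedly claims the next id with an atomic increment until the range is exhausted.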
class TLocalRangeExecutor: public NPar::ILocallyExecutable {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
alignas(64) TAtomic Counter;
alignas(64) TAtomic WorkerCount;
int LastId;
void LocalExec(int) override {
AtomicAdd(WorkerCount, 1);
for (;;) {
if (!DoSingleOp())
break;
}
AtomicAdd(WorkerCount, -1);
}
public:
TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
: Exec(std::move(exec))
, Counter(firstId)
, WorkerCount(0)
, LastId(lastId)
{
}
bool DoSingleOp() {
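// Atomically claim the next id; report failure once the range is exhausted.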
const int id = AtomicAdd(Counter, 1) - 1;
if (id >= LastId)
return false;
Exec->LocalExec(id);
RegularYield();
return true;
}
void WaitComplete() {
while (AtomicGet(WorkerCount) > 0)
RegularYield();
}
int GetRangeSize() const {
return Max<int>(LastId - AtomicGet(Counter), 0);
}
};
}
//////////////////////////////////////////////////////////////////////////
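// TLocalExecutor implementation: one lock-free queue per priority level, a single
// event that wakes idle workers, and detached worker threads started by RunNewThread().
//
// Illustrative usage sketch (ProcessBlock and blockCount are placeholders, not part
// of this library; see local_executor.h for the full interface):
//     NPar::TLocalExecutor executor;
//     executor.RunAdditionalThreads(3); // 3 workers in addition to the calling thread
//     executor.ExecRangeWithThrow([&](int id) { ProcessBlock(id); }, 0, blockCount,
//                                 NPar::TLocalExecutor::WAIT_COMPLETE);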
class NPar::TLocalExecutor::TImpl {
public:
TLockFreeQueue<TSingleJob> JobQueue;
TLockFreeQueue<TSingleJob> MedJobQueue;
TLockFreeQueue<TSingleJob> LowJobQueue;
alignas(64) TSystemEvent HasJob;
TAtomic ThreadCount{0};
alignas(64) TAtomic QueueSize{0};
TAtomic MPQueueSize{0};
TAtomic LPQueueSize{0};
TAtomic ThreadId{0};
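// Per-thread state: the priority of the task currently running on this thread,
// and this worker thread's id (assigned in HostWorkerThread).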
Y_THREAD(int)
CurrentTaskPriority;
Y_THREAD(int)
WorkerThreadId;
static void* HostWorkerThread(void* p);
bool GetJob(TSingleJob* job);
void RunNewThread();
void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, int queueSizeLimit,
TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
TImpl() = default;
~TImpl();
};
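// Shutdown: enqueue a single null job as a marker. Each worker that dequeues it
// re-enqueues it for the next worker and exits, so one marker stops every thread.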
NPar::TLocalExecutor::TImpl::~TImpl() {
AtomicAdd(QueueSize, 1);
JobQueue.Enqueue(TSingleJob(nullptr, 0));
HasJob.Signal();
while (AtomicGet(ThreadCount)) {
ThreadYield();
}
}
void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
static const int FAST_ITERATIONS = 200;
auto* const ctx = (TImpl*)p;
TThread::SetCurrentThreadName("ParLocalExecutor");
ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
for (bool cont = true; cont;) {
TSingleJob job;
bool gotJob = false;
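// Poll the queues a bounded number of times before falling back to blocking on HasJob.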
for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
if (ctx->GetJob(&job)) {
gotJob = true;
break;
}
}
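// Nothing found while polling: reset the event, re-check the queues once to close
// the race with a concurrent Signal(), and only then block.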
if (!gotJob) {
ctx->HasJob.Reset();
if (!ctx->GetJob(&job)) {
ctx->HasJob.Wait();
continue;
}
}
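// A real job is executed; a null Exec is the shutdown marker from ~TImpl(), which is
// re-enqueued for the next worker before this thread exits.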
if (job.Exec.Get()) {
job.Exec->LocalExec(job.Id);
RegularYield();
} else {
AtomicAdd(ctx->QueueSize, 1);
ctx->JobQueue.Enqueue(job);
ctx->HasJob.Signal();
cont = false;
}
}
AtomicAdd(ctx->ThreadCount, -1);
return nullptr;
}
bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
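// Dequeue in priority order (HIGH, then MED, then LOW) and remember the priority,
// so that tasks spawned from inside this job never get a higher priority than the job itself.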
if (JobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
AtomicAdd(QueueSize, -1);
return true;
} else if (MedJobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
AtomicAdd(MPQueueSize, -1);
return true;
} else if (LowJobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
AtomicAdd(LPQueueSize, -1);
return true;
}
return false;
}
void NPar::TLocalExecutor::TImpl::RunNewThread() {
AtomicAdd(ThreadCount, 1);
TThread thr(HostWorkerThread, this);
thr.Start();
thr.Detach();
}
void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
int queueSizeLimit,
TAtomic* queueSize,
TLockFreeQueue<TSingleJob>* jobQueue) {
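// Enqueue up to (ThreadCount + 1) copies of the range executor so that all workers can
// pick it up, unless the target queue is already over its size limit.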
int count = Min<int>(AtomicGet(ThreadCount) + 1, rangeExec->GetRangeSize());
if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
return;
}
AtomicAdd(*queueSize, count);
jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
HasJob.Signal();
}
NPar::TLocalExecutor::TLocalExecutor()
: Impl_{MakeHolder<TImpl>()} {
}
NPar::TLocalExecutor::~TLocalExecutor() = default;
void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
for (int i = 0; i < threadCount; i++)
Impl_->RunNewThread();
}
void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
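// Priorities are numerically ordered (HIGH < MED < LOW), so taking the maximum means a task
// scheduled from inside another task never gets a higher priority than its parent.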
int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
switch (prior) {
case HIGH_PRIORITY:
AtomicAdd(Impl_->QueueSize, 1);
Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
case MED_PRIORITY:
AtomicAdd(Impl_->MPQueueSize, 1);
Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
case LOW_PRIORITY:
AtomicAdd(Impl_->LPQueueSize, 1);
Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
default:
Y_ASSERT(0);
break;
}
Impl_->HasJob.Signal();
}
void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
Exec(new TFunctionWrapper(std::move(exec)), id, flags);
}
void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
Y_ASSERT(lastId >= firstId);
if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
return;
}
auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
switch (prior) {
case HIGH_PRIORITY:
Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
break;
case MED_PRIORITY:
Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
break;
case LOW_PRIORITY:
Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
break;
default:
Y_ASSERT(0);
break;
}
if (flags & WAIT_COMPLETE) {
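// The calling thread joins the work: it temporarily adopts the range's priority, runs
// remaining ids itself, and then waits until every worker executing this range has finished.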
int keepPrior = Impl_->CurrentTaskPriority;
Impl_->CurrentTaskPriority = prior;
while (rangeExec->DoSingleOp()) {
}
Impl_->CurrentTaskPriority = keepPrior;
rangeExec->WaitComplete();
}
}
void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
}
void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
Y_ABORT_UNLESS((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
for (auto& result : currentRun) {
result.GetValueSync(); // Rethrows the stored exception, if any. If several tasks threw, only the exception with the minimal id is rethrown.
}
}
TVector<NThreading::TFuture<void>>
NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
ExecRange(execWrapper, firstId, lastId, flags);
return out;
}
void NPar::TLocalExecutor::ClearLPQueue() {
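// Drop all MED- and LOW-priority jobs that have not started yet; repeat until both queues
// stay empty for a full pass.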
for (bool cont = true; cont;) {
cont = false;
TSingleJob job;
while (Impl_->LowJobQueue.Dequeue(&job)) {
AtomicAdd(Impl_->LPQueueSize, -1);
cont = true;
}
while (Impl_->MedJobQueue.Dequeue(&job)) {
AtomicAdd(Impl_->MPQueueSize, -1);
cont = true;
}
}
}
int NPar::TLocalExecutor::GetQueueSize() const noexcept {
return AtomicGet(Impl_->QueueSize);
}
int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
return AtomicGet(Impl_->MPQueueSize);
}
int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
return AtomicGet(Impl_->LPQueueSize);
}
int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
return Impl_->WorkerThreadId;
}
int NPar::TLocalExecutor::GetThreadCount() const noexcept {
return AtomicGet(Impl_->ThreadCount);
}
//////////////////////////////////////////////////////////////////////////