#include "executor.h"

#include "thread_extra.h"
#include "what_thread_does.h"
#include "what_thread_does_guard.h"

#include <util/generic/utility.h>
#include <util/random/random.h>
#include <util/stream/str.h>
#include <util/system/tls.h>
#include <util/system/yassert.h>

#include <array>

using namespace NActor;
using namespace NActor::NPrivate;

namespace {
    struct THistoryInternal {
        struct TRecord {
            TAtomic MaxQueueSize;

            TRecord()
                : MaxQueueSize()
            {
            }

            TExecutorHistory::THistoryRecord Capture() {
                TExecutorHistory::THistoryRecord r;
                r.MaxQueueSize = AtomicGet(MaxQueueSize);
                return r;
            }
        };

        ui64 Start;
        ui64 LastTime;

        std::array<TRecord, 3600> Records;

        THistoryInternal() {
            Start = TInstant::Now().Seconds();
            LastTime = Start - 1;
        }

        TRecord& GetRecordForTime(ui64 time) {
            return Records[time % Records.size()];
        }

        TRecord& GetNowRecord(ui64 now) {
            for (ui64 t = LastTime + 1; t <= now; ++t) {
                GetRecordForTime(t) = TRecord();
            }
            LastTime = now;
            return GetRecordForTime(now);
        }

        TExecutorHistory Capture() {
            TExecutorHistory history;
            ui64 now = TInstant::Now().Seconds();
            ui64 lastHistoryRecord = now - 1;
            ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
            history.HistoryRecords.resize(historySize);
            for (ui32 i = 0; i < historySize; ++i) {
                history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
            }
            history.LastHistoryRecordSecond = lastHistoryRecord;
            return history;
        }
    };

}

// Executor whose worker loop is running on the current thread, or null on any
// other thread. Set and cleared by TExecutor::RunWorker; compared against `this`
// in TExecutor::IsInExecutorThread.
Y_POD_STATIC_THREAD(TExecutor*)
ThreadCurrentExecutor;

// Placeholder "what thread does" text: TExecutorWorker::WhatThreadDoesLocation
// points here until the worker thread publishes its own location in Run().
static const char* NoLocation = "nowhere";

// Per-worker-thread statistics. Updated by the owning worker in DequeueWork and
// read-and-cleared from the helper thread via GetMaxQueueSizeAndClear.
// No initializer: static and thread-local storage are zero-initialized
// (presumably this must also stay a POD to be usable with Y_POD_STATIC_THREAD).
struct TExecutorWorkerThreadLocalData {
    // Largest queue size this thread has observed in DequeueWork since the last clear.
    ui32 MaxQueueSize;
};

// Shared dummy that TExecutorWorker::ThreadLocalData points at until the worker
// thread publishes the address of its own WorkerThreadLocalData.
static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
WorkerThreadLocalData;

namespace NActor {
    struct TExecutorWorker {
        TExecutor* const Executor;
        TThread Thread;
        const char** WhatThreadDoesLocation;
        TExecutorWorkerThreadLocalData* ThreadLocalData;

        TExecutorWorker(TExecutor* executor)
            : Executor(executor)
            , Thread(RunThreadProc, this)
            , WhatThreadDoesLocation(&NoLocation)
            , ThreadLocalData(&::WorkerNoThreadLocalData)
        {
            Thread.Start();
        }

        void Run() {
            WhatThreadDoesLocation = ::WhatThreadDoesLocation();
            AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
            WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
            Executor->RunWorker();
        }

        static void* RunThreadProc(void* thiz0) {
            TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
            thiz->Run();
            return nullptr;
        }
    };

    struct TExecutor::TImpl {
        TExecutor* const Executor;
        THistoryInternal History;

        TSystemEvent HelperStopSignal;
        TThread HelperThread;

        TImpl(TExecutor* executor)
            : Executor(executor)
            , HelperThread(HelperThreadProc, this)
        {
        }

        void RunHelper() {
            ui64 nowSeconds = TInstant::Now().Seconds();
            for (;;) {
                TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));

                if (HelperStopSignal.WaitD(nextStop)) {
                    return;
                }

                nowSeconds = nextStop.Seconds();

                THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);

                ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
                if (maxQueueSize > record.MaxQueueSize) {
                    AtomicSet(record.MaxQueueSize, maxQueueSize);
                }
            }
        }

        static void* HelperThreadProc(void* impl0) {
            TImpl* impl = (TImpl*)impl0;
            impl->RunHelper();
            return nullptr;
        }
    };

}

/// Returns a default-constructed executor configuration with only the worker
/// count overridden.
static TExecutor::TConfig MakeConfig(unsigned workerCount) {
    TExecutor::TConfig result;
    result.WorkerCount = workerCount;
    return result;
}

/// Creates an executor with the default configuration except for the number of
/// worker threads, then starts its threads via Init().
TExecutor::TExecutor(size_t workerCount)
    : Config(MakeConfig(workerCount)) {
    Init();
}

/// Creates an executor from a complete configuration (copied), then starts its
/// threads via Init().
TExecutor::TExecutor(const TExecutor::TConfig& config)
    : Config(config) {
    Init();
}

/// Common tail of both constructors: creates the private state, spawns
/// Config.WorkerCount worker threads and finally starts the helper thread.
void TExecutor::Init() {
    Impl.Reset(new TImpl(this));
    AtomicSet(ExitWorkers, 0);

    Y_VERIFY(Config.WorkerCount > 0);

    // Each TExecutorWorker starts its own thread from its constructor.
    size_t remaining = Config.WorkerCount;
    while (remaining-- > 0) {
        WorkerThreads.push_back(new TExecutorWorker(this));
    }

    // The helper iterates WorkerThreads (via GetMaxQueueSizeAndClear), so it is
    // started only once the vector is fully populated.
    Impl->HelperThread.Start();
}

/// Destruction performs a full shutdown through Stop().
TExecutor::~TExecutor() {
    Stop();
}

/// Shuts the executor down: raises ExitWorkers, stops and joins the helper thread,
/// wakes and joins every worker, then runs whatever is still queued on the
/// calling thread.
void TExecutor::Stop() {
    AtomicSet(ExitWorkers, 1);

    Impl->HelperStopSignal.Signal();
    Impl->HelperThread.Join();

    {
        // Broadcast while holding WorkMutex: DequeueWork checks ExitWorkers under the
        // same mutex before waiting, so no worker can miss this wake-up.
        TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
        WorkAvailable.BroadCast();
    }

    for (auto& worker : WorkerThreads) {
        worker->Thread.Join();
    }

    // TODO: make queue empty at this point
    ProcessWorkQueueHere();
}

/// Pushes a batch of work items onto the queue and wakes workers: one worker for
/// a single item, all workers for a larger batch. An empty batch is a no-op.
void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
    if (wis.empty()) {
        return;
    }

    // Once ExitWorkers is set, abort if the queue already holds items.
    if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
        Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
    }

    TWhatThreadDoesPushPop pp("executor: EnqueueWork");

    WorkItems.PushAll(wis);

    // Notify under WorkMutex: DequeueWork re-checks the queue under the same mutex
    // before waiting, so the notification cannot be lost. The guard is declared
    // after `pp`, so it is released before `pp` is popped.
    TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
    if (wis.size() != 1) {
        WorkAvailable.BroadCast();
    } else {
        WorkAvailable.Signal();
    }
}

/// Current number of queued (not yet dequeued) work items, as reported by the queue.
size_t TExecutor::GetWorkQueueSize() const {
    const size_t pending = WorkItems.Size();
    return pending;
}

using namespace NTSAN;

/// Returns the largest MaxQueueSize recorded by any worker and resets each worker's
/// counter to zero. Uses relaxed loads/stores only; an update a worker makes between
/// the load and the reset below is discarded, which is presumably acceptable for a
/// monitoring statistic.
ui32 TExecutor::GetMaxQueueSizeAndClear() const {
    ui32 result = 0;
    for (const auto& worker : WorkerThreads) {
        TExecutorWorkerThreadLocalData* const stats = RelaxedLoad(&worker->ThreadLocalData);
        const ui32 observed = RelaxedLoad(&stats->MaxQueueSize);
        result = Max<ui32>(result, observed);
        RelaxedStore<ui32>(&stats->MaxQueueSize, 0);
    }
    return result;
}

/// Multi-line human-readable status text (the Status part of the full status record).
TString TExecutor::GetStatus() const {
    const TExecutorStatus snapshot = GetStatusRecordInternal();
    return snapshot.Status;
}

/// One-line status: only the current work-queue size.
TString TExecutor::GetStatusSingleLine() const {
    TStringStream line;
    line << "work items: " << GetWorkQueueSize();
    return line.Str();
}

/// Builds a status snapshot: the work-queue size, a text report listing what each
/// worker is currently doing, and the captured per-second history.
TExecutorStatus TExecutor::GetStatusRecordInternal() const {
    TExecutorStatus r;

    // The queue is modified concurrently; sample its size once so the numeric field
    // and the text report always agree.
    const size_t workQueueSize = GetWorkQueueSize();
    r.WorkQueueSize = workQueueSize;

    {
        TStringStream ss;
        ss << "work items:     " << workQueueSize << "\n";
        ss << "workers:\n";
        for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
            // Both the location pointer and the text it points to are updated by the
            // worker thread, hence the double atomic read.
            ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
        }
        r.Status = ss.Str();
    }

    r.History = Impl->History.Capture();

    return r;
}

/// True iff the calling thread is one of this executor's workers currently inside
/// RunWorker.
bool TExecutor::IsInExecutorThread() const {
    return this == ThreadCurrentExecutor;
}

/// Pops the next work item, blocking on WorkAvailable while the queue is empty.
/// Returns null only when ExitWorkers is set and the queue is empty.
/// Also records the queue size reported by the pop into this thread's
/// MaxQueueSize statistic.
TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
    // Only read after a successful TryPop, so the initial value is irrelevant.
    IWorkItem* wi = nullptr;
    size_t queueSize = Max<size_t>();
    // Fast path without the mutex; fall back to waiting under WorkMutex.
    if (!WorkItems.TryPop(&wi, &queueSize)) {
        TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
        // Re-check under the mutex that EnqueueWork/Stop notify under, so no wake-up is lost.
        while (!WorkItems.TryPop(&wi, &queueSize)) {
            if (AtomicGet(ExitWorkers) != 0)
                return nullptr;

            TWhatThreadDoesPushPop pp("waiting for work on condvar");
            WorkAvailable.Wait(WorkMutex);
        }
    }

    auto& wtls = TlsRef(WorkerThreadLocalData);

    // MaxQueueSize is a ui32: clamp instead of truncating, so an enormous queue is
    // never recorded as a small one.
    const ui32 observed = static_cast<ui32>(Min<size_t>(queueSize, Max<ui32>()));
    if (observed > RelaxedLoad(&wtls.MaxQueueSize)) {
        RelaxedStore<ui32>(&wtls.MaxQueueSize, observed);
    }

    return wi;
}

/// Executes one work item. The TAutoPtr is released before DoWork() is called, so
/// this function never deletes the item (presumably DoWork() manages its lifetime).
void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
    WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
    IWorkItem* const item = wi.Release();
    item->DoWork();
}

void TExecutor::ProcessWorkQueueHere() {
    IWorkItem* wi;
    while (WorkItems.TryPop(&wi)) {
        RunWorkItem(wi);
    }
}

void TExecutor::RunWorker() {
    Y_VERIFY(!ThreadCurrentExecutor, "state check");
    ThreadCurrentExecutor = this;

    SetCurrentThreadName("wrkr");

    for (;;) {
        TAutoPtr<IWorkItem> wi = DequeueWork();
        if (!wi) {
            break;
        }
        // Note for messagebus users: make sure program crashes
        // on uncaught exception in thread, otherewise messagebus may just hang on error.
        RunWorkItem(wi);
    }

    ThreadCurrentExecutor = (TExecutor*)nullptr;
}