#pragma once

#include "defs.h"

#include <library/cpp/actors/util/futex.h>

namespace NActors {

    class alignas(64) TCpuState {
        // Atomic cachelign-aligned 64-bit state, see description below
        TAtomic State = 0;
        char Padding[64 - sizeof(TAtomic)];

        // Bits 0-31: Currently executing pool
        //  - value less than MaxPools means cpu is executing corresponding pool (fast-worker is executing or waiting for slow-workers)
        //  - one of Cpu* values in case of idle cpu
        //  - used as futex by blocked fast-worker
        static constexpr ui64 CurrentBits = 32;
        static constexpr ui64 CurrentMask = ui64((1ull << CurrentBits) - 1);

        // Bits 32-63: Assigned pool
        //  - value is set by balancer
        //  - NOT used as futex
        //  - Not balanced
        static constexpr ui64 AssignedOffs = 32;
        static constexpr ui64 AssignedMask = ~CurrentMask;

    public:
        TCpuState() {
            Y_UNUSED(Padding);
        }

        void Load(TPoolId& assigned, TPoolId& current) const {
            TAtomicBase state = AtomicLoad(&State);
            assigned = (state & AssignedMask) >> AssignedOffs;
            current = state & CurrentMask;
        }

        TPoolId CurrentPool() const {
            return TPoolId(AtomicLoad(&State) & CurrentMask);
        }

        void SwitchPool(TPoolId pool) {
            while (true) {
                TAtomicBase state = AtomicLoad(&State);
                if (AtomicCas(&State, (state & ~CurrentMask) | pool, state)) {
                    return;
                }
            }
        }

        TPoolId AssignedPool() const {
            return TPoolId((AtomicLoad(&State) & AssignedMask) >> AssignedOffs);
        }

        // Assigns new pool to cpu and wakes it up if cpu is idle
        void AssignPool(TPoolId pool) {
            while (true) {
                TAtomicBase state = AtomicLoad(&State);
                TPoolId current(state & CurrentMask);
                if (Y_UNLIKELY(current == CpuStopped)) {
                    return; // it would be better to shutdown instead of balancing
                }
                // Idle cpu must be woken up after balancing to handle pending tokens (if any) in assigned/schedulable pool(s)
                if (current == CpuSpinning) {
                    if (AtomicCas(&State, (ui64(pool) << AssignedOffs) | pool, state)) {
                        return; // successfully woken up
                    }
                } else if (current == CpuBlocked) {
                    if (AtomicCas(&State, (ui64(pool) << AssignedOffs) | pool, state)) {
                        FutexWake();
                        return; // successfully woken up
                    }
                } else {
                    if (AtomicCas(&State, (ui64(pool) << AssignedOffs) | (state & ~AssignedMask), state)) {
                        return; // wakeup is not required
                    }
                }
            }
        }

        void Stop() {
            while (true) {
                TAtomicBase state = AtomicLoad(&State);
                if (AtomicCas(&State, (state & ~CurrentMask) | CpuStopped, state)) {
                    FutexWake();
                    return; // successfully stopped
                }
            }
        }

        // Start waiting, returns false in case of actorsystem shutdown
        bool StartSpinning() {
            while (true) {
                TAtomicBase state = AtomicLoad(&State);
                TPoolId current(state & CurrentMask);
                if (Y_UNLIKELY(current == CpuStopped)) {
                    return false;
                }
                Y_VERIFY_DEBUG(current < MaxPools, "unexpected already waiting state of cpu (%d)", (int)current);
                if (AtomicCas(&State, (state & ~CurrentMask) | CpuSpinning, state)) { // successfully marked as spinning
                    return true;
                }
            }
        }

        bool StartBlocking() {
            while (true) {
                TAtomicBase state = AtomicLoad(&State);
                TPoolId current(state & CurrentMask);
                if (current == CpuSpinning) {
                    if (AtomicCas(&State, (state & ~CurrentMask) | CpuBlocked, state)) {
                        return false; // successful switch
                    }
                } else {
                    return true; // wakeup
                }
            }
        }

        bool Block(ui64 timeoutNs, TPoolId& result) {
#ifdef _linux_
            timespec timeout;
            timeout.tv_sec = timeoutNs / 1'000'000'000;
            timeout.tv_nsec = timeoutNs % 1'000'000'000;
            SysFutex(Futex(), FUTEX_WAIT_PRIVATE, CpuBlocked, &timeout, nullptr, 0);
#else
            NanoSleep(timeoutNs); // non-linux wake is not supported, cpu will go idle on wake after blocked state
#endif
            TAtomicBase state = AtomicLoad(&State);
            TPoolId current(state & CurrentMask);
            if (current == CpuBlocked) {
                return false; // timeout
            } else {
                result = current;
                return true; // wakeup
            }
        }

        enum EWakeResult {
            Woken, // successfully woken up
            NotIdle, // cpu is already not idle
            Forbidden, // cpu is assigned to another pool
            Stopped, // cpu is shutdown
        };

        EWakeResult WakeWithoutToken(TPoolId pool) {
            while (true) {
                TAtomicBase state = RelaxedLoad(&State);
                TPoolId current(state & CurrentMask);
                TPoolId assigned((state & AssignedMask) >> AssignedOffs);
                if (assigned == CpuShared || assigned == pool) {
                    if (current == CpuSpinning) {
                        if (AtomicCas(&State, (state & ~CurrentMask) | pool, state)) {
                            return Woken;
                        }
                    } else if (current == CpuBlocked) {
                        if (AtomicCas(&State, (state & ~CurrentMask) | pool, state)) {
                            FutexWake();
                            return Woken;
                        }
                    } else if (current == CpuStopped) {
                        return Stopped;
                    } else {
                        return NotIdle;
                    }
                } else {
                    return Forbidden;
                }
            }
        }

        EWakeResult WakeWithTokenAcquired(TPoolId token) {
            while (true) {
                TAtomicBase state = RelaxedLoad(&State);
                TPoolId current(state & CurrentMask);
                // NOTE: We ignore assigned value because we already have token, so
                // NOTE: not assigned pool may be run here. This will be fixed
                // NOTE: after we finish with current activation
                if (current == CpuSpinning) {
                    if (AtomicCas(&State, (state & ~CurrentMask) | token, state)) {
                        return Woken;
                    }
                } else if (current == CpuBlocked) {
                    if (AtomicCas(&State, (state & ~CurrentMask) | token, state)) {
                        FutexWake();
                        return Woken;
                    }
                } else if (current == CpuStopped) {
                    return Stopped;
                } else {
                    return NotIdle;
                }
            }
        }

        bool IsPoolReassigned(TPoolId current) const {
            TAtomicBase state = AtomicLoad(&State);
            TPoolId assigned((state & AssignedMask) >> AssignedOffs);
            return assigned != current;
        }

    private:
        void* Futex() {
            return (void*)&State; // little endian assumed
        }

        void FutexWake() {
#ifdef _linux_
            SysFutex(Futex(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
#endif
        }
    };

}