#pragma once

#include <util/datetime/base.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
#include <util/generic/vector.h>
#include <util/generic/ylimits.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>

#include <type_traits>
#include <utility>

namespace NThreading {
////////////////////////////////////////////////////////////////////////////////
// Platform helpers

#if !defined(PLATFORM_CACHE_LINE)
#define PLATFORM_CACHE_LINE 64
#endif

#if !defined(PLATFORM_PAGE_SIZE)
#define PLATFORM_PAGE_SIZE 4 * 1024
#endif

    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
    struct TPadded: public T {
        char Pad[PadSize - sizeof(T) % PadSize];

        TPadded() {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }

        template<typename... Args>
        TPadded(Args&&... args)
            : T(std::forward<Args>(args)...)
        {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Type helpers

    namespace NImpl {
        template <typename T>
        struct TPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                *ptr = value;
            }

            static T Read(T* ptr) {
                return *ptr;
            }

            static void Destroy(T* ptr) {
                Y_UNUSED(ptr);
            }
        };

        template <typename T>
        struct TNonPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                new (ptr) T(std::forward<TT>(value));
            }

            static T Read(T* ptr) {
                return std::move(*ptr);
            }

            static void Destroy(T* ptr) {
                (void)ptr; /* Make MSVC happy. */
                ptr->~T();
            }
        };

        template <typename T>
        using TTypeHelper = std::conditional_t<
            TTypeTraits<T>::IsPod,
            TPodTypeHelper<T>,
            TNonPodTypeHelper<T>>;

    }

    ////////////////////////////////////////////////////////////////////////////////
    // One producer/one consumer chunked queue.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TOneOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TChunk;

        struct TChunkHeader {
            size_t Count = 0;
            TChunk* Next = nullptr;
        };

        struct TChunk: public TChunkHeader {
            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);

            char Entries[MaxCount * sizeof(T)];

            TChunk() {
                Y_UNUSED(Entries); // uninitialized
            }

            ~TChunk() {
                for (size_t i = 0; i < this->Count; ++i) {
                    TTypeHelper::Destroy(GetPtr(i));
                }
            }

            T* GetPtr(size_t i) {
                return (T*)Entries + i;
            }
        };

        struct TWriterState {
            TChunk* Chunk = nullptr;
        };

        struct TReaderState {
            TChunk* Chunk = nullptr;
            size_t Count = 0;
        };

    private:
        TPadded<TWriterState> Writer;
        TPadded<TReaderState> Reader;

    public:
        using TItem = T;

        TOneOneQueue() {
            Writer.Chunk = Reader.Chunk = new TChunk();
        }

        ~TOneOneQueue() {
            DeleteChunks(Reader.Chunk);
        }

        template <typename TT>
        void Enqueue(TT&& value) {
            T* ptr = PrepareWrite();
            Y_ASSERT(ptr);
            TTypeHelper::Write(ptr, std::forward<TT>(value));
            CompleteWrite();
        }

        bool Dequeue(T& value) {
            if (T* ptr = PrepareRead()) {
                value = TTypeHelper::Read(ptr);
                CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return !PrepareRead();
        }

    protected:
        T* PrepareWrite() {
            TChunk* chunk = Writer.Chunk;
            Y_ASSERT(chunk && !chunk->Next);

            if (chunk->Count != TChunk::MaxCount) {
                return chunk->GetPtr(chunk->Count);
            }

            chunk = new TChunk();
            AtomicSet(Writer.Chunk->Next, chunk);
            Writer.Chunk = chunk;
            return chunk->GetPtr(0);
        }

        void CompleteWrite() {
            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
        }

        T* PrepareRead() {
            TChunk* chunk = Reader.Chunk;
            Y_ASSERT(chunk);

            for (;;) {
                size_t writerCount = AtomicGet(chunk->Count);
                if (Reader.Count != writerCount) {
                    return chunk->GetPtr(Reader.Count);
                }

                if (writerCount != TChunk::MaxCount) {
                    return nullptr;
                }

                chunk = AtomicGet(chunk->Next);
                if (!chunk) {
                    return nullptr;
                }

                delete Reader.Chunk;
                Reader.Chunk = chunk;
                Reader.Count = 0;
            }
        }

        void CompleteRead() {
            ++Reader.Count;
        }

    private:
        static void DeleteChunks(TChunk* chunk) {
            while (chunk) {
                TChunk* next = chunk->Next;
                delete chunk;
                chunk = next;
            }
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Provides FIFO guaranties for each producer.

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TManyOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TEntry {
            T Value;
            ui64 Tag;
        };

        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
            TAtomic WriteLock = 0;

            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;

            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
        };

    private:
        union {
            TAtomic WriteTag = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            ui64 tag = NextTag();
            while (!TryEnqueue(std::forward<TT>(value), tag)) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t index = 0;
            if (TEntry* entry = PrepareRead(index)) {
                value = TTypeHelper::Read(&entry->Value);
                Queues[index].CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        ui64 NextTag() {
            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
            // return GetCycleCount();
            return AtomicIncrement(WriteTag);
        }

        template <typename TT>
        bool TryEnqueue(TT&& value, ui64 tag) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    TEntry* entry = queue.PrepareWrite();
                    Y_ASSERT(entry);
                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
                    entry->Tag = tag;
                    queue.CompleteWrite();
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }

        TEntry* PrepareRead(size_t& index) {
            TEntry* entry = nullptr;
            ui64 tag = Max();

            for (size_t i = 0; i < Concurrency; ++i) {
                TEntry* e = Queues[i].PrepareRead();
                if (e && e->Tag < tag) {
                    index = i;
                    entry = e;
                    tag = e->Tag;
                }
            }

            if (entry) {
                // need second pass to catch updates within already scanned range
                size_t candidate = index;
                for (size_t i = 0; i < candidate; ++i) {
                    TEntry* e = Queues[i].PrepareRead();
                    if (e && e->Tag < tag) {
                        index = i;
                        entry = e;
                        tag = e->Tag;
                    }
                }
            }

            return entry;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many queue with strong FIFO guaranties.
    // Writers will not block readers (and vice versa), but will block each other.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
    class TManyManyQueue: private TNonCopyable {
    private:
        TPadded<TLock> WriteLock;
        TPadded<TLock> ReadLock;

        TOneOneQueue<T, ChunkSize> Queue;

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            with_lock (WriteLock) {
                Queue.Enqueue(std::forward<TT>(value));
            }
        }

        bool Dequeue(T& value) {
            with_lock (ReadLock) {
                return Queue.Dequeue(value);
            }
        }

        bool IsEmpty() {
            with_lock (ReadLock) {
                return Queue.IsEmpty();
            }
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Because of random partitioning reordering possible - FIFO not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyOneQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            TAtomic WriteLock = 0;
        };

    private:
        union {
            size_t ReadPos = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[ReadPos++ % Concurrency];
                if (queue.Dequeue(value)) {
                    return true;
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many partitioned queue.
    // Because of random partitioning reordering possible - FIFO not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyManyQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            union {
                TAtomic WriteLock = 0;
                char Pad1[PLATFORM_CACHE_LINE];
            };
            union {
                TAtomic ReadLock = 0;
                char Pad2[PLATFORM_CACHE_LINE];
            };
        };

    private:
        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t readPos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[readPos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool dequeued = queue.Dequeue(value);
                    AtomicUnlock(&queue.ReadLock);
                    if (dequeued) {
                        return true;
                    }
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool empty = queue.IsEmpty();
                    AtomicUnlock(&queue.ReadLock);
                    if (!empty) {
                        return false;
                    }
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    // Simple wrapper to deal with AutoPtrs

    template <typename T, typename TImpl>
    class TAutoQueueBase: private TNonCopyable {
    private:
        TImpl Impl;

    public:
        using TItem = TAutoPtr<T>;

        ~TAutoQueueBase() {
            TAutoPtr<T> value;
            while (Dequeue(value)) {
                // do nothing
            }
        }

        void Enqueue(TAutoPtr<T> value) {
            Impl.Enqueue(value.Get());
            Y_UNUSED(value.Release());
        }

        bool Dequeue(TAutoPtr<T>& value) {
            T* ptr = nullptr;
            if (Impl.Dequeue(ptr)) {
                value.Reset(ptr);
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return Impl.IsEmpty();
        }
    };

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
}