#include "blocking_queue.h"

#include <library/cpp/iterator/enumerate.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/string/builder.h>
#include <util/system/thread.h>

namespace {
    class TFunctionThread: public ISimpleThread {
    public:
        using TFunc = std::function<void()>;

    private:
        TFunc Func;

    public:
        TFunctionThread(const TFunc& func)
            : Func(func)
        {
        }

        void* ThreadProc() noexcept override {
            Func();
            return nullptr;
        }
    };

}

IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) {
    if (val) {
        o << "TMaybe<int>(" << val.GetRef() << ')';
    } else {
        o << "TMaybe<int>()";
    }
    return o;
}

Y_UNIT_TEST_SUITE(BlockingQueueTest) {
    Y_UNIT_TEST(SimplePushPopTest) {
        const size_t limit = 100;

        NThreading::TBlockingQueue<int> queue(100);

        for (int i = 0; i != limit; ++i) {
            queue.Push(i);
        }

        for (int i = 0; i != limit; ++i) {
            UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
        }

        UNIT_ASSERT(queue.Empty());
    }

    Y_UNIT_TEST(SimplePushDrainTest) {
        const size_t limit = 100;

        NThreading::TBlockingQueue<int> queue(100);

        for (int i = 0; i != limit; ++i) {
            queue.Push(i);
        }

        auto res = queue.Drain();

        UNIT_ASSERT_VALUES_EQUAL(queue.Empty(), true);
        UNIT_ASSERT_VALUES_EQUAL(res.size(), limit);

        for (auto [i, elem] : Enumerate(res)) {
            UNIT_ASSERT_VALUES_EQUAL(elem, i);
        }
    }

    Y_UNIT_TEST(SimpleStopTest) {
        const size_t limit = 100;

        NThreading::TBlockingQueue<int> queue(100);

        for (int i = 0; i != limit; ++i) {
            queue.Push(i);
        }
        queue.Stop();

        bool ok = queue.Push(100500);
        UNIT_ASSERT_VALUES_EQUAL(ok, false);

        for (int i = 0; i != limit; ++i) {
            UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
        }

        UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
        UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
    }

    Y_UNIT_TEST(BigPushPop) {
        const int limit = 100000;

        NThreading::TBlockingQueue<int> queue(10);

        TFunctionThread pusher([&] {
            for (int i = 0; i != limit; ++i) {
                if (!queue.Push(i)) {
                    break;
                }
            }
        });

        pusher.Start();

        try {
            for (int i = 0; i != limit; ++i) {
                size_t size = queue.Size();
                UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data());
                UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
            }
        } catch (...) {
            // gracefull shutdown of pusher thread if assertion fails
            queue.Stop();
            throw;
        }

        pusher.Join();
    }

    Y_UNIT_TEST(StopWhenMultiplePoppers) {
        NThreading::TBlockingQueue<int> queue(10);
        TFunctionThread popper1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
        });
        TFunctionThread popper2([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
        });
        TFunctionThread drainer([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
        });
        popper1.Start();
        popper2.Start();
        drainer.Start();

        queue.Stop();

        popper1.Join();
        popper2.Join();
        drainer.Join();
    }

    Y_UNIT_TEST(StopWhenMultiplePushers) {
        NThreading::TBlockingQueue<int> queue(1);
        queue.Push(1);
        TFunctionThread pusher1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
        });
        TFunctionThread pusher2([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
        });
        pusher1.Start();
        pusher2.Start();

        queue.Stop();

        pusher1.Join();
        pusher2.Join();
    }

    Y_UNIT_TEST(WakeUpAllProducers) {
        NThreading::TBlockingQueue<int> queue(2);
        queue.Push(1);
        queue.Push(2);

        TFunctionThread pusher1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Push(3), true);
        });

        TFunctionThread pusher2([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue.Push(4), true);
        });

        pusher1.Start();
        pusher2.Start();

        queue.Drain();

        pusher1.Join();
        pusher2.Join();
    }

    Y_UNIT_TEST(InterruptPopByDeadline) {
        NThreading::TBlockingQueue<int> queue1(10);
        NThreading::TBlockingQueue<int> queue2(10);

        const auto popper1DeadLine = TInstant::Now();
        const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2);

        TFunctionThread popper1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>());
            UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
        });

        TFunctionThread popper2([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2);
            UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
        });

        popper1.Start();
        popper2.Start();

        Sleep(TDuration::Seconds(1));

        queue1.Push(1);
        queue2.Push(2);

        Sleep(TDuration::Seconds(1));

        queue1.Stop();
        queue2.Stop();

        popper1.Join();
        popper2.Join();
    }


     Y_UNIT_TEST(InterruptDrainByDeadline) {
        NThreading::TBlockingQueue<int> queue1(10);
        NThreading::TBlockingQueue<int> queue2(10);

        const auto drainer1DeadLine = TInstant::Now();
        const auto drainer2DeadLine = TInstant::Now() + TDuration::Seconds(2);

        TFunctionThread drainer1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue1.Drain(drainer1DeadLine).empty(), true);
            UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
        });

        TFunctionThread drainer2([&] {
            auto res = queue2.Drain(drainer2DeadLine);
            UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
            UNIT_ASSERT_VALUES_EQUAL(res.front(), 2);
            UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
        });

        drainer1.Start();
        drainer2.Start();

        Sleep(TDuration::Seconds(1));

        queue1.Push(1);
        queue2.Push(2);

        Sleep(TDuration::Seconds(1));

        queue1.Stop();
        queue2.Stop();

        drainer1.Join();
        drainer2.Join();
    }

    Y_UNIT_TEST(InterruptPushByDeadline) {
        NThreading::TBlockingQueue<int> queue1(1);
        NThreading::TBlockingQueue<int> queue2(1);

        queue1.Push(0);
        queue2.Push(0);

        const auto pusher1DeadLine = TInstant::Now();
        const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2);

        TFunctionThread pusher1([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false);
            UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
        });

        TFunctionThread pusher2([&] {
            UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true);
            UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
        });

        pusher1.Start();
        pusher2.Start();

        Sleep(TDuration::Seconds(1));

        queue1.Pop();
        queue2.Pop();

        Sleep(TDuration::Seconds(1));

        queue1.Stop();
        queue2.Stop();

        pusher1.Join();
        pusher2.Join();
    }
}