#pragma once

#include <library/cpp/unified_agent_client/client.h>
#include <library/cpp/unified_agent_client/client_proto_weighing.h>
#include <library/cpp/unified_agent_client/counters.h>
#include <library/cpp/unified_agent_client/logger.h>
#include <library/cpp/unified_agent_client/variant.h>
#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h>
#include <library/cpp/unified_agent_client/grpc_io.h>

#include <library/cpp/logger/global/global.h>

#include <util/generic/deque.h>
#include <util/system/mutex.h>

namespace NUnifiedAgent::NPrivate {
    // Forward declarations: the classes below refer to each other before their
    // full definitions appear (TClient stores TClientSession* and a
    // shared_ptr<TForkProtector>; TClientSession stores a TIntrusivePtr<TGrpcCall>).
    class TClientSession;
    class TGrpcCall;
    class TForkProtector;

    // IClient implementation. Holds the gRPC channel, the service stub and a
    // completion-queue host, plus a list of raw pointers to sessions that were
    // passed to RegisterSession(). All non-inline members are defined out of line.
    class TClient: public IClient {
    public:
        explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector);

        ~TClient() override;

        // --- IClient interface ---
        TClientSessionPtr CreateSession(const TSessionParameters& parameters) override;
        void StartTracing(ELogPriority logPriority) override;
        void FinishTracing() override;

        // --- Accessors ---

        // Returns the client-level counters holder by const reference (no copy
        // of the intrusive pointer is made).
        const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept {
            return Counters;
        }

        // Returns the service stub. Stub is dereferenced without a null check.
        NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept {
            return *Stub;
        }

        TScopeLogger CreateSessionLogger();

        // Returns the parameters this client was constructed with.
        const TClientParameters& GetParameters() const noexcept {
            return Parameters;
        }

        // Forwards to the active completion-queue host. ActiveCompletionQueue
        // is accessed without a null check.
        grpc::CompletionQueue& GetCompletionQueue() noexcept {
            return ActiveCompletionQueue->GetCompletionQueue();
        }

        // --- Session bookkeeping ---
        void RegisterSession(TClientSession* session);
        void UnregisterSession(TClientSession* session);

        // --- Fork hooks ---
        // Presumably driven by TForkProtector around fork(); confirm in the .cpp.
        void PreFork();
        void PostForkParent();
        void PostForkChild();

        void EnsureStarted();

    private:
        // The "NoLock" suffix suggests the caller is expected to already hold
        // Lock -- TODO confirm against the implementation.
        void EnsureStartedNoLock();
        void EnsureStoppedNoLock();

        // Data members. Keep the declaration order: C++ initializes members in
        // declaration order and destroys them in reverse.
        const TClientParameters Parameters;
        std::shared_ptr<TForkProtector> ForkProtector;
        TIntrusivePtr<TClientCounters> Counters;
        TLog Log;
        TLogger MainLogger;
        TScopeLogger Logger;
        std::shared_ptr<grpc::Channel> Channel;
        std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub;
        THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue;
        std::atomic<size_t> SessionLogLabel;
        TVector<TClientSession*> ActiveSessions; // non-owning raw pointers
        bool Started;
        bool Destroyed;
        TAdaptiveLock Lock;
        static std::atomic<ui64> Id;
    };

    // Keeps a list of TClient instances (as raw pointers) and declares static
    // PreFork/PostForkParent/PostForkChild hooks. A process-wide instance is
    // referenced through the static weak_ptr `Instance` and obtained via Get().
    // NOTE(review): the hooks are presumably installed as fork handlers once
    // (see SubscribedToForks) -- confirm in the .cpp.
    class TForkProtector {
    public:
        TForkProtector();

        // Returns the shared instance; `createIfNotExists` presumably controls
        // whether a missing instance is created -- verify in the implementation.
        static std::shared_ptr<TForkProtector> Get(bool createIfNotExists);

        void Register(TClient& client);
        void Unregister(TClient& client);

    private:
        // Static fork hooks.
        static void PreFork();
        static void PostForkParent();
        static void PostForkChild();

        // Per-instance state. Declaration order is preserved because it
        // determines construction and destruction order.
        TVector<TClient*> Clients; // non-owning
        grpc::internal::GrpcLibrary GrpcInitializer;
        bool Enabled;
        TAdaptiveLock Lock;

        // Process-wide state shared by all instances.
        static std::weak_ptr<TForkProtector> Instance;
        static TMutex InstanceLock;
        static bool SubscribedToForks;
    };

    // IClientSession implementation bound to a TClient. Declares the write
    // queue of pending messages, the currently active TGrpcCall, timers and
    // the event batches used to hand work over to the session's poller.
    // All non-inline members are defined out of line.
    class TClientSession: public IClientSession {
    public:
        TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters);

        // NOTE(review): not marked `override`, unlike ~TClient(); confirm that
        // IClientSession declares a virtual destructor.
        ~TClientSession();

        // --- IClientSession interface ---
        void Send(TClientMessage&& message) override;
        NThreading::TFuture<void> CloseAsync(TInstant deadline) override;

        // --- Accessors ---

        // Returns the owning client (Client is dereferenced without a null check).
        TClient& GetClient() noexcept {
            return *Client;
        }

        TScopeLogger& GetLogger() noexcept {
            return Logger;
        }

        // Returns the session counters (Counters is dereferenced without a null check).
        TClientSessionCounters& GetCounters() noexcept {
            return *Counters;
        }

        TAsyncJoiner& GetAsyncJoiner() noexcept {
            return AsyncJoiner;
        }

        // --- Request preparation and call notifications ---
        // Judging by TGrpcCall holding a TClientSession&, these are the entry
        // points used by the active call -- TODO confirm.
        void PrepareInitializeRequest(NUnifiedAgentProto::Request& target);
        void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target);
        void Acknowledge(ui64 seqNo);
        void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo);
        void OnGrpcCallFinished();

        // --- Fork hooks ---
        NThreading::TFuture<void> PreFork();
        void PostForkParent();
        void PostForkChild();

        void SetAgentMaxReceiveMessage(size_t);

    private:
        enum class EPollingStatus {
            Active,
            Inactive
        };

        // Event carrying the deadline of a close request.
        struct TCloseRequestedEvent {
            TInstant Deadline;
        };

        // Event carrying one message together with its size.
        struct TMessageReceivedEvent {
            TClientMessage Message;
            size_t Size;
        };

        // Result of PurgeWriteQueue(); both fields default to zero.
        struct TPurgeWriteQueueStats {
            size_t PurgedMessages = 0;
            size_t PurgedBytes = 0;
        };

        using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>;

    public:
        // Element type of WriteQueue.
        struct TPendingMessage {
            TClientMessage Message;
            size_t Size;
            bool Skipped;
        };

        // Appends pending messages to a protobuf Request while tracking the
        // payload size and the serialized request size against their limits.
        class TRequestBuilder {
        public:
            // Outcome of TryAddMessage().
            struct TAddResult {
                bool LimitExceeded;
                size_t NewRequestPayloadSize;  // == actual value, if !LimitExceeded
                size_t NewSerializedRequestSize;  // == actual value, if !LimitExceeded
            };

            TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
                    TFMaybe<size_t> serializedRequestLimitBytes);

            TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo);

            void ResetCounters();

            size_t GetSerializedRequestSize() const {
                return SerializedRequestSize;
            }

            size_t GetRequestPayloadSize() const {
                return RequestPayloadSize;
            }

        private:
            // Per-meta-key bookkeeping stored in MetaItems; ValueIndex starts at 0.
            struct TMetaItemBuilder {
                size_t ItemIndex;
                size_t ValueIndex = 0;
            };

            // Declaration order is preserved (it is the initialization order).
            NUnifiedAgentProto::Request& Target;
            TFMaybe<NPW::TRequest> PwTarget;
            THashMap<TString, TMetaItemBuilder> MetaItems;
            size_t RequestPayloadSize;
            size_t RequestPayloadLimitBytes;
            size_t SerializedRequestSize;
            TFMaybe<size_t> SerializedRequestLimitBytes;
            bool CountersInvalid;
        };

    private:
        void MakeGrpcCall();
        void DoClose();
        void BeginClose(TInstant deadline);
        void Poll();
        TPurgeWriteQueueStats PurgeWriteQueue();
        void DoStart();

        // Data members. Do not reorder: C++ constructs members in declaration
        // order and destroys them in reverse.
        TAsyncJoiner AsyncJoiner;
        TIntrusivePtr<TClient> Client;
        TFMaybe<TString> OriginalSessionId;
        TFMaybe<TString> SessionId;
        TFMaybe<THashMap<TString, TString>> Meta;
        TScopeLogger Logger;
        bool CloseStarted;
        bool ForcedCloseStarted;
        bool Closed;
        bool ForkInProgressLocal;
        bool Started;
        NThreading::TPromise<void> ClosePromise;
        TIntrusivePtr<TGrpcCall> ActiveGrpcCall;
        TDeque<TPendingMessage> WriteQueue;
        size_t TrimmedCount;
        size_t NextIndex;
        TFMaybe<ui64> AckSeqNo;
        TInstant PollerLastEventTimestamp;
        TIntrusivePtr<TClientSessionCounters> Counters;
        THolder<TGrpcTimer> MakeGrpcCallTimer;
        THolder<TGrpcTimer> ForceCloseTimer;
        THolder<TGrpcTimer> PollTimer;
        ui64 GrpcInflightMessages;
        ui64 GrpcInflightBytes;

        // NOTE(review): the members below include atomics and a lock, which
        // suggests they are shared between the caller side and the poller --
        // confirm the exact locking rules in the .cpp.
        std::atomic<size_t> InflightBytes;
        bool CloseRequested;
        size_t EventsBatchSize;
        EPollingStatus PollingStatus;
        THolder<TGrpcNotification> EventNotification;
        bool EventNotificationTriggered;
        TVector<TEvent> EventsBatch;
        TVector<TEvent> SecondaryEventsBatch;
        std::atomic<bool> ForkInProgress;
        TAdaptiveLock Lock;
        size_t MaxInflightBytes;
        TFMaybe<size_t> AgentMaxReceiveMessage;
    };

    // One bidirectional gRPC stream (Request -> Response) opened on behalf of
    // a TClientSession. Reference-counted via TAtomicRefCount. The class keeps
    // one IIOCallback tag per asynchronous operation kind and a set of boolean
    // flags describing the progress of the call.
    class TGrpcCall final: public TAtomicRefCount<TGrpcCall> {
    public:
        explicit TGrpcCall(TClientSession& session);

        ~TGrpcCall();

        void Start();

        void BeginClose(bool force);

        void Poison();

        void NotifyMessageAdded();

        // Returns the current value of the Initialized_ flag.
        bool Initialized() const {
            return Initialized_;
        }

        // Returns the current value of the ReuseSessions_ flag.
        bool ReuseSessions() const {
            return ReuseSessions_;
        }

    private:
        // Completion handlers, one per asynchronous operation kind.
        // Presumably invoked through the matching *Tag callbacks below --
        // confirm in the .cpp.
        void EndAccept(EIOStatus status);
        void EndRead(EIOStatus status);
        void EndWrite(EIOStatus status);
        void EndFinish(EIOStatus status);
        void EndWritesDone(EIOStatus);

        // Operation starters and scheduling helpers.
        void ScheduleWrite();
        void BeginWritesDone();
        void BeginRead();
        void BeginWrite();
        void EnsureFinishStarted();
        void ScheduleFinishOnError();

        // Error handling.
        bool CheckHasError(EIOStatus status, const char* method);
        void SetError(const TString& error);

        // Data members. Do not reorder: C++ constructs members in declaration
        // order and destroys them in reverse.
        TClientSession& Session;
        TAsyncJoinerToken AsyncJoinerToken;
        THolder<IIOCallback> AcceptTag;
        THolder<IIOCallback> ReadTag;
        THolder<IIOCallback> WriteTag;
        THolder<IIOCallback> WritesDoneTag;
        THolder<IIOCallback> FinishTag;
        TScopeLogger Logger;

        // Call progress flags.
        bool AcceptPending;
        bool Initialized_;
        bool ReadPending;
        bool ReadsDone;
        bool WritePending;
        bool WritesBlocked;
        bool WritesDonePending;
        bool WritesDone;
        bool ErrorOccured;
        bool FinishRequested;
        bool FinishStarted;
        bool FinishDone;
        bool Cancelled;
        bool Poisoned;
        bool PoisonPillSent;
        bool ReuseSessions_;

        // gRPC stream state and the reusable request/response messages.
        grpc::Status FinishStatus;
        grpc::ClientContext ClientContext;
        std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter;
        NUnifiedAgentProto::Request Request;
        NUnifiedAgentProto::Response Response;
    };
}