#pragma once

#include "acceptor_status.h"
#include "async_result.h"
#include "event_loop.h"
#include "netaddr.h"
#include "remote_connection.h"
#include "remote_connection_status.h"
#include "session_job_count.h"
#include "shutdown_state.h"
#include "ybus.h"

#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>

#include <library/cpp/threading/future/legacy_future.h>

#include <util/generic/array_ref.h>
#include <util/generic/string.h>

namespace NBus {
    namespace NPrivate {
        typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
        typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;

        typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;

        typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
        typedef TVector<TAcceptorPtr> TAcceptorsPtrs;

        struct TConnectionsAcceptorsSnapshot {
            TVector<TRemoteConnectionPtr> Connections;
            TVector<TAcceptorPtr> Acceptors;
            ui64 LastConnectionId;
            ui64 LastAcceptorId;

            TConnectionsAcceptorsSnapshot();
        };

        typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;

        struct TOnAccept {
            SOCKET s;
            TNetAddr addr;
            TInstant now;
        };

        struct TStatusTag {};
        struct TConnectionTag {};

        struct TDeadConnectionTag {};
        struct TRemoveTag {};

        struct TBusSessionImpl
           : public virtual TBusSession,
              private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
              private ::NActor::TActor<TBusSessionImpl, TConnectionTag>

            ,
              private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
              private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
              private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>

            ,
              private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
              private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
            friend class TAcceptor;
            friend class TRemoteConnection;
            friend class TRemoteServerConnection;
            friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>;
            friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>;
            friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>;
            friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>;
            friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
            friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
            friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;

        public:
            ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
                return this;
            }

            ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
                return this;
            }

            ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
                return this;
            }

            typedef TGuard<TMutex> TConnectionsGuard;

            TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
                            IBusErrorHandler* handler,
                            const TBusSessionConfig& config, const TString& name);

            ~TBusSessionImpl() override;

            void Shutdown() override;
            bool IsDown();

            size_t GetInFlightImpl(const TNetAddr& addr) const;
            size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;

            void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
            void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;

            virtual void FillStatus();
            TSessionDumpStatus GetStatusRecordInternal() override;
            TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
            TConnectionStatusMonRecord GetStatusProtobuf() override;
            TString GetStatusSingleLine() override;

            void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
            void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
            void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
            void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&);
            void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&);
            void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr);
            void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>);
            void StatusUpdateCachedDump();
            void StatusUpdateCachedDumpIfNecessary(TInstant now);
            void Act(TStatusTag);
            void Act(TConnectionTag);

            TBusProtocol* GetProto() const noexcept override;
            const TBusSessionConfig* GetConfig() const noexcept override;
            TBusMessageQueue* GetQueue() const noexcept override;
            TString GetNameInternal() override;

            virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;

            void Listen(int port, TBusMessageQueue* q);
            void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
            TBusConnection* Accept(SOCKET listen);

            inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
                return this;
            }
            inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
                return this;
            }

            typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;

            void SendSnapshotToStatusActor();

            void InsertConnectionLockAcquired(TRemoteConnection* connection);
            void InsertAcceptorLockAcquired(TAcceptor* acceptor);

            void GetConnections(TVector<TRemoteConnectionPtr>*);
            void GetAcceptors(TVector<TAcceptorPtr>*);
            void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
            void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);

            TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
            TRemoteConnectionPtr GetConnectionById(ui64 id);
            TAcceptorPtr GetAcceptorById(ui64 id);

            void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);

            void Cron();

            TBusSessionJobCount JobCount;

            // TODO: replace with actor
            TMutex ConnectionsLock;

            struct TImpl;
            THolder<TImpl> Impl;

            const bool IsSource_;

            TBusMessageQueue* const Queue;
            TBusProtocol* const Proto;
            // copied to be available after Proto dies
            const TString ProtoName;

            IBusErrorHandler* const ErrorHandler;
            TUseCountHolder HandlerUseCountHolder;
            TBusSessionConfig Config; // TODO: make const

            NEventLoop::TEventLoop WriteEventLoop;
            NEventLoop::TEventLoop ReadEventLoop;
            THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
            THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;

            THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
            TAddrRemoteConnections Connections;
            TAcceptorsPtrs Acceptors;

            struct TStatusData {
                TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
                ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;

                TAtomicShutdownState ShutdownState;

                TBusSessionStatus Status;

                TSessionDumpStatus StatusDumpCached;
                TMutex StatusDumpCachedMutex;
                TInstant StatusDumpCachedLastUpdate;

                TStatusData();
            };
            TStatusData StatusData;

            struct TConnectionsData {
                TAtomicShutdownState ShutdownState;

                TConnectionsData();
            };
            TConnectionsData ConnectionsData;

            ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
                                    TStatusTag, TDeadConnectionTag>*
            GetDeadConnectionWriterStatusQueue() {
                return this;
            }

            ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
                                    TStatusTag, TDeadConnectionTag>*
            GetDeadConnectionReaderStatusQueue() {
                return this;
            }

            ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
                                    TStatusTag, TDeadConnectionTag>*
            GetDeadAcceptorStatusQueue() {
                return this;
            }

            template <typename TItem>
            ::NActor::IQueueInActor<TItem>* GetQueue() {
                return this;
            }

            ui64 LastAcceptorId;
            ui64 LastConnectionId;

            TAtomic Down;
            TSystemEvent ShutdownCompleteEvent;
        };

        inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
            return Proto;
        }

        inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
            return &Config;
        }

        inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
            return Queue;
        }

    }
}