aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/acceptor.cpp
blob: 64a38619c2206c72699e6226f752ca909fecf7ad (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                     

                              
                    
 


                                 









                                                                                                    
                                                          








                                                            
                                







                                                     
                                   





                                       
                        

























                                                                                              

                                                                       














                                                                      
                                                 

                                       
                


                          
                    
 
                                          

                                                          
                     








                                           
#include "acceptor.h"

#include "key_value_printer.h"
#include "mb_lwtrace.h"
#include "network.h"

#include <util/network/init.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/yassert.h>

LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)

using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;

TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
    : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
    , AcceptorId(acceptorId)
    , Session(session)
    , GranStatus(session->Config.Secret.StatusFlushPeriod)
{
    SetNonBlock(socket, true);

    Channel = Session->ReadEventLoop.Register(socket, this);
    Channel->EnableRead();

    Stats.AcceptorId = acceptorId;
    Stats.Fd = socket;
    Stats.ListenAddr = addr;

    SendStatus(TInstant::Now());
}

void TAcceptor::Act(TDefaultTag) {
    EShutdownState state = ShutdownState.State.Get();

    if (state == SS_SHUTDOWN_COMPLETE) {
        return;
    }

    TInstant now = TInstant::Now();

    if (state == SS_SHUTDOWN_COMMAND) {
        if (!!Channel) {
            Channel->Unregister();
            Channel.Drop();
            Stats.Fd = INVALID_SOCKET;
        }

        SendStatus(now);

        Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
        Stats.ResetIncremental();

        ShutdownState.CompleteShutdown();
        return;
    }

    THolder<TOpaqueAddr> addr(new TOpaqueAddr());
    SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());

    int acceptErrno = LastSystemError();

    if (acceptedSocket == INVALID_SOCKET) {
        if (LastSystemError() != EWOULDBLOCK) {
            Stats.LastAcceptErrorErrno = acceptErrno;
            Stats.LastAcceptErrorInstant = now;
            ++Stats.AcceptErrorCount;
        }
    } else {
        TSocketHolder s(acceptedSocket);
        try {
            SetKeepAlive(s, true);
            SetNoDelay(s, Session->Config.TcpNoDelay);
            SetSockOptTcpCork(s, Session->Config.TcpCork);
            SetCloseOnExec(s, true);
            SetNonBlock(s, true);
            if (Session->Config.SocketToS >= 0) {
                SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
            }
        } catch (...) {
            // It means that connection was reset just now
            // TODO: do something better
            goto skipAccept;
        }

        {
            TOnAccept onAccept;
            onAccept.s = s.Release();
            onAccept.addr = TNetAddr(addr.Release());
            onAccept.now = now;

            LWPROBE(Accepted, ToString(onAccept.addr));

            Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);

            Stats.LastAcceptSuccessInstant = now;
            ++Stats.AcceptSuccessCount;
        }

    skipAccept:;
    }

    Channel->EnableRead();

    SendStatus(now);
}

void TAcceptor::SendStatus(TInstant now) {
    GranStatus.Listen.Update(Stats, now);
}

void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_UNUSED(cookie);

    GetActor()->Schedule();
}

void TAcceptor::Shutdown() {
    ShutdownState.ShutdownCommand();
    GetActor()->Schedule();

    ShutdownState.ShutdownComplete.WaitI();
}