aboutsummaryrefslogblamecommitdiffstats
path: root/util/network/poller_ut.cpp
blob: e5914b7df3a8032be44b7ab6fd8284d93d30c273 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                  
                              
 
                 
                   
                       
 
                                      





                                              
                                               
 
                                                                           
                                      
                              
                                                                     
                                                                                 


                                                                     
                                                                               

         
                                    






                                              
                                                                           
                                      
                                                          





                                                                     
                                                                                 


















                                                                               
                                                                 









                                              
                                                     






                                              

                                      
     




































                                                                                 
                                




































































                                                                                 











                                                         
                                                                      







                                                                                         





                                                                                                                                                
      
 
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/error.h>

#include "pair.h"
#include "poller.h"
#include "pollerimpl.h"

Y_UNIT_TEST_SUITE(TSocketPollerTest) {
    // Checks a WaitRead subscription that is registered once: the cookie is
    // expected only after a byte has been sent to the watched socket, and is
    // expected to stop being reported once that byte has been read.
    Y_UNIT_TEST(TestSimple) {
        SOCKET fds[2];
        UNIT_ASSERT(SocketPair(fds) == 0);

        // Wrap both ends of the pair in holders for the rest of the test.
        TSocketHolder writerEnd(fds[0]);
        TSocketHolder readerEnd(fds[1]);

        // Opaque value the poller is expected to hand back for the reader end.
        void* const cookie = reinterpret_cast<void*>(17);

        TSocketPoller poller;
        poller.WaitRead(fds[1], cookie);

        // Nothing has been sent yet, so no event is expected.
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        // The subscription above is not renewed between rounds.
        for (ui32 round = 0; round != 3; ++round) {
            char payload[1] = {18};
            UNIT_ASSERT_VALUES_EQUAL(1, send(fds[0], payload, 1, 0));

            // Pending data: the cookie must be reported.
            UNIT_ASSERT_VALUES_EQUAL(cookie, poller.WaitT(TDuration::Zero()));

            UNIT_ASSERT_VALUES_EQUAL(1, recv(fds[1], payload, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(18, payload[0]);

            // Data has been consumed: no further events are expected.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        }
    }

    // Checks WaitReadOneShot: after the first reported event the subscription
    // must stay silent until it is registered again on the next round.
    Y_UNIT_TEST(TestSimpleOneShot) {
        SOCKET fds[2];
        UNIT_ASSERT(SocketPair(fds) == 0);

        // Wrap both ends of the pair in holders for the rest of the test.
        TSocketHolder writerEnd(fds[0]);
        TSocketHolder readerEnd(fds[1]);

        // Opaque value the poller is expected to hand back for the reader end.
        void* const cookie = reinterpret_cast<void*>(17);

        TSocketPoller poller;

        // Nothing is registered yet, so no event is expected.
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        for (ui32 round = 0; round != 3; ++round) {
            // Re-arm the one-shot subscription at the start of every round.
            poller.WaitReadOneShot(fds[1], cookie);

            char payload[1];

            // First byte of the round: must trigger exactly one event.
            payload[0] = static_cast<char>(round + 20);
            UNIT_ASSERT_VALUES_EQUAL(1, send(fds[0], payload, 1, 0));

            UNIT_ASSERT_VALUES_EQUAL(cookie, poller.WaitT(TDuration::Zero()));

            UNIT_ASSERT_VALUES_EQUAL(1, recv(fds[1], payload, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(static_cast<char>(round + 20), payload[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Second byte of the round: sent while the subscription is spent.
            payload[0] = static_cast<char>(round + 21);
            UNIT_ASSERT_VALUES_EQUAL(1, send(fds[0], payload, 1, 0));

            // These two checks fail if the socket is not one-shot.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Drain the second byte so the next round starts with an empty socket.
            UNIT_ASSERT_VALUES_EQUAL(1, recv(fds[1], payload, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(static_cast<char>(round + 21), payload[0]);
        }
    }

    // Calls Unwait on a socket that was never added to the poller; the test
    // passes as long as the call completes without raising.
    Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) {
        SOCKET fds[2];
        UNIT_ASSERT(SocketPair(fds) == 0);

        // Wrap both ends of the pair in holders for the rest of the test.
        TSocketHolder firstEnd(fds[0]);
        TSocketHolder secondEnd(fds[1]);

        TSocketPoller poller;

        // No Wait* call precedes this: the descriptor is unknown to the poller.
        poller.Unwait(firstEnd);
    }

    // Registers the same socket several times in a row (read, read again, then
    // write); the test passes as long as none of the calls raises.
    Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) {
        SOCKET fds[2];
        UNIT_ASSERT(SocketPair(fds) == 0);

        // Wrap both ends of the pair in holders for the rest of the test.
        TSocketHolder firstEnd(fds[0]);
        TSocketHolder secondEnd(fds[1]);

        TSocketPoller poller;

        // Initial registration for reading.
        poller.WaitRead(firstEnd, nullptr);
        // Identical registration repeated on the same descriptor.
        poller.WaitRead(firstEnd, nullptr);
        // Registration of the same descriptor with a different interest.
        poller.WaitWrite(firstEnd, nullptr);
    }

    // Exercises WaitReadWriteEdgeTriggered and RestartReadWriteEdgeTriggered on
    // a non-blocking socket. Each of the three rounds registers sockets[1],
    // walks through a fixed send/poll/recv sequence and finally removes the
    // socket with Unwait. The statement order is part of the scenario.
    Y_UNIT_TEST(TestSimpleEdgeTriggered) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        // the "hit end" step below relies on recv failing instead of blocking
        SetNonBlock(sockets[1]);

        TSocketPoller poller;

        // nothing is registered yet, so no events are expected
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        for (ui32 i = 0; i < 3; ++i) {
            poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17);

            // notify about writable
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            char buf[2];

            buf[0] = i + 10;
            buf[1] = i + 20;

            // send one byte
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            // the new data is reported once, then the poller goes quiet
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // restart without reading
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false);

            // after restart read and write might generate separate events
            {
                void* events[3];
                size_t count = poller.WaitT(events, 3, TDuration::Zero());
                UNIT_ASSERT_GE(count, 1);
                UNIT_ASSERT_LE(count, 2);
                // only the first returned cookie is checked
                UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17);
            }

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // send two more bytes
            UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0));

            // here poller could notify or not because we haven't seen end
            Y_UNUSED(poller.WaitT(TDuration::Zero()));

            // recv one, leave two
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            // nothing new
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // recv the rest
            UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
            UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]);

            // still nothing new
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // hit end: the socket is drained, so the non-blocking recv must fail with EAGAIN
            ClearLastSystemError();
            UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError());

            // restart after end (noop for epoll)
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);

            // send and recv byte
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // recv and see end: two bytes are requested but only one is available
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // the same but send before restart
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            // restart after end (noop for epoll)
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);

            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // two bytes are requested again, one is delivered
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // drop the registration so the next round can register the socket again
            poller.Unwait(sockets[1]);
        }
    }

#if defined(HAVE_EPOLL_POLLER)
    // Checks that an epoll-based poller subscribed only for CONT_POLL_RDHUP
    // reports an event for a socket whose peer has shut down its write side.
    // The expected event carries status 0, filter CONT_POLL_RDHUP and the
    // cookie passed to Set.
    Y_UNIT_TEST(TestRdhup) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        // Leave one unread byte on s2 before the peer half-closes.
        char buf[1] = {0};
        UNIT_ASSERT_VALUES_EQUAL(1, send(s1, buf, 1, 0));
        // Verify the half-close itself: if it failed silently, the poll
        // assertions below would fail for a reason unrelated to the poller.
        UNIT_ASSERT_VALUES_EQUAL(0, shutdown(s1, SHUT_WR));

        using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>;
        TPoller poller;
        poller.Set((void*)17, s2, CONT_POLL_RDHUP);

        // Exactly one event is expected from a zero-timeout wait.
        TPoller::TEvent e;
        UNIT_ASSERT_VALUES_EQUAL(poller.WaitD(&e, 1, TDuration::Zero().ToDeadLine()), 1);
        UNIT_ASSERT_EQUAL(TPoller::ExtractStatus(&e), 0);
        UNIT_ASSERT_EQUAL(TPoller::ExtractFilter(&e), CONT_POLL_RDHUP);
        UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17);
    }

    // Checks that Set on the epoll-based poller throws TSystemError with an
    // operation-specific message, both when adding and when modifying a
    // registration for a descriptor value of Max<int>().
    Y_UNIT_TEST(TestSetSocketErrors) {
        using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>;
        TPoller poller;

        // Largest int value, used here as a descriptor that cannot be registered.
        const int badFd = Max<int>();

        // Plain registration goes through the "add" path.
        UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, badFd, CONT_POLL_READ), TSystemError, "epoll add failed");
        // CONT_POLL_MODIFY selects the "modify" path.
        UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, badFd, CONT_POLL_READ | CONT_POLL_MODIFY), TSystemError, "epoll modify failed");
    }
#endif
}