aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/large.cpp
blob: 88207f816baff129335de5124fe0667ed90ec6be (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include "lib/ic_test_cluster.h"
#include "lib/test_events.h"
#include "lib/test_actors.h"

#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>

#include <library/cpp/testing/unittest/tests_data.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/system/event.h>
#include <util/system/sanitizers.h>

Y_UNIT_TEST_SUITE(LargeMessage) {
    using namespace NActors;

    // Sending side of the large-message scenario.
    //
    // On bootstrap the actor sends two delivery-tracked TEvTest events to the
    // recipient: a small one (sequence number 1, cookie 1) followed by one that
    // carries a 150 MiB payload (sequence number 2, cookie 2). Once a
    // TEvUndelivered notification with cookie 2 arrives, it sends a third small
    // event (sequence number 3, cookie 3).
    class TProducer: public TActorBootstrapped<TProducer> {
        // Actor that receives every TEvTest sent by this producer.
        const TActorId RecipientActorId;

    public:
        // explicit: a TActorId must never silently convert into a TProducer.
        explicit TProducer(const TActorId& recipientActorId)
            : RecipientActorId(recipientActorId)
        {}

        // Switches to StateFunc and queues the small and the oversized event,
        // in that order. The cookie of each event equals its sequence number
        // so a later TEvUndelivered can be matched to the event that caused it.
        void Bootstrap(const TActorContext& ctx) {
            Become(&TThis::StateFunc);
            ctx.Send(RecipientActorId, new TEvTest(1, "hello"), IEventHandle::FlagTrackDelivery, 1);
            ctx.Send(RecipientActorId, new TEvTest(2, TString(150 * 1024 * 1024, 'X')), IEventHandle::FlagTrackDelivery, 2);
        }

        // Reacts to a failed delivery. Only the notification for the oversized
        // event (cookie 2) triggers the follow-up event; notifications with any
        // other cookie are ignored.
        void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
            if (ev->Cookie == 2) {
                Cerr << "TEvUndelivered\n";
                ctx.Send(RecipientActorId, new TEvTest(3, "hello"), IEventHandle::FlagTrackDelivery, 3);
            }
        }

        // TEvUndelivered is the only event type this actor handles.
        // NOTE(review): STRICT_STFUNC presumably treats any other event type as
        // a fatal error - confirm against the actor library.
        STRICT_STFUNC(StateFunc,
            HFunc(TEvents::TEvUndelivered, Handle)
        )
    };

    // Receiving side of the large-message scenario.
    //
    // The actor expects exactly two TEvTest events: sequence number 1, whose
    // interconnect session id it remembers, and sequence number 3, which must
    // arrive through a different session. Receiving sequence number 3 signals
    // the external event; any other sequence number aborts the process.
    class TConsumer : public TActorBootstrapped<TConsumer> {
        // Signalled once the event with sequence number 3 has been accepted.
        // Held by reference, so the referenced event must outlive this actor.
        TManualEvent& Done;
        // Session through which the event with sequence number 1 arrived.
        TActorId SessionId;

    public:
        // explicit: a TManualEvent must never silently convert into a TConsumer.
        explicit TConsumer(TManualEvent& done)
            : Done(done)
        {
        }

        // Only installs the state function; the actor is purely reactive.
        void Bootstrap(const TActorContext& /*ctx*/) {
            Become(&TThis::StateFunc);
        }

        // Validates the incoming event by its sequence number:
        //   1 - remember the session it came through (must not be set yet);
        //   3 - require a session different from the remembered one, then
        //       signal Done;
        //   anything else - abort.
        void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
            const auto& record = ev->Get()->Record;
            Cerr << "RECEIVED TEvTest\n";
            if (record.GetSequenceNumber() == 1) {
                Y_ABORT_UNLESS(!SessionId);
                SessionId = ev->InterconnectSession;
            } else if (record.GetSequenceNumber() == 3) {
                Y_ABORT_UNLESS(SessionId != ev->InterconnectSession);
                Done.Signal();
            } else {
                // Includes sequence number 2: the oversized event must never
                // reach the consumer.
                Y_ABORT("incorrect sequence number");
            }
        }

        // TEvTest is the only event type this actor handles.
        // NOTE(review): STRICT_STFUNC presumably treats any other event type as
        // a fatal error - confirm against the actor library.
        STRICT_STFUNC(StateFunc,
            HFunc(TEvTest, Handle)
        )
    };

    // Runs the producer and the consumer inside one test cluster and blocks
    // until the consumer reports that the event with sequence number 3 arrived
    // through a different interconnect session than the event with sequence
    // number 1.
    // NOTE(review): TTestICCluster(2) presumably creates two nodes and the
    // second argument of RegisterActor presumably selects the node - confirm
    // against lib/ic_test_cluster.h.
    Y_UNIT_TEST(Test) {
        // Declared before the cluster on purpose: locals are destroyed in
        // reverse order of declaration, and the consumer actor keeps a
        // reference to this event. Declaring it first guarantees the event is
        // still alive while the cluster (and the actors it hosts) is destroyed.
        TManualEvent done;

        TTestICCluster testCluster(2);

        const TActorId recp = testCluster.RegisterActor(new TConsumer(done), 1);
        testCluster.RegisterActor(new TProducer(recp), 2);

        // Blocks without a timeout until the consumer signals completion.
        done.WaitI();
    }

}