1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
#include "lib/ic_test_cluster.h"
#include "lib/test_events.h"
#include "lib/test_actors.h"
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/testing/unittest/tests_data.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/event.h>
#include <util/system/sanitizers.h>
Y_UNIT_TEST_SUITE(LargeMessage) {
using namespace NActors;
// Test actor that drives the scenario from the sending side.
//
// On bootstrap it sends two TEvTest messages to the recipient, both with
// IEventHandle::FlagTrackDelivery set:
//   * sequence number 1 / cookie 1 -- a small "hello" payload;
//   * sequence number 2 / cookie 2 -- a payload of OversizedPayloadSize bytes.
// When a TEvUndelivered with cookie 2 arrives, it sends a third small message
// (sequence number 3 / cookie 3). TEvUndelivered with any other cookie is ignored.
class TProducer: public TActorBootstrapped<TProducer> {
    // Payload size (150 MiB) of message 2. The test expects this message NOT
    // to reach the consumer (the consumer aborts on sequence number 2).
    // NOTE(review): presumably this exceeds an interconnect message size
    // limit -- confirm against the interconnect configuration.
    static constexpr size_t OversizedPayloadSize = 150 * 1024 * 1024;

    const TActorId RecipientActorId;

public:
    // `explicit` prevents an accidental implicit conversion TActorId -> TProducer.
    explicit TProducer(const TActorId& recipientActorId)
        : RecipientActorId(recipientActorId)
    {}

    // Switches to StateFunc and sends the small message followed by the oversized one.
    void Bootstrap(const TActorContext& ctx) {
        Become(&TThis::StateFunc);
        ctx.Send(RecipientActorId, new TEvTest(1, "hello"), IEventHandle::FlagTrackDelivery, 1);
        ctx.Send(RecipientActorId, new TEvTest(2, TString(OversizedPayloadSize, 'X')), IEventHandle::FlagTrackDelivery, 2);
    }

    // Reacts to a delivery failure: only the failure of the oversized message
    // (cookie 2, as assigned in Bootstrap) triggers the follow-up message 3.
    void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
        if (ev->Cookie == 2) {
            Cerr << "TEvUndelivered\n";
            ctx.Send(RecipientActorId, new TEvTest(3, "hello"), IEventHandle::FlagTrackDelivery, 3);
        }
    }

    // The only event this actor handles is TEvUndelivered.
    STRICT_STFUNC(StateFunc,
        HFunc(TEvents::TEvUndelivered, Handle)
    )
};
// Test actor that validates the scenario from the receiving side.
//
// It accepts TEvTest messages and checks them by sequence number:
//   * 1 -- remembers the interconnect session the message arrived through
//          (and asserts no session had been remembered before);
//   * 3 -- asserts the message arrived through a DIFFERENT interconnect
//          session than message 1, then signals `Done`;
//   * anything else (including 2) -- aborts the process.
class TConsumer : public TActorBootstrapped<TConsumer> {
    // Signalled once message 3 has been verified. Held by reference: the
    // event is owned by the caller and must outlive this actor.
    TManualEvent& Done;
    // Interconnect session observed for message 1; empty until that message arrives.
    TActorId SessionId;

public:
    // `explicit` prevents an accidental implicit conversion TManualEvent& -> TConsumer.
    explicit TConsumer(TManualEvent& done)
        : Done(done)
    {
    }

    // Switches to StateFunc; the consumer sends nothing on its own.
    void Bootstrap(const TActorContext& /*ctx*/) {
        Become(&TThis::StateFunc);
    }

    // Validates one incoming TEvTest according to the protocol described above.
    void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
        const auto& record = ev->Get()->Record;
        Cerr << "RECEIVED TEvTest\n";
        if (record.GetSequenceNumber() == 1) {
            // First message: no session may have been recorded yet.
            Y_ABORT_UNLESS(!SessionId);
            SessionId = ev->InterconnectSession;
        } else if (record.GetSequenceNumber() == 3) {
            // Follow-up message must come through a new session.
            Y_ABORT_UNLESS(SessionId != ev->InterconnectSession);
            Done.Signal();
        } else {
            Y_ABORT("incorrect sequence number");
        }
    }

    // The only event this actor handles is TEvTest.
    STRICT_STFUNC(StateFunc,
        HFunc(TEvTest, Handle)
    )
};
// Registers a consumer and a producer in a TTestICCluster constructed with
// argument 2 and blocks until the consumer signals that message 3 arrived
// through a new interconnect session.
Y_UNIT_TEST(Test) {
    // Declared BEFORE the cluster on purpose: locals are destroyed in reverse
    // order of declaration, and the consumer actor keeps a reference to this
    // event. Declaring it first guarantees the event outlives the cluster
    // (and whatever actors it still hosts) during scope exit.
    TManualEvent done;

    TTestICCluster testCluster(2);

    // NOTE(review): the second RegisterActor argument is presumably the node
    // id (consumer on node 1, producer on node 2) -- confirm in TTestICCluster.
    const TActorId recp = testCluster.RegisterActor(new TConsumer(done), 1);
    testCluster.RegisterActor(new TProducer(recp), 2);

    // Wait without a timeout until the consumer signals completion.
    done.WaitI();
}
}
|