1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
#pragma once
#include "events_local.h"
#include "poller.h"
#include <library/cpp/actors/core/actor.h>
namespace NActors {
struct TEvPollerRegister : TEventLocal<TEvPollerRegister, ui32(ENetwork::EvPollerRegister)> {
const TIntrusivePtr<TSharedDescriptor> Socket; // socket to watch for
const TActorId ReadActorId; // actor id to notify about read availability
const TActorId WriteActorId; // actor id to notify about write availability; may be the same as the ReadActorId
TEvPollerRegister(TIntrusivePtr<TSharedDescriptor> socket, const TActorId& readActorId, const TActorId& writeActorId)
: Socket(std::move(socket))
, ReadActorId(readActorId)
, WriteActorId(writeActorId)
{}
};
// poller token is sent in response to TEvPollerRegister; it allows requesting poll when read/write returns EAGAIN
class TPollerToken : public TThrRefBase {
class TImpl;
std::unique_ptr<TImpl> Impl;
friend class TPollerActor;
TPollerToken(std::unique_ptr<TImpl> impl);
public:
~TPollerToken();
void Request(bool read, bool write);
bool RequestReadNotificationAfterWouldBlock();
bool RequestWriteNotificationAfterWouldBlock();
bool RequestNotificationAfterWouldBlock(bool read, bool write) {
bool status = false;
status |= read && RequestReadNotificationAfterWouldBlock();
status |= write && RequestWriteNotificationAfterWouldBlock();
return status;
}
using TPtr = TIntrusivePtr<TPollerToken>;
};
struct TEvPollerRegisterResult : TEventLocal<TEvPollerRegisterResult, ui32(ENetwork::EvPollerRegisterResult)> {
TIntrusivePtr<TSharedDescriptor> Socket;
TPollerToken::TPtr PollerToken;
TEvPollerRegisterResult(TIntrusivePtr<TSharedDescriptor> socket, TPollerToken::TPtr pollerToken)
: Socket(std::move(socket))
, PollerToken(std::move(pollerToken))
{}
};
struct TEvPollerReady : TEventLocal<TEvPollerReady, ui32(ENetwork::EvPollerReady)> {
TIntrusivePtr<TSharedDescriptor> Socket;
const bool Read, Write;
TEvPollerReady(TIntrusivePtr<TSharedDescriptor> socket, bool read, bool write)
: Socket(std::move(socket))
, Read(read)
, Write(write)
{}
};
IActor* CreatePollerActor();
inline TActorId MakePollerActorId() {
char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
}
}
|