#include "poller_actor.h"
#include "interconnect_common.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/probes.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/util/funnel_queue.h>
#include <util/generic/intrlist.h>
#include <util/system/thread.h>
#include <util/system/event.h>
#include <util/system/pipe.h>
#include <variant>
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
namespace {
    // Fetches the error code of the most recent failed socket/pipe call:
    // errno everywhere except Windows builds (`_win_`), where
    // WSAGetLastError() is queried instead.
    int LastSocketError() {
#if !defined(_win_)
        return errno;
#else
        return WSAGetLastError();
#endif
    }
} // anonymous namespace
// Per-socket bookkeeping entry built from a TEvPollerRegister request.
struct TSocketRecord : TThrRefBase {
    // descriptor being polled; taken over (moved) from the registration event
    const TIntrusivePtr<TSharedDescriptor> Socket;
    // actor notified about read readiness
    const TActorId ReadActorId;
    // actor notified about write readiness
    const TActorId WriteActorId;
    // not touched in this part of the file; presumably maintained by the
    // platform-specific poller implementation -- TODO confirm
    std::atomic_uint32_t Flags{0};

    // NOTE: the socket pointer is moved out of `registration`; the actor ids are copied.
    TSocketRecord(TEvPollerRegister& registration)
        : Socket(std::move(registration.Socket))
        , ReadActorId(registration.ReadActorId)
        , WriteActorId(registration.WriteActorId)
    {}
};
template<typename TDerived>
class TPollerThreadBase : public ISimpleThread {
protected:
struct TPollerExitThread {}; // issued then we need to terminate the poller thread
struct TPollerWakeup {};
struct TPollerUnregisterSocket {
TIntrusivePtr<TSharedDescriptor> Socket;
TPollerUnregisterSocket(TIntrusivePtr<TSharedDescriptor> socket)
: Socket(std::move(socket))
{}
};
using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>;
struct TPollerSyncOperationWrapper {
TPollerSyncOperation Operation;
TManualEvent Event;
TPollerSyncOperationWrapper(TPollerSyncOperation&& operation)
: Operation(std::move(operation))
{}
void Wait() {
Event.WaitI();
}
void SignalDone() {
Event.Signal();
}
};
TActorSystem *ActorSystem;
TPipeHandle ReadEnd, WriteEnd; // pipe for sync event processor
TFunnelQueue<TPollerSyncOperationWrapper*> SyncOperationsQ; // operation queue
public:
TPollerThreadBase(TActorSystem *actorSystem)
: ActorSystem(actorSystem)
{
// create a pipe for notifications
try {
TPipeHandle::Pipe(ReadEnd, WriteEnd, CloseOnExec);
} catch (const TFileError& err) {
Y_FAIL("failed to create pipe");
}
// switch the read/write ends to nonblocking mode
SetNonBlock(ReadEnd);
SetNonBlock(WriteEnd);
}
void UnregisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
ExecuteSyncOperation(TPollerUnregisterSocket(record->Socket));
}
protected:
void Notify(TSocketRecord *record, bool read, bool write) {
auto issue = [&](const TActorId& recipient) {
ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
};
if (read && record->ReadActorId) {
issue(record->ReadActorId);
if (write && record->WriteActorId && record->WriteActorId != record->ReadActorId) {
issue(record->WriteActorId);
}
} else if (write && record->WriteActorId) {
issue(record->WriteActorId);
}
}
void Stop() {
// signal poller thread to stop and wait for the thread
ExecuteSyncOperation(TPollerExitThread());
ISimpleThread::Join();
}
void ExecuteSyncOperation(TPollerSyncOperation&& op) {
TPollerSyncOperationWrapper wrapper(std::move(op));
if (SyncOperationsQ.Push(&wrapper)) {
// this was the first entry, so we push notification through the pipe
for (;;) {
char buffer = '\x00';
ssize_t nwritten = WriteEnd.Write(&buffer, sizeof(buffer));
if (nwritten < 0) {
const int err = LastSocketError();
if (err == EINTR) {
continue;
} else {
Y_FAIL("WriteEnd.Write() failed with %s", strerror(err));
}
} else {
Y_VERIFY(nwritten);
break;
}
}
}
// wait for operation to complete
wrapper.Wait();
}
bool DrainReadEnd() {
size_t totalRead = 0;
char buffer[4096];
for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
if (n < 0) {
const int error = LastSocketError();
if (error == EINTR) {
continue;
} else if (error == EAGAIN || error == EWOULDBLOCK) {
break;
} else {
Y_FAIL("read() failed with %s", strerror(errno));
}
} else {
Y_VERIFY(n);
totalRead += n;
}
}
return totalRead;
}
bool ProcessSyncOpQueue() {
if (DrainReadEnd()) {
Y_VERIFY(!SyncOperationsQ.IsEmpty());
do {
TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
op->SignalDone();
} else if (std::get_if<TPollerExitThread>(&op->Operation)) {
op->SignalDone();
return false; // terminate the thread
} else if (std::get_if<TPollerWakeup>(&op->Operation)) {
op->SignalDone();
} else {
Y_FAIL();
}
} while (SyncOperationsQ.Pop());
}
return true;
}
void *ThreadProc() override {
SetCurrentThreadName("network poller");
while (ProcessSyncOpQueue()) {
static_cast<TDerived&>(*this).ProcessEventsInLoop();
}
return nullptr;
}
};
} // namespace NActors
#if defined(_linux_)
# include "poller_actor_linux.h"
#elif defined(_darwin_)
# include "poller_actor_darwin.h"
#elif defined(_win_)
# include "poller_actor_win.h"
#else
# error "Unsupported platform"
#endif
namespace NActors {
// Implementation behind TPollerToken: registers the socket record with the
// poller thread on construction and unregisters it on destruction, provided
// the poller thread is still alive at that point.
class TPollerToken::TImpl {
    std::weak_ptr<TPollerThread> Thread;
    TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked

public:
    TImpl(std::shared_ptr<TPollerThread> pollerThread, TIntrusivePtr<TSocketRecord> socketRecord)
        : Thread(pollerThread)
        , Record(std::move(socketRecord))
    {
        pollerThread->RegisterSocket(Record);
    }

    ~TImpl() {
        auto alive = Thread.lock();
        if (!alive) {
            return; // poller thread is already gone
        }
        alive->UnregisterSocket(Record);
    }

    // Forwards the read/write interest to the poller thread; does nothing when
    // the poller thread no longer exists.
    void Request(bool read, bool write) {
        auto alive = Thread.lock();
        if (!alive) {
            return;
        }
        alive->Request(Record, read, write);
    }

    // Descriptor stored in the socket record.
    const TIntrusivePtr<TSharedDescriptor>& Socket() const {
        return Record->Socket;
    }
};
// Actor front-end of the poller: owns the poller thread and answers
// TEvPollerRegister requests with a token bound to that thread.
class TPollerActor: public TActorBootstrapped<TPollerActor> {
    // poller thread
    std::shared_ptr<TPollerThread> PollerThread;

public:
    static constexpr IActor::EActivityType ActorActivityType() {
        return IActor::INTERCONNECT_POLLER;
    }

    // Creates the poller thread object for the current actor system and
    // switches to the working state.
    void Bootstrap() {
        PollerThread = std::make_shared<TPollerThread>(TlsActivationContext->ExecutorThread.ActorSystem);
        Become(&TPollerActor::StateFunc);
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvPollerRegister, Handle);
        cFunc(TEvents::TSystem::Poison, PassAway);
    )

    // Registers the socket from the request and sends TEvPollerRegisterResult
    // to the read actor, the write actor, or both when they are distinct.
    void Handle(TEvPollerRegister::TPtr& ev) {
        auto *request = ev->Get();
        auto tokenImpl = std::make_unique<TPollerToken::TImpl>(PollerThread, MakeIntrusive<TSocketRecord>(*request));
        auto descriptor = tokenImpl->Socket();
        TPollerToken::TPtr token(new TPollerToken(std::move(tokenImpl)));

        // TSocketRecord's constructor moves only the socket out of the request;
        // the actor ids are copied, so they are still readable here
        const auto& reader = request->ReadActorId;
        const auto& writer = request->WriteActorId;

        if (reader && writer && writer != reader) {
            // two different actors: both of them share the same token
            Send(reader, new TEvPollerRegisterResult(descriptor, token));
            Send(writer, new TEvPollerRegisterResult(descriptor, std::move(token)));
            return;
        }
        if (reader) {
            Send(reader, new TEvPollerRegisterResult(descriptor, std::move(token)));
            return;
        }
        if (writer) {
            Send(writer, new TEvPollerRegisterResult(descriptor, std::move(token)));
        }
    }
};
TPollerToken::TPollerToken(std::unique_ptr<TImpl> impl)
: Impl(std::move(impl))
{}
// Defined in this translation unit, after TImpl has been fully defined.
TPollerToken::~TPollerToken() = default;
void TPollerToken::Request(bool read, bool write) {
Impl->Request(read, write);
}
// Factory: returns a newly allocated poller actor.
IActor* CreatePollerActor() {
    IActor* const actor = new TPollerActor;
    return actor;
}
}