#include "details.h"
#include "factory.h"
#include "http_common.h"
#include "location.h"
#include "multi.h"
#include "netliba.h"
#include "netliba_udp_http.h"
#include "lfqueue.h"
#include "utils.h"
#include <library/cpp/dns/cache.h>
#include <util/generic/hash.h>
#include <util/generic/singleton.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/string/cast.h>
#include <util/system/yassert.h>
#include <atomic>
using namespace NDns;
using namespace NNeh;
namespace NNeh {
size_t TNetLibaOptions::ClientThreads = 4;
TDuration TNetLibaOptions::AckTailEffect = TDuration::Seconds(30);
bool TNetLibaOptions::Set(TStringBuf name, TStringBuf value) {
#define NETLIBA_TRY_SET(optType, optName) \
if (name == TStringBuf(#optName)) { \
optName = FromString<optType>(value); \
}
NETLIBA_TRY_SET(size_t, ClientThreads)
else NETLIBA_TRY_SET(TDuration, AckTailEffect) else {
return false;
}
return true;
}
}
namespace {
namespace NNetLiba {
using namespace NNetliba;
using namespace NNehNetliba;
typedef NNehNetliba::IRequester INetLibaRequester;
typedef TAutoPtr<TUdpHttpRequest> TUdpHttpRequestPtr;
typedef TAutoPtr<TUdpHttpResponse> TUdpHttpResponsePtr;
static inline const addrinfo* FindIPBase(const TNetworkAddress* addr, int family) {
for (TNetworkAddress::TIterator it = addr->Begin(); it != addr->End(); ++it) {
if (it->ai_family == family) {
return &*it;
}
}
return nullptr;
}
static inline const sockaddr_in6& FindIP(const TNetworkAddress* addr) {
//prefer ipv6
const addrinfo* ret = FindIPBase(addr, AF_INET6);
if (!ret) {
ret = FindIPBase(addr, AF_INET);
}
if (!ret) {
ythrow yexception() << "ip not supported by " << *addr;
}
return *(const sockaddr_in6*)(ret->ai_addr);
}
class TLastAckTimes {
struct TTimeVal {
TTimeVal()
: Val(0)
{
}
std::atomic<TInstant::TValue> Val;
};
public:
TInstant::TValue Get(size_t idAddr) {
return Tm_.Get(idAddr).Val.load(std::memory_order_acquire);
}
void Set(size_t idAddr) {
Tm_.Get(idAddr).Val.store(TInstant::Now().GetValue(), std::memory_order_release);
}
static TLastAckTimes& Common() {
return *Singleton<TLastAckTimes>();
}
private:
NNeh::NHttp::TLockFreeSequence<TTimeVal> Tm_;
};
class TRequest: public TSimpleHandle {
public:
inline TRequest(TIntrusivePtr<INetLibaRequester>& r, size_t idAddr, const TMessage& msg, IOnRecv* cb, TStatCollector* s)
: TSimpleHandle(cb, msg, s)
, R_(r)
, IdAddr_(idAddr)
, Notified_(false)
{
CreateGuid(&Guid_);
}
void Cancel() noexcept override {
TSimpleHandle::Cancel();
R_->CancelRequest(Guid_);
}
inline const TString& Addr() const noexcept {
return Message().Addr;
}
inline const TGUID& Guid() const noexcept {
return Guid_;
}
//return false if already notifie
inline bool SetNotified() noexcept {
bool ret = Notified_;
Notified_ = true;
return !ret;
}
void OnSend() {
if (TNetLibaOptions::AckTailEffect.GetValue() && TLastAckTimes::Common().Get(IdAddr_) + TNetLibaOptions::AckTailEffect.GetValue() > TInstant::Now().GetValue()) {
//fake(predicted) completing detection
SetSendComplete();
}
}
void OnRequestAck() {
if (TNetLibaOptions::AckTailEffect.GetValue()) {
TLastAckTimes::Common().Set(IdAddr_);
}
SetSendComplete();
}
private:
TIntrusivePtr<INetLibaRequester> R_;
size_t IdAddr_;
TGUID Guid_;
bool Notified_;
};
typedef TIntrusivePtr<TRequest> TRequestRef;
class TNetLibaBus {
class TEventsHandler: public IEventsCollector {
typedef THashMap<TGUID, TRequestRef, TGUIDHash> TInFly;
public:
inline void OnSend(TRequestRef& req) {
Q_.Enqueue(req);
req->OnSend();
}
private:
void UpdateInFly() {
TRequestRef req;
while (Q_.Dequeue(&req)) {
if (!req) {
return;
}
InFly_[req->Guid()] = req;
}
}
void AddRequest(TUdpHttpRequest* req) override {
//ignore received requests in client
delete req;
}
void AddResponse(TUdpHttpResponse* resp) override {
TUdpHttpResponsePtr ptr(resp);
UpdateInFly();
TInFly::iterator it = InFly_.find(resp->ReqId);
Y_VERIFY(it != InFly_.end(), "incorrect incoming message");
TRequestRef& req = it->second;
if (req->SetNotified()) {
if (resp->Ok == TUdpHttpResponse::OK) {
req->NotifyResponse(TString(resp->Data.data(), resp->Data.size()));
} else {
if (resp->Ok == TUdpHttpResponse::CANCELED) {
req->NotifyError(new TError(resp->Error, TError::Cancelled));
} else {
req->NotifyError(new TError(resp->Error));
}
}
}
InFly_.erase(it);
}
void AddCancel(const TGUID& guid) override {
UpdateInFly();
TInFly::iterator it = InFly_.find(guid);
if (it != InFly_.end() && it->second->SetNotified()) {
it->second->NotifyError("Canceled (before ack)");
}
}
void AddRequestAck(const TGUID& guid) override {
UpdateInFly();
TInFly::iterator it = InFly_.find(guid);
Y_VERIFY(it != InFly_.end(), "incorrect complete notification");
it->second->OnRequestAck();
}
private:
TLockFreeQueue<TRequestRef> Q_;
TInFly InFly_;
};
struct TClientThread {
TClientThread(int physicalCpu)
: EH_(new TEventsHandler())
, R_(CreateHttpUdpRequester(0, IEventsCollectorRef(EH_.Get()), physicalCpu))
{
R_->EnableReportRequestAck();
}
~TClientThread() {
R_->StopNoWait();
}
TIntrusivePtr<TEventsHandler> EH_;
TIntrusivePtr<INetLibaRequester> R_;
};
public:
TNetLibaBus() {
for (size_t i = 0; i < TNetLibaOptions::ClientThreads; ++i) {
Clnt_.push_back(new TClientThread(i));
}
}
inline THandleRef Schedule(const TMessage& msg, IOnRecv* cb, TServiceStatRef& ss) {
TParsedLocation loc(msg.Addr);
TUdpAddress addr;
const TResolvedHost* resHost = CachedResolve(TResolveInfo(loc.Host, loc.GetPort()));
GetUdpAddress(&addr, FindIP(&resHost->Addr));
TClientThread& clnt = *Clnt_[resHost->Id % Clnt_.size()];
TIntrusivePtr<INetLibaRequester> rr = clnt.R_;
TRequestRef req(new TRequest(rr, resHost->Id, msg, cb, !ss ? nullptr : new TStatCollector(ss)));
clnt.EH_->OnSend(req);
rr->SendRequest(addr, ToString(loc.Service), msg.Data, req->Guid());
return THandleRef(req.Get());
}
private:
TVector<TAutoPtr<TClientThread>> Clnt_;
};
//server
class TRequester: public TThrRefBase {
struct TSrvRequestState: public TAtomicRefCount<TSrvRequestState> {
TSrvRequestState()
: Canceled(false)
{
}
TAtomicBool Canceled;
};
class TRequest: public IRequest {
public:
inline TRequest(TAutoPtr<TUdpHttpRequest> req, TIntrusivePtr<TSrvRequestState> state, TRequester* parent)
: R_(req)
, S_(state)
, P_(parent)
{
}
~TRequest() override {
if (!!P_) {
P_->RequestProcessed(this);
}
}
TStringBuf Scheme() const override {
return TStringBuf("netliba");
}
TString RemoteHost() const override {
if (!H_) {
TUdpAddress tmp(R_->PeerAddress);
tmp.Scope = 0; //discard scope from serialized addr
TString addr = GetAddressAsString(tmp);
TStringBuf host, port;
TStringBuf(addr).RSplit(':', host, port);
H_ = host;
}
return H_;
}
TStringBuf Service() const override {
return TStringBuf(R_->Url.c_str(), R_->Url.length());
}
TStringBuf Data() const override {
return TStringBuf((const char*)R_->Data.data(), R_->Data.size());
}
TStringBuf RequestId() const override {
const TGUID& g = R_->ReqId;
return TStringBuf((const char*)g.dw, sizeof(g.dw));
}
bool Canceled() const override {
return S_->Canceled;
}
void SendReply(TData& data) override {
TIntrusivePtr<TRequester> p;
p.Swap(P_);
if (!!p) {
if (!Canceled()) {
p->R_->SendResponse(R_->ReqId, &data);
}
p->RequestProcessed(this);
}
}
void SendError(TResponseError, const TString&) override {
// TODO
}
inline const TGUID& RequestGuid() const noexcept {
return R_->ReqId;
}
private:
TAutoPtr<TUdpHttpRequest> R_;
mutable TString H_;
TIntrusivePtr<TSrvRequestState> S_;
TIntrusivePtr<TRequester> P_;
};
class TEventsHandler: public IEventsCollector {
public:
TEventsHandler(TRequester* parent)
{
P_.store(parent, std::memory_order_release);
}
void RequestProcessed(const TRequest* r) {
FinishedReqs_.Enqueue(r->RequestGuid());
}
//thread safe method for disable proxy callbacks to parent (OnRequest(...))
void SyncStop() {
P_.store(nullptr, std::memory_order_release);
while (!RequesterPtrPotector_.TryAcquire()) {
Sleep(TDuration::MicroSeconds(100));
}
RequesterPtrPotector_.Release();
}
private:
typedef THashMap<TGUID, TIntrusivePtr<TSrvRequestState>, TGUIDHash> TStatesInProcessRequests;
void AddRequest(TUdpHttpRequest* req) override {
TUdpHttpRequestPtr ptr(req);
TSrvRequestState* state = new TSrvRequestState();
InProcess_[req->ReqId] = state;
try {
TGuard<TSpinLock> m(RequesterPtrPotector_);
if (TRequester* p = P_.load(std::memory_order_acquire)) {
p->OnRequest(ptr, state); //move req. owning to parent
}
} catch (...) {
Cdbg << "ignore exc.: " << CurrentExceptionMessage() << Endl;
}
}
void AddResponse(TUdpHttpResponse*) override {
Y_FAIL("unexpected response in neh netliba server");
}
void AddCancel(const TGUID& guid) override {
UpdateInProcess();
TStatesInProcessRequests::iterator ustate = InProcess_.find(guid);
if (ustate != InProcess_.end())
ustate->second->Canceled = true;
}
void AddRequestAck(const TGUID&) override {
Y_FAIL("unexpected acc in neh netliba server");
}
void UpdateInProcess() {
TGUID guid;
while (FinishedReqs_.Dequeue(&guid)) {
InProcess_.erase(guid);
}
}
private:
TLockFreeStack<TGUID> FinishedReqs_; //processed requests (responded or destroyed)
TStatesInProcessRequests InProcess_;
TSpinLock RequesterPtrPotector_;
std::atomic<TRequester*> P_;
};
public:
inline TRequester(IOnRequest* cb, ui16 port)
: CB_(cb)
, EH_(new TEventsHandler(this))
, R_(CreateHttpUdpRequester(port, EH_.Get()))
{
R_->EnableReportRequestCancel();
}
~TRequester() override {
Shutdown();
}
void Shutdown() noexcept {
if (!Shutdown_) {
Shutdown_ = true;
R_->StopNoWait();
EH_->SyncStop();
}
}
void OnRequest(TUdpHttpRequestPtr req, TSrvRequestState* state) {
CB_->OnRequest(new TRequest(req, state, this));
}
void RequestProcessed(const TRequest* r) {
EH_->RequestProcessed(r);
}
private:
IOnRequest* CB_;
TIntrusivePtr<TEventsHandler> EH_;
TIntrusivePtr<INetLibaRequester> R_;
bool Shutdown_ = false;
};
typedef TIntrusivePtr<TRequester> TRequesterRef;
class TRequesterAutoShutdown: public NNeh::IRequester {
public:
TRequesterAutoShutdown(const TRequesterRef& r)
: R_(r)
{
}
~TRequesterAutoShutdown() override {
R_->Shutdown();
}
private:
TRequesterRef R_;
};
class TProtocol: public IProtocol {
public:
THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
return Singleton<TNetLibaBus>()->Schedule(msg, fallback, ss);
}
NNeh::IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
TRequesterRef r(new TRequester(cb, loc.GetPort()));
return new TRequesterAutoShutdown(r);
}
TStringBuf Scheme() const noexcept override {
return TStringBuf("netliba");
}
};
}
}
IProtocol* NNeh::NetLibaProtocol() {
return Singleton<NNetLiba::TProtocol>();
}