diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/netliba.cpp | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/neh/netliba.cpp')
-rw-r--r-- | library/cpp/neh/netliba.cpp | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/library/cpp/neh/netliba.cpp b/library/cpp/neh/netliba.cpp new file mode 100644 index 00000000000..f69906f3ba6 --- /dev/null +++ b/library/cpp/neh/netliba.cpp @@ -0,0 +1,508 @@ +#include "details.h" +#include "factory.h" +#include "http_common.h" +#include "location.h" +#include "multi.h" +#include "netliba.h" +#include "netliba_udp_http.h" +#include "lfqueue.h" +#include "utils.h" + +#include <library/cpp/dns/cache.h> + +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/string/cast.h> +#include <util/system/yassert.h> + +#include <atomic> + +using namespace NDns; +using namespace NNeh; +namespace NNeh { + size_t TNetLibaOptions::ClientThreads = 4; + TDuration TNetLibaOptions::AckTailEffect = TDuration::Seconds(30); + + bool TNetLibaOptions::Set(TStringBuf name, TStringBuf value) { +#define NETLIBA_TRY_SET(optType, optName) \ + if (name == TStringBuf(#optName)) { \ + optName = FromString<optType>(value); \ + } + + NETLIBA_TRY_SET(size_t, ClientThreads) + else NETLIBA_TRY_SET(TDuration, AckTailEffect) else { + return false; + } + return true; + } +} + +namespace { + namespace NNetLiba { + using namespace NNetliba; + using namespace NNehNetliba; + + typedef NNehNetliba::IRequester INetLibaRequester; + typedef TAutoPtr<TUdpHttpRequest> TUdpHttpRequestPtr; + typedef TAutoPtr<TUdpHttpResponse> TUdpHttpResponsePtr; + + static inline const addrinfo* FindIPBase(const TNetworkAddress* addr, int family) { + for (TNetworkAddress::TIterator it = addr->Begin(); it != addr->End(); ++it) { + if (it->ai_family == family) { + return &*it; + } + } + + return nullptr; + } + + static inline const sockaddr_in6& FindIP(const TNetworkAddress* addr) { + //prefer ipv6 + const addrinfo* ret = FindIPBase(addr, AF_INET6); + + if (!ret) { + ret = FindIPBase(addr, AF_INET); + } + + if (!ret) { + ythrow yexception() << "ip not supported by " << *addr; + } + + return *(const sockaddr_in6*)(ret->ai_addr); + } + + class TLastAckTimes { + struct TTimeVal { + TTimeVal() + : Val(0) + { + } + + std::atomic<TInstant::TValue> Val; + }; + + public: + TInstant::TValue Get(size_t idAddr) { + return Tm_.Get(idAddr).Val.load(std::memory_order_acquire); + } + + void Set(size_t idAddr) { + Tm_.Get(idAddr).Val.store(TInstant::Now().GetValue(), std::memory_order_release); + } + + static TLastAckTimes& Common() { + return *Singleton<TLastAckTimes>(); + } + + private: + NNeh::NHttp::TLockFreeSequence<TTimeVal> Tm_; + }; + + class TRequest: public TSimpleHandle { + public: + inline TRequest(TIntrusivePtr<INetLibaRequester>& r, size_t idAddr, const TMessage& msg, IOnRecv* cb, TStatCollector* s) + : TSimpleHandle(cb, msg, s) + , R_(r) + , IdAddr_(idAddr) + , Notified_(false) + { + CreateGuid(&Guid_); + } + + void Cancel() noexcept override { + TSimpleHandle::Cancel(); + R_->CancelRequest(Guid_); + } + + inline const TString& Addr() const noexcept { + return Message().Addr; + } + + inline const TGUID& Guid() const noexcept { + return Guid_; + } + + //return false if already notifie + inline bool SetNotified() noexcept { + bool ret = Notified_; + Notified_ = true; + return !ret; + } + + void OnSend() { + if (TNetLibaOptions::AckTailEffect.GetValue() && TLastAckTimes::Common().Get(IdAddr_) + TNetLibaOptions::AckTailEffect.GetValue() > TInstant::Now().GetValue()) { + //fake(predicted) completing detection + SetSendComplete(); + } + } + + void OnRequestAck() { + if (TNetLibaOptions::AckTailEffect.GetValue()) { + TLastAckTimes::Common().Set(IdAddr_); + } + SetSendComplete(); + } + + private: + TIntrusivePtr<INetLibaRequester> R_; + size_t IdAddr_; + TGUID Guid_; + bool Notified_; + }; + + typedef TIntrusivePtr<TRequest> TRequestRef; + + class TNetLibaBus { + class TEventsHandler: public IEventsCollector { + typedef THashMap<TGUID, TRequestRef, TGUIDHash> TInFly; + + public: + inline void OnSend(TRequestRef& req) { + Q_.Enqueue(req); + req->OnSend(); + } + + private: + void UpdateInFly() { + TRequestRef req; + + while (Q_.Dequeue(&req)) { + if (!req) { + return; + } + + InFly_[req->Guid()] = req; + } + } + + void AddRequest(TUdpHttpRequest* req) override { + //ignore received requests in client + delete req; + } + + void AddResponse(TUdpHttpResponse* resp) override { + TUdpHttpResponsePtr ptr(resp); + + UpdateInFly(); + TInFly::iterator it = InFly_.find(resp->ReqId); + + Y_VERIFY(it != InFly_.end(), "incorrect incoming message"); + + TRequestRef& req = it->second; + + if (req->SetNotified()) { + if (resp->Ok == TUdpHttpResponse::OK) { + req->NotifyResponse(TString(resp->Data.data(), resp->Data.size())); + } else { + if (resp->Ok == TUdpHttpResponse::CANCELED) { + req->NotifyError(new TError(resp->Error, TError::Cancelled)); + } else { + req->NotifyError(new TError(resp->Error)); + } + } + } + + InFly_.erase(it); + } + + void AddCancel(const TGUID& guid) override { + UpdateInFly(); + TInFly::iterator it = InFly_.find(guid); + + if (it != InFly_.end() && it->second->SetNotified()) { + it->second->NotifyError("Canceled (before ack)"); + } + } + + void AddRequestAck(const TGUID& guid) override { + UpdateInFly(); + TInFly::iterator it = InFly_.find(guid); + + Y_VERIFY(it != InFly_.end(), "incorrect complete notification"); + + it->second->OnRequestAck(); + } + + private: + TLockFreeQueue<TRequestRef> Q_; + TInFly InFly_; + }; + + struct TClientThread { + TClientThread(int physicalCpu) + : EH_(new TEventsHandler()) + , R_(CreateHttpUdpRequester(0, IEventsCollectorRef(EH_.Get()), physicalCpu)) + { + R_->EnableReportRequestAck(); + } + + ~TClientThread() { + R_->StopNoWait(); + } + + TIntrusivePtr<TEventsHandler> EH_; + TIntrusivePtr<INetLibaRequester> R_; + }; + + public: + TNetLibaBus() { + for (size_t i = 0; i < TNetLibaOptions::ClientThreads; ++i) { + Clnt_.push_back(new TClientThread(i)); + } + } + + inline THandleRef Schedule(const TMessage& msg, IOnRecv* cb, TServiceStatRef& ss) { + TParsedLocation loc(msg.Addr); + TUdpAddress addr; + + const TResolvedHost* resHost = CachedResolve(TResolveInfo(loc.Host, loc.GetPort())); + GetUdpAddress(&addr, FindIP(&resHost->Addr)); + + TClientThread& clnt = *Clnt_[resHost->Id % Clnt_.size()]; + TIntrusivePtr<INetLibaRequester> rr = clnt.R_; + TRequestRef req(new TRequest(rr, resHost->Id, msg, cb, !ss ? nullptr : new TStatCollector(ss))); + + clnt.EH_->OnSend(req); + rr->SendRequest(addr, ToString(loc.Service), msg.Data, req->Guid()); + + return THandleRef(req.Get()); + } + + private: + TVector<TAutoPtr<TClientThread>> Clnt_; + }; + + //server + class TRequester: public TThrRefBase { + struct TSrvRequestState: public TAtomicRefCount<TSrvRequestState> { + TSrvRequestState() + : Canceled(false) + { + } + + TAtomicBool Canceled; + }; + + class TRequest: public IRequest { + public: + inline TRequest(TAutoPtr<TUdpHttpRequest> req, TIntrusivePtr<TSrvRequestState> state, TRequester* parent) + : R_(req) + , S_(state) + , P_(parent) + { + } + + ~TRequest() override { + if (!!P_) { + P_->RequestProcessed(this); + } + } + + TStringBuf Scheme() const override { + return TStringBuf("netliba"); + } + + TString RemoteHost() const override { + if (!H_) { + TUdpAddress tmp(R_->PeerAddress); + tmp.Scope = 0; //discard scope from serialized addr + + TString addr = GetAddressAsString(tmp); + + TStringBuf host, port; + + TStringBuf(addr).RSplit(':', host, port); + H_ = host; + } + return H_; + } + + TStringBuf Service() const override { + return TStringBuf(R_->Url.c_str(), R_->Url.length()); + } + + TStringBuf Data() const override { + return TStringBuf((const char*)R_->Data.data(), R_->Data.size()); + } + + TStringBuf RequestId() const override { + const TGUID& g = R_->ReqId; + + return TStringBuf((const char*)g.dw, sizeof(g.dw)); + } + + bool Canceled() const override { + return S_->Canceled; + } + + void SendReply(TData& data) override { + TIntrusivePtr<TRequester> p; + p.Swap(P_); + if (!!p) { + if (!Canceled()) { + p->R_->SendResponse(R_->ReqId, &data); + } + p->RequestProcessed(this); + } + } + + void SendError(TResponseError, const TString&) override { + // TODO + } + + inline const TGUID& RequestGuid() const noexcept { + return R_->ReqId; + } + + private: + TAutoPtr<TUdpHttpRequest> R_; + mutable TString H_; + TIntrusivePtr<TSrvRequestState> S_; + TIntrusivePtr<TRequester> P_; + }; + + class TEventsHandler: public IEventsCollector { + public: + TEventsHandler(TRequester* parent) + { + P_.store(parent, std::memory_order_release); + } + + void RequestProcessed(const TRequest* r) { + FinishedReqs_.Enqueue(r->RequestGuid()); + } + + //thread safe method for disable proxy callbacks to parent (OnRequest(...)) + void SyncStop() { + P_.store(nullptr, std::memory_order_release); + while (!RequesterPtrPotector_.TryAcquire()) { + Sleep(TDuration::MicroSeconds(100)); + } + RequesterPtrPotector_.Release(); + } + + private: + typedef THashMap<TGUID, TIntrusivePtr<TSrvRequestState>, TGUIDHash> TStatesInProcessRequests; + + void AddRequest(TUdpHttpRequest* req) override { + TUdpHttpRequestPtr ptr(req); + + TSrvRequestState* state = new TSrvRequestState(); + + InProcess_[req->ReqId] = state; + try { + TGuard<TSpinLock> m(RequesterPtrPotector_); + if (TRequester* p = P_.load(std::memory_order_acquire)) { + p->OnRequest(ptr, state); //move req. owning to parent + } + } catch (...) { + Cdbg << "ignore exc.: " << CurrentExceptionMessage() << Endl; + } + } + + void AddResponse(TUdpHttpResponse*) override { + Y_FAIL("unexpected response in neh netliba server"); + } + + void AddCancel(const TGUID& guid) override { + UpdateInProcess(); + TStatesInProcessRequests::iterator ustate = InProcess_.find(guid); + if (ustate != InProcess_.end()) + ustate->second->Canceled = true; + } + + void AddRequestAck(const TGUID&) override { + Y_FAIL("unexpected acc in neh netliba server"); + } + + void UpdateInProcess() { + TGUID guid; + + while (FinishedReqs_.Dequeue(&guid)) { + InProcess_.erase(guid); + } + } + + private: + TLockFreeStack<TGUID> FinishedReqs_; //processed requests (responded or destroyed) + TStatesInProcessRequests InProcess_; + TSpinLock RequesterPtrPotector_; + std::atomic<TRequester*> P_; + }; + + public: + inline TRequester(IOnRequest* cb, ui16 port) + : CB_(cb) + , EH_(new TEventsHandler(this)) + , R_(CreateHttpUdpRequester(port, EH_.Get())) + { + R_->EnableReportRequestCancel(); + } + + ~TRequester() override { + Shutdown(); + } + + void Shutdown() noexcept { + if (!Shutdown_) { + Shutdown_ = true; + R_->StopNoWait(); + EH_->SyncStop(); + } + } + + void OnRequest(TUdpHttpRequestPtr req, TSrvRequestState* state) { + CB_->OnRequest(new TRequest(req, state, this)); + } + + void RequestProcessed(const TRequest* r) { + EH_->RequestProcessed(r); + } + + private: + IOnRequest* CB_; + TIntrusivePtr<TEventsHandler> EH_; + TIntrusivePtr<INetLibaRequester> R_; + bool Shutdown_ = false; + }; + + typedef TIntrusivePtr<TRequester> TRequesterRef; + + class TRequesterAutoShutdown: public NNeh::IRequester { + public: + TRequesterAutoShutdown(const TRequesterRef& r) + : R_(r) + { + } + + ~TRequesterAutoShutdown() override { + R_->Shutdown(); + } + + private: + TRequesterRef R_; + }; + + class TProtocol: public IProtocol { + public: + THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override { + return Singleton<TNetLibaBus>()->Schedule(msg, fallback, ss); + } + + NNeh::IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override { + TRequesterRef r(new TRequester(cb, loc.GetPort())); + return new TRequesterAutoShutdown(r); + } + + TStringBuf Scheme() const noexcept override { + return TStringBuf("netliba"); + } + }; + } +} + +IProtocol* NNeh::NetLibaProtocol() { + return Singleton<NNetLiba::TProtocol>(); +} |