aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/netliba.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/netliba.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/netliba.cpp')
-rw-r--r--library/cpp/neh/netliba.cpp508
1 files changed, 508 insertions, 0 deletions
diff --git a/library/cpp/neh/netliba.cpp b/library/cpp/neh/netliba.cpp
new file mode 100644
index 00000000000..f69906f3ba6
--- /dev/null
+++ b/library/cpp/neh/netliba.cpp
@@ -0,0 +1,508 @@
+#include "details.h"
+#include "factory.h"
+#include "http_common.h"
+#include "location.h"
+#include "multi.h"
+#include "netliba.h"
+#include "netliba_udp_http.h"
+#include "lfqueue.h"
+#include "utils.h"
+
+#include <library/cpp/dns/cache.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/string/cast.h>
+#include <util/system/yassert.h>
+
+#include <atomic>
+
+using namespace NDns;
+using namespace NNeh;
+namespace NNeh {
+ size_t TNetLibaOptions::ClientThreads = 4;
+ TDuration TNetLibaOptions::AckTailEffect = TDuration::Seconds(30);
+
+ bool TNetLibaOptions::Set(TStringBuf name, TStringBuf value) {
+#define NETLIBA_TRY_SET(optType, optName) \
+ if (name == TStringBuf(#optName)) { \
+ optName = FromString<optType>(value); \
+ }
+
+ NETLIBA_TRY_SET(size_t, ClientThreads)
+ else NETLIBA_TRY_SET(TDuration, AckTailEffect) else {
+ return false;
+ }
+ return true;
+ }
+}
+
+namespace {
+ namespace NNetLiba {
+ using namespace NNetliba;
+ using namespace NNehNetliba;
+
+ typedef NNehNetliba::IRequester INetLibaRequester;
+ typedef TAutoPtr<TUdpHttpRequest> TUdpHttpRequestPtr;
+ typedef TAutoPtr<TUdpHttpResponse> TUdpHttpResponsePtr;
+
+ static inline const addrinfo* FindIPBase(const TNetworkAddress* addr, int family) {
+ for (TNetworkAddress::TIterator it = addr->Begin(); it != addr->End(); ++it) {
+ if (it->ai_family == family) {
+ return &*it;
+ }
+ }
+
+ return nullptr;
+ }
+
+ static inline const sockaddr_in6& FindIP(const TNetworkAddress* addr) {
+ //prefer ipv6
+ const addrinfo* ret = FindIPBase(addr, AF_INET6);
+
+ if (!ret) {
+ ret = FindIPBase(addr, AF_INET);
+ }
+
+ if (!ret) {
+ ythrow yexception() << "ip not supported by " << *addr;
+ }
+
+ return *(const sockaddr_in6*)(ret->ai_addr);
+ }
+
+ class TLastAckTimes {
+ struct TTimeVal {
+ TTimeVal()
+ : Val(0)
+ {
+ }
+
+ std::atomic<TInstant::TValue> Val;
+ };
+
+ public:
+ TInstant::TValue Get(size_t idAddr) {
+ return Tm_.Get(idAddr).Val.load(std::memory_order_acquire);
+ }
+
+ void Set(size_t idAddr) {
+ Tm_.Get(idAddr).Val.store(TInstant::Now().GetValue(), std::memory_order_release);
+ }
+
+ static TLastAckTimes& Common() {
+ return *Singleton<TLastAckTimes>();
+ }
+
+ private:
+ NNeh::NHttp::TLockFreeSequence<TTimeVal> Tm_;
+ };
+
+ class TRequest: public TSimpleHandle {
+ public:
+ inline TRequest(TIntrusivePtr<INetLibaRequester>& r, size_t idAddr, const TMessage& msg, IOnRecv* cb, TStatCollector* s)
+ : TSimpleHandle(cb, msg, s)
+ , R_(r)
+ , IdAddr_(idAddr)
+ , Notified_(false)
+ {
+ CreateGuid(&Guid_);
+ }
+
+ void Cancel() noexcept override {
+ TSimpleHandle::Cancel();
+ R_->CancelRequest(Guid_);
+ }
+
+ inline const TString& Addr() const noexcept {
+ return Message().Addr;
+ }
+
+ inline const TGUID& Guid() const noexcept {
+ return Guid_;
+ }
+
+ //return false if already notifie
+ inline bool SetNotified() noexcept {
+ bool ret = Notified_;
+ Notified_ = true;
+ return !ret;
+ }
+
+ void OnSend() {
+ if (TNetLibaOptions::AckTailEffect.GetValue() && TLastAckTimes::Common().Get(IdAddr_) + TNetLibaOptions::AckTailEffect.GetValue() > TInstant::Now().GetValue()) {
+ //fake(predicted) completing detection
+ SetSendComplete();
+ }
+ }
+
+ void OnRequestAck() {
+ if (TNetLibaOptions::AckTailEffect.GetValue()) {
+ TLastAckTimes::Common().Set(IdAddr_);
+ }
+ SetSendComplete();
+ }
+
+ private:
+ TIntrusivePtr<INetLibaRequester> R_;
+ size_t IdAddr_;
+ TGUID Guid_;
+ bool Notified_;
+ };
+
+ typedef TIntrusivePtr<TRequest> TRequestRef;
+
+ class TNetLibaBus {
+ class TEventsHandler: public IEventsCollector {
+ typedef THashMap<TGUID, TRequestRef, TGUIDHash> TInFly;
+
+ public:
+ inline void OnSend(TRequestRef& req) {
+ Q_.Enqueue(req);
+ req->OnSend();
+ }
+
+ private:
+ void UpdateInFly() {
+ TRequestRef req;
+
+ while (Q_.Dequeue(&req)) {
+ if (!req) {
+ return;
+ }
+
+ InFly_[req->Guid()] = req;
+ }
+ }
+
+ void AddRequest(TUdpHttpRequest* req) override {
+ //ignore received requests in client
+ delete req;
+ }
+
+ void AddResponse(TUdpHttpResponse* resp) override {
+ TUdpHttpResponsePtr ptr(resp);
+
+ UpdateInFly();
+ TInFly::iterator it = InFly_.find(resp->ReqId);
+
+ Y_VERIFY(it != InFly_.end(), "incorrect incoming message");
+
+ TRequestRef& req = it->second;
+
+ if (req->SetNotified()) {
+ if (resp->Ok == TUdpHttpResponse::OK) {
+ req->NotifyResponse(TString(resp->Data.data(), resp->Data.size()));
+ } else {
+ if (resp->Ok == TUdpHttpResponse::CANCELED) {
+ req->NotifyError(new TError(resp->Error, TError::Cancelled));
+ } else {
+ req->NotifyError(new TError(resp->Error));
+ }
+ }
+ }
+
+ InFly_.erase(it);
+ }
+
+ void AddCancel(const TGUID& guid) override {
+ UpdateInFly();
+ TInFly::iterator it = InFly_.find(guid);
+
+ if (it != InFly_.end() && it->second->SetNotified()) {
+ it->second->NotifyError("Canceled (before ack)");
+ }
+ }
+
+ void AddRequestAck(const TGUID& guid) override {
+ UpdateInFly();
+ TInFly::iterator it = InFly_.find(guid);
+
+ Y_VERIFY(it != InFly_.end(), "incorrect complete notification");
+
+ it->second->OnRequestAck();
+ }
+
+ private:
+ TLockFreeQueue<TRequestRef> Q_;
+ TInFly InFly_;
+ };
+
+ struct TClientThread {
+ TClientThread(int physicalCpu)
+ : EH_(new TEventsHandler())
+ , R_(CreateHttpUdpRequester(0, IEventsCollectorRef(EH_.Get()), physicalCpu))
+ {
+ R_->EnableReportRequestAck();
+ }
+
+ ~TClientThread() {
+ R_->StopNoWait();
+ }
+
+ TIntrusivePtr<TEventsHandler> EH_;
+ TIntrusivePtr<INetLibaRequester> R_;
+ };
+
+ public:
+ TNetLibaBus() {
+ for (size_t i = 0; i < TNetLibaOptions::ClientThreads; ++i) {
+ Clnt_.push_back(new TClientThread(i));
+ }
+ }
+
+ inline THandleRef Schedule(const TMessage& msg, IOnRecv* cb, TServiceStatRef& ss) {
+ TParsedLocation loc(msg.Addr);
+ TUdpAddress addr;
+
+ const TResolvedHost* resHost = CachedResolve(TResolveInfo(loc.Host, loc.GetPort()));
+ GetUdpAddress(&addr, FindIP(&resHost->Addr));
+
+ TClientThread& clnt = *Clnt_[resHost->Id % Clnt_.size()];
+ TIntrusivePtr<INetLibaRequester> rr = clnt.R_;
+ TRequestRef req(new TRequest(rr, resHost->Id, msg, cb, !ss ? nullptr : new TStatCollector(ss)));
+
+ clnt.EH_->OnSend(req);
+ rr->SendRequest(addr, ToString(loc.Service), msg.Data, req->Guid());
+
+ return THandleRef(req.Get());
+ }
+
+ private:
+ TVector<TAutoPtr<TClientThread>> Clnt_;
+ };
+
+ //server
+ class TRequester: public TThrRefBase {
+ struct TSrvRequestState: public TAtomicRefCount<TSrvRequestState> {
+ TSrvRequestState()
+ : Canceled(false)
+ {
+ }
+
+ TAtomicBool Canceled;
+ };
+
+ class TRequest: public IRequest {
+ public:
+ inline TRequest(TAutoPtr<TUdpHttpRequest> req, TIntrusivePtr<TSrvRequestState> state, TRequester* parent)
+ : R_(req)
+ , S_(state)
+ , P_(parent)
+ {
+ }
+
+ ~TRequest() override {
+ if (!!P_) {
+ P_->RequestProcessed(this);
+ }
+ }
+
+ TStringBuf Scheme() const override {
+ return TStringBuf("netliba");
+ }
+
+ TString RemoteHost() const override {
+ if (!H_) {
+ TUdpAddress tmp(R_->PeerAddress);
+ tmp.Scope = 0; //discard scope from serialized addr
+
+ TString addr = GetAddressAsString(tmp);
+
+ TStringBuf host, port;
+
+ TStringBuf(addr).RSplit(':', host, port);
+ H_ = host;
+ }
+ return H_;
+ }
+
+ TStringBuf Service() const override {
+ return TStringBuf(R_->Url.c_str(), R_->Url.length());
+ }
+
+ TStringBuf Data() const override {
+ return TStringBuf((const char*)R_->Data.data(), R_->Data.size());
+ }
+
+ TStringBuf RequestId() const override {
+ const TGUID& g = R_->ReqId;
+
+ return TStringBuf((const char*)g.dw, sizeof(g.dw));
+ }
+
+ bool Canceled() const override {
+ return S_->Canceled;
+ }
+
+ void SendReply(TData& data) override {
+ TIntrusivePtr<TRequester> p;
+ p.Swap(P_);
+ if (!!p) {
+ if (!Canceled()) {
+ p->R_->SendResponse(R_->ReqId, &data);
+ }
+ p->RequestProcessed(this);
+ }
+ }
+
+ void SendError(TResponseError, const TString&) override {
+ // TODO
+ }
+
+ inline const TGUID& RequestGuid() const noexcept {
+ return R_->ReqId;
+ }
+
+ private:
+ TAutoPtr<TUdpHttpRequest> R_;
+ mutable TString H_;
+ TIntrusivePtr<TSrvRequestState> S_;
+ TIntrusivePtr<TRequester> P_;
+ };
+
+ class TEventsHandler: public IEventsCollector {
+ public:
+ TEventsHandler(TRequester* parent)
+ {
+ P_.store(parent, std::memory_order_release);
+ }
+
+ void RequestProcessed(const TRequest* r) {
+ FinishedReqs_.Enqueue(r->RequestGuid());
+ }
+
+ //thread safe method for disable proxy callbacks to parent (OnRequest(...))
+ void SyncStop() {
+ P_.store(nullptr, std::memory_order_release);
+ while (!RequesterPtrPotector_.TryAcquire()) {
+ Sleep(TDuration::MicroSeconds(100));
+ }
+ RequesterPtrPotector_.Release();
+ }
+
+ private:
+ typedef THashMap<TGUID, TIntrusivePtr<TSrvRequestState>, TGUIDHash> TStatesInProcessRequests;
+
+ void AddRequest(TUdpHttpRequest* req) override {
+ TUdpHttpRequestPtr ptr(req);
+
+ TSrvRequestState* state = new TSrvRequestState();
+
+ InProcess_[req->ReqId] = state;
+ try {
+ TGuard<TSpinLock> m(RequesterPtrPotector_);
+ if (TRequester* p = P_.load(std::memory_order_acquire)) {
+ p->OnRequest(ptr, state); //move req. owning to parent
+ }
+ } catch (...) {
+ Cdbg << "ignore exc.: " << CurrentExceptionMessage() << Endl;
+ }
+ }
+
+ void AddResponse(TUdpHttpResponse*) override {
+ Y_FAIL("unexpected response in neh netliba server");
+ }
+
+ void AddCancel(const TGUID& guid) override {
+ UpdateInProcess();
+ TStatesInProcessRequests::iterator ustate = InProcess_.find(guid);
+ if (ustate != InProcess_.end())
+ ustate->second->Canceled = true;
+ }
+
+ void AddRequestAck(const TGUID&) override {
+ Y_FAIL("unexpected acc in neh netliba server");
+ }
+
+ void UpdateInProcess() {
+ TGUID guid;
+
+ while (FinishedReqs_.Dequeue(&guid)) {
+ InProcess_.erase(guid);
+ }
+ }
+
+ private:
+ TLockFreeStack<TGUID> FinishedReqs_; //processed requests (responded or destroyed)
+ TStatesInProcessRequests InProcess_;
+ TSpinLock RequesterPtrPotector_;
+ std::atomic<TRequester*> P_;
+ };
+
+ public:
+ inline TRequester(IOnRequest* cb, ui16 port)
+ : CB_(cb)
+ , EH_(new TEventsHandler(this))
+ , R_(CreateHttpUdpRequester(port, EH_.Get()))
+ {
+ R_->EnableReportRequestCancel();
+ }
+
+ ~TRequester() override {
+ Shutdown();
+ }
+
+ void Shutdown() noexcept {
+ if (!Shutdown_) {
+ Shutdown_ = true;
+ R_->StopNoWait();
+ EH_->SyncStop();
+ }
+ }
+
+ void OnRequest(TUdpHttpRequestPtr req, TSrvRequestState* state) {
+ CB_->OnRequest(new TRequest(req, state, this));
+ }
+
+ void RequestProcessed(const TRequest* r) {
+ EH_->RequestProcessed(r);
+ }
+
+ private:
+ IOnRequest* CB_;
+ TIntrusivePtr<TEventsHandler> EH_;
+ TIntrusivePtr<INetLibaRequester> R_;
+ bool Shutdown_ = false;
+ };
+
+ typedef TIntrusivePtr<TRequester> TRequesterRef;
+
+ class TRequesterAutoShutdown: public NNeh::IRequester {
+ public:
+ TRequesterAutoShutdown(const TRequesterRef& r)
+ : R_(r)
+ {
+ }
+
+ ~TRequesterAutoShutdown() override {
+ R_->Shutdown();
+ }
+
+ private:
+ TRequesterRef R_;
+ };
+
+ class TProtocol: public IProtocol {
+ public:
+ THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
+ return Singleton<TNetLibaBus>()->Schedule(msg, fallback, ss);
+ }
+
+ NNeh::IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
+ TRequesterRef r(new TRequester(cb, loc.GetPort()));
+ return new TRequesterAutoShutdown(r);
+ }
+
+ TStringBuf Scheme() const noexcept override {
+ return TStringBuf("netliba");
+ }
+ };
+ }
+}
+
+IProtocol* NNeh::NetLibaProtocol() {
+ return Singleton<NNetLiba::TProtocol>();
+}