#include "neh.h"

#include "details.h"
#include "factory.h"

#include <util/generic/list.h>
#include <util/generic/hash_set.h>
#include <util/digest/numeric.h>
#include <util/string/cast.h>

using namespace NNeh;

namespace {
    // IMultiRequester implementation.
    // Registered handles live in a hash set (Reqs_); once a handle is reported
    // by the wait queue callback it is moved to a FIFO list (Complete_) from
    // which Wait() hands results back to the caller.
    class TMultiRequester: public IMultiRequester {
        // Hash + equality functor for smart handles: both operations work on
        // the raw pointer returned by Get(), not on the pointee.
        struct TOps {
            template <class T>
            bool operator()(const T& lhs, const T& rhs) const noexcept {
                return lhs.Get() == rhs.Get();
            }

            template <class T>
            size_t operator()(const T& handle) const noexcept {
                return NumericHash(handle.Get());
            }
        };

        // Callback passed to WaitForMultipleObj. Records that it has been
        // invoked and forwards the finished handle to the owning requester.
        struct TOnComplete {
            TMultiRequester* Parent;
            bool Signalled = false;

            TOnComplete(TMultiRequester* parent)
                : Parent(parent)
            {
            }

            void operator()(TWaitHandle* wh) {
                THandleRef finished(static_cast<THandle*>(wh));

                Signalled = true;
                Parent->OnComplete(finished);
            }
        };

    public:
        // Starts tracking req and registers it on this requester's wait queue.
        void Add(const THandleRef& req) override {
            Reqs_.insert(req);
            req->Register(WaitQueue_);
        }

        // Stops tracking req (removes it from the pending set only).
        void Del(const THandleRef& req) override {
            Reqs_.erase(req);
        }

        // Pops the oldest completed handle into req and returns true.
        // Returns false when nothing is completed and either no requests are
        // pending, or WaitForMultipleObj returned without invoking the
        // callback (presumably because deadLine passed - confirm in wait.h).
        bool Wait(THandleRef& req, TInstant deadLine) override {
            for (;;) {
                if (!Complete_.empty()) {
                    req = Complete_.front();
                    Complete_.pop_front();
                    return true;
                }

                if (Reqs_.empty()) {
                    return false;
                }

                TOnComplete notifier(this);
                WaitForMultipleObj(*WaitQueue_, deadLine, notifier);

                if (!notifier.Signalled) {
                    return false;
                }
            }
        }

        // True when there are neither pending nor completed-but-unclaimed handles.
        bool IsEmpty() const override {
            return Reqs_.empty() && Complete_.empty();
        }

        // Invoked from TOnComplete: queue the handle as completed, then drop
        // it from the pending set.
        void OnComplete(const THandleRef& req) {
            Complete_.push_back(req);
            Del(req);
        }

    private:
        using TReqs = THashSet<THandleRef, TOps, TOps>;
        using TComplete = TList<THandleRef>;

        TIntrusivePtr<TWaitQueue> WaitQueue_ = MakeIntrusive<TWaitQueue>();
        TReqs Reqs_;
        TComplete Complete_;
    };

    // Looks up the protocol by the part of msg.Addr that precedes the first ':'.
    inline IProtocol* ProtocolForMessage(const TMessage& msg) {
        const TStringBuf scheme = TStringBuf(msg.Addr).Before(':');
        return ProtocolFactory()->Protocol(scheme);
    }
}

// Builds a message from a single string by splitting it at '?':
// the left part becomes the address, the right part becomes the data.
NNeh::TMessage NNeh::TMessage::FromString(const TStringBuf req) {
    TStringBuf addrPart;
    TStringBuf dataPart;

    req.Split('?', addrPart, dataPart);

    return TMessage(ToString(addrPart), ToString(dataPart));
}

namespace {
    const TString svcFail = "service status: failed";
}

// Schedules msg on the protocol selected by its address.
// When service statistics are enabled and the service status is not Ok, the
// request is not sent: a notify handle carrying the svcFail error is returned
// instead. In the ReTry state an additional empty-data request is scheduled
// first so that the service statistics can be refreshed.
THandleRef NNeh::Request(const TMessage& msg, IOnRecv* fallback, bool useAsyncSendRequest) {
    TServiceStatRef stat;

    if (!TServiceStat::Disabled()) {
        stat = GetServiceStat(msg.Addr);

        const TServiceStat::EStatus status = stat->GetStatus();

        if (status != TServiceStat::Ok) {
            if (status == TServiceStat::ReTry) {
                // probe the service with an empty-data request (updates TServiceStat info)
                TMessage validator;
                validator.Addr = msg.Addr;
                ProtocolForMessage(msg)->ScheduleAsyncRequest(validator, nullptr, stat, useAsyncSendRequest);
            }

            TNotifyHandleRef failed(new TNotifyHandle(fallback, msg));
            failed->NotifyError(new TError(svcFail));
            return failed.Get();
        }
    }

    return ProtocolForMessage(msg)->ScheduleAsyncRequest(msg, fallback, stat, useAsyncSendRequest);
}

// Convenience overload: parses req with TMessage::FromString and forwards it.
THandleRef NNeh::Request(const TString& req, IOnRecv* fallback) {
    const TMessage parsed = TMessage::FromString(req);
    return Request(parsed, fallback);
}

// Creates a new multi-requester instance.
IMultiRequesterRef NNeh::CreateRequester() {
    return new TMultiRequester();
}

// protoOption has the form "<protocol>/<option>": the part before '/' selects
// the protocol, the part after '/' is the option name passed to SetOption.
bool NNeh::SetProtocolOption(TStringBuf protoOption, TStringBuf value) {
    const TStringBuf protocolName = protoOption.Before('/');
    const TStringBuf optionName = protoOption.After('/');

    return ProtocolFactory()->Protocol(protocolName)->SetOption(optionName, value);
}