#pragma once #include "wfmo.h" #include "stat.h" #include <library/cpp/http/io/headers.h> #include <util/generic/ptr.h> #include <util/generic/string.h> #include <util/datetime/base.h> #include <utility> namespace NNeh { struct TMessage { TMessage() = default; inline TMessage(TString addr, TString data) : Addr(std::move(addr)) , Data(std::move(data)) { } static TMessage FromString(TStringBuf request); TString Addr; TString Data; }; using TMessageRef = TAutoPtr<TMessage>; struct TError { public: enum TType { UnknownType, Cancelled, ProtocolSpecific }; TError(TString text, TType type = UnknownType, i32 code = 0, i32 systemCode = 0) : Type(std::move(type)) , Code(code) , Text(text) , SystemCode(systemCode) { } TType Type = UnknownType; i32 Code = 0; // protocol specific code (example(http): 404) TString Text; i32 SystemCode = 0; // system error code }; using TErrorRef = TAutoPtr<TError>; struct TResponse; using TResponseRef = TAutoPtr<TResponse>; struct TResponse { inline TResponse(TMessage req, TString data, const TDuration duration) : TResponse(std::move(req), std::move(data), duration, {} /* firstLine */, {} /* headers */, {} /* error */) { } inline TResponse(TMessage req, TString data, const TDuration duration, TString firstLine, THttpHeaders headers) : TResponse(std::move(req), std::move(data), duration, std::move(firstLine), std::move(headers), {} /* error */) { } inline TResponse(TMessage req, TString data, const TDuration duration, TString firstLine, THttpHeaders headers, TErrorRef error) : Request(std::move(req)) , Data(std::move(data)) , Duration(duration) , FirstLine(std::move(firstLine)) , Headers(std::move(headers)) , Error_(std::move(error)) { } inline static TResponseRef FromErrorText(TMessage msg, TString error, const TDuration duration) { return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, new TError(std::move(error))); } inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration) { return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, error); } inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration, TString data, TString firstLine, THttpHeaders headers) { return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error); } inline static TResponseRef FromError( TMessage msg, TErrorRef error, TString data, const TDuration duration, TString firstLine, THttpHeaders headers) { return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error); } inline bool IsError() const { return Error_.Get(); } inline TError::TType GetErrorType() const { return Error_.Get() ? Error_->Type : TError::UnknownType; } inline i32 GetErrorCode() const { return Error_.Get() ? Error_->Code : 0; } inline i32 GetSystemErrorCode() const { return Error_.Get() ? Error_->SystemCode : 0; } inline TString GetErrorText() const { return Error_.Get() ? Error_->Text : TString(); } const TMessage Request; const TString Data; const TDuration Duration; const TString FirstLine; THttpHeaders Headers; private: THolder<TError> Error_; }; class THandle; class IOnRecv { public: virtual ~IOnRecv() = default; virtual void OnNotify(THandle&) { } //callback on receive response virtual void OnEnd() { } //response was extracted by Wait() method, - OnRecv() will not be called virtual void OnRecv(THandle& resp) = 0; //callback on destroy handler }; class THandle: public TThrRefBase, public TWaitHandle { public: inline THandle(IOnRecv* f, TStatCollector* s = nullptr) noexcept : F_(f) , Stat_(s) { } ~THandle() override { if (F_) { try { F_->OnRecv(*this); } catch (...) { } } } virtual bool MessageSendedCompletely() const noexcept { //TODO return true; } virtual void Cancel() noexcept { //TODO if (!!Stat_) Stat_->OnCancel(); } inline const TResponse* Response() const noexcept { return R_.Get(); } //method MUST be called only after success Wait() for this handle or from callback IOnRecv::OnRecv() //else exist chance for memory leak (race between Get()/Notify()) inline TResponseRef Get() noexcept { return R_; } inline bool Wait(TResponseRef& msg, const TInstant deadLine) { if (WaitForOne(*this, deadLine)) { if (F_) { F_->OnEnd(); F_ = nullptr; } msg = Get(); return true; } return false; } inline bool Wait(TResponseRef& msg, const TDuration timeOut) { return Wait(msg, timeOut.ToDeadLine()); } inline bool Wait(TResponseRef& msg) { return Wait(msg, TInstant::Max()); } inline TResponseRef Wait(const TInstant deadLine) { TResponseRef ret; Wait(ret, deadLine); return ret; } inline TResponseRef Wait(const TDuration timeOut) { return Wait(timeOut.ToDeadLine()); } inline TResponseRef Wait() { return Wait(TInstant::Max()); } protected: inline void Notify(TResponseRef resp) { if (!!Stat_) { if (!resp || resp->IsError()) { Stat_->OnFail(); } else { Stat_->OnSuccess(); } } R_.Swap(resp); if (F_) { try { F_->OnNotify(*this); } catch (...) { } } Signal(); } IOnRecv* F_; private: TResponseRef R_; THolder<TStatCollector> Stat_; }; using THandleRef = TIntrusivePtr<THandle>; THandleRef Request(const TMessage& msg, IOnRecv* fallback); inline THandleRef Request(const TMessage& msg) { return Request(msg, nullptr); } THandleRef Request(const TString& req, IOnRecv* fallback); inline THandleRef Request(const TString& req) { return Request(req, nullptr); } class IMultiRequester { public: virtual ~IMultiRequester() = default; virtual void Add(const THandleRef& req) = 0; virtual void Del(const THandleRef& req) = 0; virtual bool Wait(THandleRef& req, TInstant deadLine) = 0; virtual bool IsEmpty() const = 0; inline void Schedule(const TString& req) { Add(Request(req)); } inline bool Wait(THandleRef& req, TDuration timeOut) { return Wait(req, timeOut.ToDeadLine()); } inline bool Wait(THandleRef& req) { return Wait(req, TInstant::Max()); } inline bool Wait(TResponseRef& resp, TInstant deadLine) { THandleRef req; while (Wait(req, deadLine)) { resp = req->Get(); if (!!resp) { return true; } } return false; } inline bool Wait(TResponseRef& resp) { return Wait(resp, TInstant::Max()); } }; using IMultiRequesterRef = TAutoPtr<IMultiRequester>; IMultiRequesterRef CreateRequester(); bool SetProtocolOption(TStringBuf protoOption, TStringBuf value); }