diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/neh.h | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/neh/neh.h')
-rw-r--r-- | library/cpp/neh/neh.h | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/library/cpp/neh/neh.h b/library/cpp/neh/neh.h new file mode 100644 index 0000000000..e0211a7dff --- /dev/null +++ b/library/cpp/neh/neh.h @@ -0,0 +1,320 @@ +#pragma once + +#include "wfmo.h" +#include "stat.h" + +#include <library/cpp/http/io/headers.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/datetime/base.h> + +#include <utility> + +namespace NNeh { + struct TMessage { + TMessage() = default; + + inline TMessage(TString addr, TString data) + : Addr(std::move(addr)) + , Data(std::move(data)) + { + } + + static TMessage FromString(TStringBuf request); + + TString Addr; + TString Data; + }; + + using TMessageRef = TAutoPtr<TMessage>; + + struct TError { + public: + enum TType { + UnknownType, + Cancelled, + ProtocolSpecific + }; + + TError(TString text, TType type = UnknownType, i32 code = 0, i32 systemCode = 0) + : Type(std::move(type)) + , Code(code) + , Text(text) + , SystemCode(systemCode) + { + } + + TType Type = UnknownType; + i32 Code = 0; // protocol specific code (example(http): 404) + TString Text; + i32 SystemCode = 0; // system error code + }; + + using TErrorRef = TAutoPtr<TError>; + + struct TResponse; + using TResponseRef = TAutoPtr<TResponse>; + + struct TResponse { + inline TResponse(TMessage req, + TString data, + const TDuration duration) + : TResponse(std::move(req), std::move(data), duration, {} /* firstLine */, {} /* headers */, {} /* error */) + { + } + + inline TResponse(TMessage req, + TString data, + const TDuration duration, + TString firstLine, + THttpHeaders headers) + : TResponse(std::move(req), std::move(data), duration, std::move(firstLine), std::move(headers), {} /* error */) + { + } + + inline TResponse(TMessage req, + TString data, + const TDuration duration, + TString firstLine, + THttpHeaders headers, + TErrorRef error) + : Request(std::move(req)) + , Data(std::move(data)) + , Duration(duration) + , FirstLine(std::move(firstLine)) + , Headers(std::move(headers)) + , Error_(std::move(error)) + { + } + + inline static TResponseRef FromErrorText(TMessage msg, TString error, const TDuration duration) { + return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, new TError(std::move(error))); + } + + inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration) { + return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, error); + } + + inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration, + TString data, TString firstLine, THttpHeaders headers) + { + return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error); + } + + inline static TResponseRef FromError( + TMessage msg, + TErrorRef error, + TString data, + const TDuration duration, + TString firstLine, + THttpHeaders headers) + { + return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error); + } + + inline bool IsError() const { + return Error_.Get(); + } + + inline TError::TType GetErrorType() const { + return Error_.Get() ? Error_->Type : TError::UnknownType; + } + + inline i32 GetErrorCode() const { + return Error_.Get() ? Error_->Code : 0; + } + + inline i32 GetSystemErrorCode() const { + return Error_.Get() ? Error_->SystemCode : 0; + } + + inline TString GetErrorText() const { + return Error_.Get() ? Error_->Text : TString(); + } + + const TMessage Request; + const TString Data; + const TDuration Duration; + const TString FirstLine; + THttpHeaders Headers; + + private: + THolder<TError> Error_; + }; + + class THandle; + + class IOnRecv { + public: + virtual ~IOnRecv() = default; + + virtual void OnNotify(THandle&) { + } //callback on receive response + virtual void OnEnd() { + } //response was extracted by Wait() method, - OnRecv() will not be called + virtual void OnRecv(THandle& resp) = 0; //callback on destroy handler + }; + + class THandle: public TThrRefBase, public TWaitHandle { + public: + inline THandle(IOnRecv* f, TStatCollector* s = nullptr) noexcept + : F_(f) + , Stat_(s) + { + } + + ~THandle() override { + if (F_) { + try { + F_->OnRecv(*this); + } catch (...) { + } + } + } + + virtual bool MessageSendedCompletely() const noexcept { + //TODO + return true; + } + + virtual void Cancel() noexcept { + //TODO + if (!!Stat_) + Stat_->OnCancel(); + } + + inline const TResponse* Response() const noexcept { + return R_.Get(); + } + + //method MUST be called only after success Wait() for this handle or from callback IOnRecv::OnRecv() + //else exist chance for memory leak (race between Get()/Notify()) + inline TResponseRef Get() noexcept { + return R_; + } + + inline bool Wait(TResponseRef& msg, const TInstant deadLine) { + if (WaitForOne(*this, deadLine)) { + if (F_) { + F_->OnEnd(); + F_ = nullptr; + } + msg = Get(); + + return true; + } + + return false; + } + + inline bool Wait(TResponseRef& msg, const TDuration timeOut) { + return Wait(msg, timeOut.ToDeadLine()); + } + + inline bool Wait(TResponseRef& msg) { + return Wait(msg, TInstant::Max()); + } + + inline TResponseRef Wait(const TInstant deadLine) { + TResponseRef ret; + + Wait(ret, deadLine); + + return ret; + } + + inline TResponseRef Wait(const TDuration timeOut) { + return Wait(timeOut.ToDeadLine()); + } + + inline TResponseRef Wait() { + return Wait(TInstant::Max()); + } + + protected: + inline void Notify(TResponseRef resp) { + if (!!Stat_) { + if (!resp || resp->IsError()) { + Stat_->OnFail(); + } else { + Stat_->OnSuccess(); + } + } + R_.Swap(resp); + if (F_) { + try { + F_->OnNotify(*this); + } catch (...) { + } + } + Signal(); + } + + IOnRecv* F_; + + private: + TResponseRef R_; + THolder<TStatCollector> Stat_; + }; + + using THandleRef = TIntrusivePtr<THandle>; + + THandleRef Request(const TMessage& msg, IOnRecv* fallback, bool useAsyncSendRequest = false); + + inline THandleRef Request(const TMessage& msg) { + return Request(msg, nullptr); + } + + THandleRef Request(const TString& req, IOnRecv* fallback); + + inline THandleRef Request(const TString& req) { + return Request(req, nullptr); + } + + class IMultiRequester { + public: + virtual ~IMultiRequester() = default; + + virtual void Add(const THandleRef& req) = 0; + virtual void Del(const THandleRef& req) = 0; + virtual bool Wait(THandleRef& req, TInstant deadLine) = 0; + virtual bool IsEmpty() const = 0; + + inline void Schedule(const TString& req) { + Add(Request(req)); + } + + inline bool Wait(THandleRef& req, TDuration timeOut) { + return Wait(req, timeOut.ToDeadLine()); + } + + inline bool Wait(THandleRef& req) { + return Wait(req, TInstant::Max()); + } + + inline bool Wait(TResponseRef& resp, TInstant deadLine) { + THandleRef req; + + while (Wait(req, deadLine)) { + resp = req->Get(); + + if (!!resp) { + return true; + } + } + + return false; + } + + inline bool Wait(TResponseRef& resp) { + return Wait(resp, TInstant::Max()); + } + }; + + using IMultiRequesterRef = TAutoPtr<IMultiRequester>; + + IMultiRequesterRef CreateRequester(); + + bool SetProtocolOption(TStringBuf protoOption, TStringBuf value); +} |