aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/neh.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/neh.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/neh.cpp')
-rw-r--r--library/cpp/neh/neh.cpp146
1 files changed, 146 insertions, 0 deletions
diff --git a/library/cpp/neh/neh.cpp b/library/cpp/neh/neh.cpp
new file mode 100644
index 00000000000..2a3eef5023a
--- /dev/null
+++ b/library/cpp/neh/neh.cpp
@@ -0,0 +1,146 @@
+#include "neh.h"
+
+#include "details.h"
+#include "factory.h"
+
+#include <util/generic/list.h>
+#include <util/generic/hash_set.h>
+#include <util/digest/numeric.h>
+#include <util/string/cast.h>
+
+using namespace NNeh;
+
+namespace {
+ class TMultiRequester: public IMultiRequester {
+ struct TOps {
+ template <class T>
+ inline bool operator()(const T& l, const T& r) const noexcept {
+ return l.Get() == r.Get();
+ }
+
+ template <class T>
+ inline size_t operator()(const T& t) const noexcept {
+ return NumericHash(t.Get());
+ }
+ };
+
+ struct TOnComplete {
+ TMultiRequester* Parent;
+ bool Signalled;
+
+ inline TOnComplete(TMultiRequester* parent)
+ : Parent(parent)
+ , Signalled(false)
+ {
+ }
+
+ inline void operator()(TWaitHandle* wh) {
+ THandleRef req(static_cast<THandle*>(wh));
+
+ Signalled = true;
+ Parent->OnComplete(req);
+ }
+ };
+
+ public:
+ void Add(const THandleRef& req) override {
+ Reqs_.insert(req);
+ }
+
+ void Del(const THandleRef& req) override {
+ Reqs_.erase(req);
+ }
+
+ bool Wait(THandleRef& req, TInstant deadLine) override {
+ while (Complete_.empty()) {
+ if (Reqs_.empty()) {
+ return false;
+ }
+
+ TOnComplete cb(this);
+
+ WaitForMultipleObj(Reqs_.begin(), Reqs_.end(), deadLine, cb);
+
+ if (!cb.Signalled) {
+ return false;
+ }
+ }
+
+ req = *Complete_.begin();
+ Complete_.pop_front();
+
+ return true;
+ }
+
+ bool IsEmpty() const override {
+ return Reqs_.empty() && Complete_.empty();
+ }
+
+ inline void OnComplete(const THandleRef& req) {
+ Complete_.push_back(req);
+ Reqs_.erase(req);
+ }
+
+ private:
+ typedef THashSet<THandleRef, TOps, TOps> TReqs;
+ typedef TList<THandleRef> TComplete;
+ TReqs Reqs_;
+ TComplete Complete_;
+ };
+
+ inline IProtocol* ProtocolForMessage(const TMessage& msg) {
+ return ProtocolFactory()->Protocol(TStringBuf(msg.Addr).Before(':'));
+ }
+}
+
+NNeh::TMessage NNeh::TMessage::FromString(const TStringBuf req) {
+ TStringBuf addr;
+ TStringBuf data;
+
+ req.Split('?', addr, data);
+ return TMessage(ToString(addr), ToString(data));
+}
+
+namespace {
+ const TString svcFail = "service status: failed";
+}
+
+THandleRef NNeh::Request(const TMessage& msg, IOnRecv* fallback, bool useAsyncSendRequest) {
+ TServiceStatRef ss;
+
+ if (TServiceStat::Disabled()) {
+ return ProtocolForMessage(msg)->ScheduleAsyncRequest(msg, fallback, ss, useAsyncSendRequest);
+ }
+
+ ss = GetServiceStat(msg.Addr);
+ TServiceStat::EStatus es = ss->GetStatus();
+
+ if (es == TServiceStat::Ok) {
+ return ProtocolForMessage(msg)->ScheduleAsyncRequest(msg, fallback, ss, useAsyncSendRequest);
+ }
+
+ if (es == TServiceStat::ReTry) {
+ //send empty data request for validating service (update TServiceStat info)
+ TMessage validator;
+
+ validator.Addr = msg.Addr;
+
+ ProtocolForMessage(msg)->ScheduleAsyncRequest(validator, nullptr, ss, useAsyncSendRequest);
+ }
+
+ TNotifyHandleRef h(new TNotifyHandle(fallback, msg));
+ h->NotifyError(new TError(svcFail));
+ return h.Get();
+}
+
+THandleRef NNeh::Request(const TString& req, IOnRecv* fallback) {
+ return Request(TMessage::FromString(req), fallback);
+}
+
+IMultiRequesterRef NNeh::CreateRequester() {
+ return new TMultiRequester();
+}
+
+bool NNeh::SetProtocolOption(TStringBuf protoOption, TStringBuf value) {
+ return ProtocolFactory()->Protocol(protoOption.Before('/'))->SetOption(protoOption.After('/'), value);
+}