aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/neh.cpp
blob: 8b3092ad572f4cfa3049b681d1720845d352cd6f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include "neh.h"

#include "details.h"
#include "factory.h"

#include <util/generic/list.h>
#include <util/generic/hash_set.h>
#include <util/digest/numeric.h>
#include <util/string/cast.h>

using namespace NNeh;

namespace {
    class TMultiRequester: public IMultiRequester {
        struct TOps {
            template <class T>
            inline bool operator()(const T& l, const T& r) const noexcept {
                return l.Get() == r.Get();
            }

            template <class T>
            inline size_t operator()(const T& t) const noexcept {
                return NumericHash(t.Get());
            }
        };

        struct TOnComplete {
            TMultiRequester* Parent;
            bool Signalled;

            inline TOnComplete(TMultiRequester* parent)
                : Parent(parent)
                , Signalled(false)
            {
            }

            inline void operator()(TWaitHandle* wh) {
                THandleRef req(static_cast<THandle*>(wh));

                Signalled = true;
                Parent->OnComplete(req);
            }
        };

    public:
        ~TMultiRequester() {
            for (auto& req : Reqs_) {
                req->Register(nullptr);
            }
        }

        void Add(const THandleRef& req) override {
            Reqs_.insert(req);
            req->Register(&WaitQueue_);
        }

        void Del(const THandleRef& req) override {
            Reqs_.erase(req);
            req->Register(nullptr);
        }

        bool Wait(THandleRef& req, TInstant deadLine) override {
            while (Complete_.empty()) {
                if (Reqs_.empty()) {
                    return false;
                }
                TOnComplete cb(this);
                WaitForMultipleObj(WaitQueue_, deadLine, cb);
                if (!cb.Signalled) {
                    return false;
                }
            }

            req = *Complete_.begin();
            Complete_.pop_front();

            return true;
        }

        bool IsEmpty() const override {
            return Reqs_.empty() && Complete_.empty();
        }

        inline void OnComplete(const THandleRef& req) {
            Complete_.push_back(req);
            Del(req);
        }

    private:
        typedef THashSet<THandleRef, TOps, TOps> TReqs;
        typedef TList<THandleRef> TComplete;
        TWaitQueue WaitQueue_;
        TReqs Reqs_;
        TComplete Complete_;
    };

    // Looks up the protocol for a message using the part of msg.Addr that
    // precedes the first ':' as the key passed to the protocol factory.
    inline IProtocol* ProtocolForMessage(const TMessage& msg) {
        const TStringBuf scheme = TStringBuf(msg.Addr).Before(':');
        return ProtocolFactory()->Protocol(scheme);
    }
}

// Builds a TMessage from a single request string by splitting it on '?':
// the left part becomes the first constructor argument (address) and the
// right part the second (data).
// NOTE(review): the result when req has no '?' depends on TStringBuf::Split
// -- presumably the whole string becomes the address and data is empty.
NNeh::TMessage NNeh::TMessage::FromString(const TStringBuf req) {
    TStringBuf addrPart;
    TStringBuf dataPart;
    req.Split('?', addrPart, dataPart);

    return TMessage(ToString(addrPart), ToString(dataPart));
}

namespace {
    // Error text reported through TError when a request is rejected because
    // of the service status.
    const TString svcFail("service status: failed");
}

// Sends msg through the protocol selected by its address.
//
// When service statistics are disabled, or the status recorded for msg.Addr
// is Ok, the request is scheduled normally and the protocol's handle is
// returned. For any other status the request is NOT sent: a notify handle
// carrying the svcFail error is returned instead. If the status is ReTry, an
// additional message with the same address and no data is scheduled first so
// the service statistics can be refreshed.
THandleRef NNeh::Request(const TMessage& msg, IOnRecv* fallback, bool useAsyncSendRequest) {
    // Stays empty when statistics are disabled.
    TServiceStatRef ss;

    if (!TServiceStat::Disabled()) {
        ss = GetServiceStat(msg.Addr);
        const TServiceStat::EStatus status = ss->GetStatus();

        if (status != TServiceStat::Ok) {
            if (status == TServiceStat::ReTry) {
                //send empty data request for validating service (update TServiceStat info)
                TMessage probe;
                probe.Addr = msg.Addr;

                ProtocolForMessage(msg)->ScheduleAsyncRequest(probe, nullptr, ss, useAsyncSendRequest);
            }

            // Report the failure to the caller without touching the network
            // for the real request.
            TNotifyHandleRef failed(new TNotifyHandle(fallback, msg));
            failed->NotifyError(new TError(svcFail));
            return failed.Get();
        }
    }

    return ProtocolForMessage(msg)->ScheduleAsyncRequest(msg, fallback, ss, useAsyncSendRequest);
}

// Convenience overload: parses req with TMessage::FromString and forwards the
// resulting message to the TMessage-based Request().
THandleRef NNeh::Request(const TString& req, IOnRecv* fallback) {
    const TMessage parsed = TMessage::FromString(req);
    return Request(parsed, fallback);
}

// Creates a new, empty multi-requester and returns it wrapped in
// IMultiRequesterRef.
IMultiRequesterRef NNeh::CreateRequester() {
    return IMultiRequesterRef(new TMultiRequester());
}

// Sets an option on a protocol. protoOption is split on '/': the part before
// it selects the protocol from the factory, the part after it is the option
// name passed to that protocol's SetOption() together with value.
// Returns whatever the protocol's SetOption() returns.
bool NNeh::SetProtocolOption(TStringBuf protoOption, TStringBuf value) {
    const TStringBuf protocolName = protoOption.Before('/');
    const TStringBuf optionName = protoOption.After('/');

    IProtocol* const protocol = ProtocolFactory()->Protocol(protocolName);
    return protocol->SetOption(optionName, value);
}