diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/storage.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/storage.h')
-rw-r--r-- | library/cpp/messagebus/storage.h | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/library/cpp/messagebus/storage.h b/library/cpp/messagebus/storage.h new file mode 100644 index 0000000000..7d168844ed --- /dev/null +++ b/library/cpp/messagebus/storage.h @@ -0,0 +1,94 @@ +#pragma once + +#include "message_ptr_and_header.h" +#include "moved.h" +#include "ybus.h" + +#include <contrib/libs/sparsehash/src/sparsehash/dense_hash_map> + +#include <util/generic/deque.h> +#include <util/generic/noncopyable.h> +#include <util/generic/utility.h> + +namespace NBus { + namespace NPrivate { + typedef TVector<TBusMessage*> TMessagesPtrs; + + class TTimedMessages { + public: + TTimedMessages(); + ~TTimedMessages(); + + struct TItem { + THolder<TBusMessage> Message; + + void Swap(TItem& that) { + DoSwap(Message, that.Message); + } + }; + + typedef TDeque<TMoved<TItem>> TItems; + + void PushBack(TNonDestroyingAutoPtr<TBusMessage> m); + TNonDestroyingAutoPtr<TBusMessage> PopFront(); + bool Empty() const; + size_t Size() const; + + void Timeout(TInstant before, TMessagesPtrs* r); + void Clear(TMessagesPtrs* r); + + private: + TItems Items; + }; + + class TSyncAckMessages : TNonCopyable { + public: + TSyncAckMessages(); + ~TSyncAckMessages(); + + void Push(TBusMessagePtrAndHeader& m); + TBusMessage* Pop(TBusKey id); + + void Timeout(TInstant before, TMessagesPtrs* r); + + void Clear(TMessagesPtrs* r); + + size_t Size() const { + return KeyToMessage.size(); + } + + void RemoveAll(const TMessagesPtrs&); + + void Gc(); + + void DumpState(); + + private: + struct TTimedItem { + TBusKey Key; + TBusInstant SendTime; + }; + + typedef TDeque<TTimedItem> TTimedItems; + typedef TDeque<TTimedItem>::iterator TTimedIterator; + + TTimedItems TimedItems; + + struct TValue { + TBusMessage* Message; + }; + + // keys are already random, no need to hash them further + struct TIdHash { + size_t operator()(TBusKey value) const { + return value; + } + }; + + typedef google::dense_hash_map<TBusKey, TValue, TIdHash> TKeyToMessage; + + TKeyToMessage KeyToMessage; + }; + + } +} |