aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/storage.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r--library/cpp/messagebus/storage.cpp161
1 files changed, 161 insertions, 0 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
new file mode 100644
index 0000000000..efefc87340
--- /dev/null
+++ b/library/cpp/messagebus/storage.cpp
@@ -0,0 +1,161 @@
+#include "storage.h"
+
+#include <typeinfo>
+
+namespace NBus {
+ namespace NPrivate {
+ TTimedMessages::TTimedMessages() {
+ }
+
+ TTimedMessages::~TTimedMessages() {
+ Y_VERIFY(Items.empty());
+ }
+
+ void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
+ TItem i;
+ i.Message.Reset(m.Release());
+ Items.push_back(i);
+ }
+
+ TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
+ TBusMessage* r = nullptr;
+ if (!Items.empty()) {
+ r = Items.front()->Message.Release();
+ Items.pop_front();
+ }
+ return r;
+ }
+
+ bool TTimedMessages::Empty() const {
+ return Items.empty();
+ }
+
+ size_t TTimedMessages::Size() const {
+ return Items.size();
+ }
+
+ void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
+ // shortcut
+ if (before == TInstant::Max()) {
+ Clear(r);
+ return;
+ }
+
+ while (!Items.empty()) {
+ TItem& i = *Items.front();
+ if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
+ break;
+ }
+ r->push_back(i.Message.Release());
+ Items.pop_front();
+ }
+ }
+
+ void TTimedMessages::Clear(TMessagesPtrs* r) {
+ while (!Items.empty()) {
+ r->push_back(Items.front()->Message.Release());
+ Items.pop_front();
+ }
+ }
+
+ TSyncAckMessages::TSyncAckMessages() {
+ KeyToMessage.set_empty_key(0);
+ KeyToMessage.set_deleted_key(1);
+ }
+
+ TSyncAckMessages::~TSyncAckMessages() {
+ Y_VERIFY(KeyToMessage.empty());
+ Y_VERIFY(TimedItems.empty());
+ }
+
+ void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
+ // Perform garbage collection if `TimedMessages` contain too many junk data
+ if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
+ Gc();
+ }
+
+ TValue value = {m.MessagePtr.Release()};
+
+ std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
+ Y_VERIFY(p.second, "non-unique id; %s", value.Message->Describe().data());
+
+ TTimedItem item = {m.Header.Id, m.Header.SendTime};
+ TimedItems.push_back(item);
+ }
+
+ TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
+ TKeyToMessage::iterator it = KeyToMessage.find(id);
+ if (it == KeyToMessage.end()) {
+ return nullptr;
+ }
+ TValue v = it->second;
+ KeyToMessage.erase(it);
+
+ // `TimedMessages` still contain record about this message
+
+ return v.Message;
+ }
+
+ void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
+ // shortcut
+ if (before == TInstant::Max()) {
+ Clear(r);
+ return;
+ }
+
+ Y_ASSERT(r->empty());
+
+ while (!TimedItems.empty()) {
+ TTimedItem i = TimedItems.front();
+ if (TInstant::MilliSeconds(i.SendTime) > before) {
+ break;
+ }
+
+ TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
+
+ if (itMessage != KeyToMessage.end()) {
+ r->push_back(itMessage->second.Message);
+ KeyToMessage.erase(itMessage);
+ }
+
+ TimedItems.pop_front();
+ }
+ }
+
+ void TSyncAckMessages::Clear(TMessagesPtrs* r) {
+ for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
+ r->push_back(i->second.Message);
+ }
+
+ KeyToMessage.clear();
+ TimedItems.clear();
+ }
+
+ void TSyncAckMessages::Gc() {
+ TDeque<TTimedItem> tmp;
+
+ for (auto& timedItem : TimedItems) {
+ if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
+ continue;
+ }
+ tmp.push_back(timedItem);
+ }
+
+ TimedItems.swap(tmp);
+ }
+
+ void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
+ for (auto message : messages) {
+ TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
+ Y_VERIFY(it != KeyToMessage.end(), "delete non-existent message");
+ KeyToMessage.erase(it);
+ }
+ }
+
+ void TSyncAckMessages::DumpState() {
+ Cerr << TimedItems.size() << Endl;
+ Cerr << KeyToMessage.size() << Endl;
+ }
+
+ }
+}