aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/storage.cpp
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r--library/cpp/messagebus/storage.cpp52
1 files changed, 26 insertions, 26 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index f201a4335d..efefc87340 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -1,7 +1,7 @@
#include "storage.h"
-#include <typeinfo>
-
+#include <typeinfo>
+
namespace NBus {
namespace NPrivate {
TTimedMessages::TTimedMessages() {
@@ -10,13 +10,13 @@ namespace NBus {
TTimedMessages::~TTimedMessages() {
Y_VERIFY(Items.empty());
}
-
+
void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
TItem i;
i.Message.Reset(m.Release());
Items.push_back(i);
}
-
+
TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
TBusMessage* r = nullptr;
if (!Items.empty()) {
@@ -25,11 +25,11 @@ namespace NBus {
}
return r;
}
-
+
bool TTimedMessages::Empty() const {
return Items.empty();
}
-
+
size_t TTimedMessages::Size() const {
return Items.size();
}
@@ -62,7 +62,7 @@ namespace NBus {
KeyToMessage.set_empty_key(0);
KeyToMessage.set_deleted_key(1);
}
-
+
TSyncAckMessages::~TSyncAckMessages() {
Y_VERIFY(KeyToMessage.empty());
Y_VERIFY(TimedItems.empty());
@@ -75,14 +75,14 @@ namespace NBus {
}
TValue value = {m.MessagePtr.Release()};
-
+
std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
Y_VERIFY(p.second, "non-unique id; %s", value.Message->Describe().data());
-
+
TTimedItem item = {m.Header.Id, m.Header.SendTime};
TimedItems.push_back(item);
}
-
+
TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
TKeyToMessage::iterator it = KeyToMessage.find(id);
if (it == KeyToMessage.end()) {
@@ -90,9 +90,9 @@ namespace NBus {
}
TValue v = it->second;
KeyToMessage.erase(it);
-
+
// `TimedMessages` still contain record about this message
-
+
return v.Message;
}
@@ -102,9 +102,9 @@ namespace NBus {
Clear(r);
return;
}
-
+
Y_ASSERT(r->empty());
-
+
while (!TimedItems.empty()) {
TTimedItem i = TimedItems.front();
if (TInstant::MilliSeconds(i.SendTime) > before) {
@@ -112,25 +112,25 @@ namespace NBus {
}
TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
-
+
if (itMessage != KeyToMessage.end()) {
r->push_back(itMessage->second.Message);
KeyToMessage.erase(itMessage);
}
-
+
TimedItems.pop_front();
}
- }
-
+ }
+
void TSyncAckMessages::Clear(TMessagesPtrs* r) {
for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
r->push_back(i->second.Message);
}
-
+
KeyToMessage.clear();
TimedItems.clear();
- }
-
+ }
+
void TSyncAckMessages::Gc() {
TDeque<TTimedItem> tmp;
@@ -140,7 +140,7 @@ namespace NBus {
}
tmp.push_back(timedItem);
}
-
+
TimedItems.swap(tmp);
}
@@ -151,11 +151,11 @@ namespace NBus {
KeyToMessage.erase(it);
}
}
-
+
void TSyncAckMessages::DumpState() {
Cerr << TimedItems.size() << Endl;
Cerr << KeyToMessage.size() << Endl;
- }
-
- }
+ }
+
+ }
}