aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:49 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:49 +0300
commit7489e4682331202b9c7d863c0898eb83d7b12c2b (patch)
tree9142afc54d335ea52910662635b898e79e192e49 /library/cpp/messagebus/storage.cpp
parenta5950576e397b1909261050b8c7da16db58f10b1 (diff)
downloadydb-7489e4682331202b9c7d863c0898eb83d7b12c2b.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r--library/cpp/messagebus/storage.cpp26
1 files changed, 13 insertions, 13 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index 6743f0abe4..efefc87340 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -6,7 +6,7 @@ namespace NBus {
namespace NPrivate {
TTimedMessages::TTimedMessages() {
}
-
+
TTimedMessages::~TTimedMessages() {
Y_VERIFY(Items.empty());
}
@@ -33,14 +33,14 @@ namespace NBus {
size_t TTimedMessages::Size() const {
return Items.size();
}
-
+
void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
Clear(r);
return;
}
-
+
while (!Items.empty()) {
TItem& i = *Items.front();
if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
@@ -50,14 +50,14 @@ namespace NBus {
Items.pop_front();
}
}
-
+
void TTimedMessages::Clear(TMessagesPtrs* r) {
while (!Items.empty()) {
r->push_back(Items.front()->Message.Release());
Items.pop_front();
}
}
-
+
TSyncAckMessages::TSyncAckMessages() {
KeyToMessage.set_empty_key(0);
KeyToMessage.set_deleted_key(1);
@@ -66,14 +66,14 @@ namespace NBus {
TSyncAckMessages::~TSyncAckMessages() {
Y_VERIFY(KeyToMessage.empty());
Y_VERIFY(TimedItems.empty());
- }
-
+ }
+
void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
// Perform garbage collection if `TimedMessages` contain too many junk data
if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
Gc();
}
-
+
TValue value = {m.MessagePtr.Release()};
std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
@@ -95,7 +95,7 @@ namespace NBus {
return v.Message;
}
-
+
void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
@@ -110,7 +110,7 @@ namespace NBus {
if (TInstant::MilliSeconds(i.SendTime) > before) {
break;
}
-
+
TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
if (itMessage != KeyToMessage.end()) {
@@ -133,7 +133,7 @@ namespace NBus {
void TSyncAckMessages::Gc() {
TDeque<TTimedItem> tmp;
-
+
for (auto& timedItem : TimedItems) {
if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
continue;
@@ -143,7 +143,7 @@ namespace NBus {
TimedItems.swap(tmp);
}
-
+
void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
for (auto message : messages) {
TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
@@ -158,4 +158,4 @@ namespace NBus {
}
}
-}
+}