aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/storage.cpp
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r--library/cpp/messagebus/storage.cpp26
1 files changed, 13 insertions, 13 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index efefc87340..6743f0abe4 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -6,7 +6,7 @@ namespace NBus {
namespace NPrivate {
TTimedMessages::TTimedMessages() {
}
-
+
TTimedMessages::~TTimedMessages() {
Y_VERIFY(Items.empty());
}
@@ -33,14 +33,14 @@ namespace NBus {
size_t TTimedMessages::Size() const {
return Items.size();
}
-
+
void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
Clear(r);
return;
}
-
+
while (!Items.empty()) {
TItem& i = *Items.front();
if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
@@ -50,14 +50,14 @@ namespace NBus {
Items.pop_front();
}
}
-
+
void TTimedMessages::Clear(TMessagesPtrs* r) {
while (!Items.empty()) {
r->push_back(Items.front()->Message.Release());
Items.pop_front();
}
}
-
+
TSyncAckMessages::TSyncAckMessages() {
KeyToMessage.set_empty_key(0);
KeyToMessage.set_deleted_key(1);
@@ -66,14 +66,14 @@ namespace NBus {
TSyncAckMessages::~TSyncAckMessages() {
Y_VERIFY(KeyToMessage.empty());
Y_VERIFY(TimedItems.empty());
- }
-
+ }
+
void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
// Perform garbage collection if `TimedMessages` contain too many junk data
if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
Gc();
}
-
+
TValue value = {m.MessagePtr.Release()};
std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
@@ -95,7 +95,7 @@ namespace NBus {
return v.Message;
}
-
+
void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
@@ -110,7 +110,7 @@ namespace NBus {
if (TInstant::MilliSeconds(i.SendTime) > before) {
break;
}
-
+
TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
if (itMessage != KeyToMessage.end()) {
@@ -133,7 +133,7 @@ namespace NBus {
void TSyncAckMessages::Gc() {
TDeque<TTimedItem> tmp;
-
+
for (auto& timedItem : TimedItems) {
if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
continue;
@@ -143,7 +143,7 @@ namespace NBus {
TimedItems.swap(tmp);
}
-
+
void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
for (auto message : messages) {
TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
@@ -158,4 +158,4 @@ namespace NBus {
}
}
-}
+}