diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/storage.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r-- | library/cpp/messagebus/storage.cpp | 266 |
1 files changed, 133 insertions, 133 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp index efefc87340..030284a74c 100644 --- a/library/cpp/messagebus/storage.cpp +++ b/library/cpp/messagebus/storage.cpp @@ -2,159 +2,159 @@ #include <typeinfo> -namespace NBus { - namespace NPrivate { - TTimedMessages::TTimedMessages() { - } - - TTimedMessages::~TTimedMessages() { - Y_VERIFY(Items.empty()); - } - - void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) { - TItem i; - i.Message.Reset(m.Release()); - Items.push_back(i); - } - - TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() { - TBusMessage* r = nullptr; - if (!Items.empty()) { - r = Items.front()->Message.Release(); - Items.pop_front(); - } - return r; - } - - bool TTimedMessages::Empty() const { - return Items.empty(); - } - - size_t TTimedMessages::Size() const { - return Items.size(); - } - - void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) { - // shortcut - if (before == TInstant::Max()) { - Clear(r); - return; - } - - while (!Items.empty()) { - TItem& i = *Items.front(); - if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) { - break; - } - r->push_back(i.Message.Release()); - Items.pop_front(); - } - } - - void TTimedMessages::Clear(TMessagesPtrs* r) { - while (!Items.empty()) { - r->push_back(Items.front()->Message.Release()); - Items.pop_front(); - } - } - - TSyncAckMessages::TSyncAckMessages() { - KeyToMessage.set_empty_key(0); - KeyToMessage.set_deleted_key(1); - } - - TSyncAckMessages::~TSyncAckMessages() { - Y_VERIFY(KeyToMessage.empty()); - Y_VERIFY(TimedItems.empty()); - } - - void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) { - // Perform garbage collection if `TimedMessages` contain too many junk data - if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) { - Gc(); - } - - TValue value = {m.MessagePtr.Release()}; - - std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value)); +namespace NBus { + namespace NPrivate { + TTimedMessages::TTimedMessages() { + } + + TTimedMessages::~TTimedMessages() { + Y_VERIFY(Items.empty()); + } + + void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) { + TItem i; + i.Message.Reset(m.Release()); + Items.push_back(i); + } + + TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() { + TBusMessage* r = nullptr; + if (!Items.empty()) { + r = Items.front()->Message.Release(); + Items.pop_front(); + } + return r; + } + + bool TTimedMessages::Empty() const { + return Items.empty(); + } + + size_t TTimedMessages::Size() const { + return Items.size(); + } + + void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) { + // shortcut + if (before == TInstant::Max()) { + Clear(r); + return; + } + + while (!Items.empty()) { + TItem& i = *Items.front(); + if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) { + break; + } + r->push_back(i.Message.Release()); + Items.pop_front(); + } + } + + void TTimedMessages::Clear(TMessagesPtrs* r) { + while (!Items.empty()) { + r->push_back(Items.front()->Message.Release()); + Items.pop_front(); + } + } + + TSyncAckMessages::TSyncAckMessages() { + KeyToMessage.set_empty_key(0); + KeyToMessage.set_deleted_key(1); + } + + TSyncAckMessages::~TSyncAckMessages() { + Y_VERIFY(KeyToMessage.empty()); + Y_VERIFY(TimedItems.empty()); + } + + void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) { + // Perform garbage collection if `TimedMessages` contain too many junk data + if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) { + Gc(); + } + + TValue value = {m.MessagePtr.Release()}; + + std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value)); Y_VERIFY(p.second, "non-unique id; %s", value.Message->Describe().data()); - TTimedItem item = {m.Header.Id, m.Header.SendTime}; - TimedItems.push_back(item); - } + TTimedItem item = {m.Header.Id, m.Header.SendTime}; + TimedItems.push_back(item); + } - TBusMessage* TSyncAckMessages::Pop(TBusKey id) { - TKeyToMessage::iterator it = KeyToMessage.find(id); - if (it == KeyToMessage.end()) { - return nullptr; - } - TValue v = it->second; - KeyToMessage.erase(it); + TBusMessage* TSyncAckMessages::Pop(TBusKey id) { + TKeyToMessage::iterator it = KeyToMessage.find(id); + if (it == KeyToMessage.end()) { + return nullptr; + } + TValue v = it->second; + KeyToMessage.erase(it); - // `TimedMessages` still contain record about this message + // `TimedMessages` still contain record about this message - return v.Message; - } + return v.Message; + } - void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) { - // shortcut - if (before == TInstant::Max()) { - Clear(r); - return; - } + void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) { + // shortcut + if (before == TInstant::Max()) { + Clear(r); + return; + } - Y_ASSERT(r->empty()); + Y_ASSERT(r->empty()); - while (!TimedItems.empty()) { - TTimedItem i = TimedItems.front(); - if (TInstant::MilliSeconds(i.SendTime) > before) { - break; - } + while (!TimedItems.empty()) { + TTimedItem i = TimedItems.front(); + if (TInstant::MilliSeconds(i.SendTime) > before) { + break; + } - TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key); + TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key); - if (itMessage != KeyToMessage.end()) { - r->push_back(itMessage->second.Message); - KeyToMessage.erase(itMessage); - } + if (itMessage != KeyToMessage.end()) { + r->push_back(itMessage->second.Message); + KeyToMessage.erase(itMessage); + } - TimedItems.pop_front(); - } + TimedItems.pop_front(); + } } - void TSyncAckMessages::Clear(TMessagesPtrs* r) { - for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) { - r->push_back(i->second.Message); - } + void TSyncAckMessages::Clear(TMessagesPtrs* r) { + for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) { + r->push_back(i->second.Message); + } - KeyToMessage.clear(); - TimedItems.clear(); + KeyToMessage.clear(); + TimedItems.clear(); } - void TSyncAckMessages::Gc() { - TDeque<TTimedItem> tmp; + void TSyncAckMessages::Gc() { + TDeque<TTimedItem> tmp; - for (auto& timedItem : TimedItems) { - if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) { - continue; - } - tmp.push_back(timedItem); - } + for (auto& timedItem : TimedItems) { + if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) { + continue; + } + tmp.push_back(timedItem); + } - TimedItems.swap(tmp); - } + TimedItems.swap(tmp); + } - void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) { - for (auto message : messages) { - TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id); - Y_VERIFY(it != KeyToMessage.end(), "delete non-existent message"); - KeyToMessage.erase(it); - } - } + void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) { + for (auto message : messages) { + TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id); + Y_VERIFY(it != KeyToMessage.end(), "delete non-existent message"); + KeyToMessage.erase(it); + } + } - void TSyncAckMessages::DumpState() { - Cerr << TimedItems.size() << Endl; - Cerr << KeyToMessage.size() << Endl; + void TSyncAckMessages::DumpState() { + Cerr << TimedItems.size() << Endl; + Cerr << KeyToMessage.size() << Endl; } } |