aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/storage.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/storage.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/storage.cpp')
-rw-r--r--library/cpp/messagebus/storage.cpp266
1 files changed, 133 insertions, 133 deletions
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index efefc87340..030284a74c 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -2,159 +2,159 @@
#include <typeinfo>
-namespace NBus {
- namespace NPrivate {
- TTimedMessages::TTimedMessages() {
- }
-
- TTimedMessages::~TTimedMessages() {
- Y_VERIFY(Items.empty());
- }
-
- void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
- TItem i;
- i.Message.Reset(m.Release());
- Items.push_back(i);
- }
-
- TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
- TBusMessage* r = nullptr;
- if (!Items.empty()) {
- r = Items.front()->Message.Release();
- Items.pop_front();
- }
- return r;
- }
-
- bool TTimedMessages::Empty() const {
- return Items.empty();
- }
-
- size_t TTimedMessages::Size() const {
- return Items.size();
- }
-
- void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
- // shortcut
- if (before == TInstant::Max()) {
- Clear(r);
- return;
- }
-
- while (!Items.empty()) {
- TItem& i = *Items.front();
- if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
- break;
- }
- r->push_back(i.Message.Release());
- Items.pop_front();
- }
- }
-
- void TTimedMessages::Clear(TMessagesPtrs* r) {
- while (!Items.empty()) {
- r->push_back(Items.front()->Message.Release());
- Items.pop_front();
- }
- }
-
- TSyncAckMessages::TSyncAckMessages() {
- KeyToMessage.set_empty_key(0);
- KeyToMessage.set_deleted_key(1);
- }
-
- TSyncAckMessages::~TSyncAckMessages() {
- Y_VERIFY(KeyToMessage.empty());
- Y_VERIFY(TimedItems.empty());
- }
-
- void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
- // Perform garbage collection if `TimedMessages` contain too many junk data
- if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
- Gc();
- }
-
- TValue value = {m.MessagePtr.Release()};
-
- std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
+namespace NBus {
+ namespace NPrivate {
+ TTimedMessages::TTimedMessages() {
+ }
+
+ TTimedMessages::~TTimedMessages() {
+ Y_VERIFY(Items.empty());
+ }
+
+ void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
+ TItem i;
+ i.Message.Reset(m.Release());
+ Items.push_back(i);
+ }
+
+ TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
+ TBusMessage* r = nullptr;
+ if (!Items.empty()) {
+ r = Items.front()->Message.Release();
+ Items.pop_front();
+ }
+ return r;
+ }
+
+ bool TTimedMessages::Empty() const {
+ return Items.empty();
+ }
+
+ size_t TTimedMessages::Size() const {
+ return Items.size();
+ }
+
+ void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
+ // shortcut
+ if (before == TInstant::Max()) {
+ Clear(r);
+ return;
+ }
+
+ while (!Items.empty()) {
+ TItem& i = *Items.front();
+ if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
+ break;
+ }
+ r->push_back(i.Message.Release());
+ Items.pop_front();
+ }
+ }
+
+ void TTimedMessages::Clear(TMessagesPtrs* r) {
+ while (!Items.empty()) {
+ r->push_back(Items.front()->Message.Release());
+ Items.pop_front();
+ }
+ }
+
+ TSyncAckMessages::TSyncAckMessages() {
+ KeyToMessage.set_empty_key(0);
+ KeyToMessage.set_deleted_key(1);
+ }
+
+ TSyncAckMessages::~TSyncAckMessages() {
+ Y_VERIFY(KeyToMessage.empty());
+ Y_VERIFY(TimedItems.empty());
+ }
+
+ void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
+ // Perform garbage collection if `TimedMessages` contain too many junk data
+ if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
+ Gc();
+ }
+
+ TValue value = {m.MessagePtr.Release()};
+
+ std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
Y_VERIFY(p.second, "non-unique id; %s", value.Message->Describe().data());
- TTimedItem item = {m.Header.Id, m.Header.SendTime};
- TimedItems.push_back(item);
- }
+ TTimedItem item = {m.Header.Id, m.Header.SendTime};
+ TimedItems.push_back(item);
+ }
- TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
- TKeyToMessage::iterator it = KeyToMessage.find(id);
- if (it == KeyToMessage.end()) {
- return nullptr;
- }
- TValue v = it->second;
- KeyToMessage.erase(it);
+ TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
+ TKeyToMessage::iterator it = KeyToMessage.find(id);
+ if (it == KeyToMessage.end()) {
+ return nullptr;
+ }
+ TValue v = it->second;
+ KeyToMessage.erase(it);
- // `TimedMessages` still contain record about this message
+ // `TimedMessages` still contain record about this message
- return v.Message;
- }
+ return v.Message;
+ }
- void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
- // shortcut
- if (before == TInstant::Max()) {
- Clear(r);
- return;
- }
+ void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
+ // shortcut
+ if (before == TInstant::Max()) {
+ Clear(r);
+ return;
+ }
- Y_ASSERT(r->empty());
+ Y_ASSERT(r->empty());
- while (!TimedItems.empty()) {
- TTimedItem i = TimedItems.front();
- if (TInstant::MilliSeconds(i.SendTime) > before) {
- break;
- }
+ while (!TimedItems.empty()) {
+ TTimedItem i = TimedItems.front();
+ if (TInstant::MilliSeconds(i.SendTime) > before) {
+ break;
+ }
- TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
+ TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
- if (itMessage != KeyToMessage.end()) {
- r->push_back(itMessage->second.Message);
- KeyToMessage.erase(itMessage);
- }
+ if (itMessage != KeyToMessage.end()) {
+ r->push_back(itMessage->second.Message);
+ KeyToMessage.erase(itMessage);
+ }
- TimedItems.pop_front();
- }
+ TimedItems.pop_front();
+ }
}
- void TSyncAckMessages::Clear(TMessagesPtrs* r) {
- for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
- r->push_back(i->second.Message);
- }
+ void TSyncAckMessages::Clear(TMessagesPtrs* r) {
+ for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
+ r->push_back(i->second.Message);
+ }
- KeyToMessage.clear();
- TimedItems.clear();
+ KeyToMessage.clear();
+ TimedItems.clear();
}
- void TSyncAckMessages::Gc() {
- TDeque<TTimedItem> tmp;
+ void TSyncAckMessages::Gc() {
+ TDeque<TTimedItem> tmp;
- for (auto& timedItem : TimedItems) {
- if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
- continue;
- }
- tmp.push_back(timedItem);
- }
+ for (auto& timedItem : TimedItems) {
+ if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
+ continue;
+ }
+ tmp.push_back(timedItem);
+ }
- TimedItems.swap(tmp);
- }
+ TimedItems.swap(tmp);
+ }
- void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
- for (auto message : messages) {
- TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
- Y_VERIFY(it != KeyToMessage.end(), "delete non-existent message");
- KeyToMessage.erase(it);
- }
- }
+ void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
+ for (auto message : messages) {
+ TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
+ Y_VERIFY(it != KeyToMessage.end(), "delete non-existent message");
+ KeyToMessage.erase(it);
+ }
+ }
- void TSyncAckMessages::DumpState() {
- Cerr << TimedItems.size() << Endl;
- Cerr << KeyToMessage.size() << Endl;
+ void TSyncAckMessages::DumpState() {
+ Cerr << TimedItems.size() << Endl;
+ Cerr << KeyToMessage.size() << Endl;
}
}