aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/logger/thread.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/logger/thread.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/logger/thread.cpp')
-rw-r--r--library/cpp/logger/thread.cpp165
1 files changed, 165 insertions, 0 deletions
diff --git a/library/cpp/logger/thread.cpp b/library/cpp/logger/thread.cpp
new file mode 100644
index 0000000000..0ccf9e374b
--- /dev/null
+++ b/library/cpp/logger/thread.cpp
@@ -0,0 +1,165 @@
+#include "thread.h"
+#include "record.h"
+
+#include <util/thread/pool.h>
+#include <util/system/event.h>
+#include <util/memory/addstorage.h>
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+
+class TThreadedLogBackend::TImpl {
+ class TRec: public IObjectInQueue, public TAdditionalStorage<TRec>, public TLogRecord {
+ public:
+ inline TRec(TImpl* parent, const TLogRecord& rec)
+ : TLogRecord(rec.Priority, (const char*)AdditionalData(), rec.Len)
+ , Parent_(parent)
+ {
+ memcpy(AdditionalData(), rec.Data, rec.Len);
+ }
+
+ inline ~TRec() override {
+ }
+
+ private:
+ void Process(void* /*tsr*/) override {
+ THolder<TRec> This(this);
+
+ Parent_->Slave_->WriteData(*this);
+ }
+
+ private:
+ TImpl* Parent_;
+ };
+
+ class TReopener: public IObjectInQueue, public TSystemEvent, public TAtomicRefCount<TReopener> {
+ public:
+ inline TReopener(TImpl* parent)
+ : Parent_(parent)
+ {
+ Ref();
+ }
+
+ inline ~TReopener() override {
+ }
+
+ private:
+ void Process(void* /*tsr*/) override {
+ try {
+ Parent_->Slave_->ReopenLog();
+ } catch (...) {
+ }
+
+ Signal();
+ UnRef();
+ }
+
+ private:
+ TImpl* Parent_;
+ };
+
+public:
+ inline TImpl(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback = {})
+ : Slave_(slave)
+ , QueueOverflowCallback_(std::move(queueOverflowCallback))
+ {
+ Queue_.Start(1, queuelen);
+ }
+
+ inline ~TImpl() {
+ Queue_.Stop();
+ }
+
+ inline void WriteData(const TLogRecord& rec) {
+ THolder<TRec> obj(new (rec.Len) TRec(this, rec));
+
+ if (Queue_.Add(obj.Get())) {
+ Y_UNUSED(obj.Release());
+ return;
+ }
+
+ if (QueueOverflowCallback_) {
+ QueueOverflowCallback_();
+ } else {
+ ythrow yexception() << "log queue exhausted";
+ }
+ }
+
+ // Write an emergency message when the memory allocator is corrupted.
+ // The TThreadedLogBackend object can't be used after this method is called.
+ inline void WriteEmergencyData(const TLogRecord& rec) noexcept {
+ Queue_.Stop();
+ Slave_->WriteData(rec);
+ }
+
+ inline void ReopenLog() {
+ TIntrusivePtr<TReopener> reopener(new TReopener(this));
+
+ if (!Queue_.Add(reopener.Get())) {
+ reopener->UnRef(); // Ref() was called in constructor
+ ythrow yexception() << "log queue exhausted";
+ }
+
+ reopener->Wait();
+ }
+
+ inline void ReopenLogNoFlush() {
+ Slave_->ReopenLogNoFlush();
+ }
+
+ inline size_t QueueSize() const {
+ return Queue_.Size();
+ }
+
+private:
+ TLogBackend* Slave_;
+ TThreadPool Queue_{"ThreadedLogBack"};
+ const std::function<void()> QueueOverflowCallback_;
+};
+
+TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave)
+ : Impl_(new TImpl(slave, 0))
+{
+}
+
+TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
+ : Impl_(new TImpl(slave, queuelen, std::move(queueOverflowCallback)))
+{
+}
+
+TThreadedLogBackend::~TThreadedLogBackend() {
+}
+
+void TThreadedLogBackend::WriteData(const TLogRecord& rec) {
+ Impl_->WriteData(rec);
+}
+
+void TThreadedLogBackend::ReopenLog() {
+ Impl_->ReopenLog();
+}
+
+void TThreadedLogBackend::ReopenLogNoFlush() {
+ Impl_->ReopenLogNoFlush();
+}
+
+void TThreadedLogBackend::WriteEmergencyData(const TLogRecord& rec) {
+ Impl_->WriteEmergencyData(rec);
+}
+
+size_t TThreadedLogBackend::QueueSize() const {
+ return Impl_->QueueSize();
+}
+
+TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave)
+ : THolder<TLogBackend>(slave)
+ , TThreadedLogBackend(Get())
+{
+}
+
+TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
+ : THolder<TLogBackend>(slave)
+ , TThreadedLogBackend(Get(), queuelen, std::move(queueOverflowCallback))
+{
+}
+
+TOwningThreadedLogBackend::~TOwningThreadedLogBackend() {
+}