#include "thread.h"
#include "record.h"
#include <util/thread/pool.h>
#include <util/system/event.h>
#include <util/memory/addstorage.h>
#include <util/generic/ptr.h>
#include <util/generic/yexception.h>
class TThreadedLogBackend::TImpl {
class TRec: public IObjectInQueue, public TAdditionalStorage<TRec>, public TLogRecord {
public:
inline TRec(TImpl* parent, const TLogRecord& rec)
: TLogRecord(rec.Priority, (const char*)AdditionalData(), rec.Len)
, Parent_(parent)
{
memcpy(AdditionalData(), rec.Data, rec.Len);
}
inline ~TRec() override {
}
private:
void Process(void* /*tsr*/) override {
THolder<TRec> This(this);
Parent_->Slave_->WriteData(*this);
}
private:
TImpl* Parent_;
};
class TReopener: public IObjectInQueue, public TSystemEvent, public TAtomicRefCount<TReopener> {
public:
inline TReopener(TImpl* parent)
: Parent_(parent)
{
Ref();
}
inline ~TReopener() override {
}
private:
void Process(void* /*tsr*/) override {
try {
Parent_->Slave_->ReopenLog();
} catch (...) {
}
Signal();
UnRef();
}
private:
TImpl* Parent_;
};
public:
inline TImpl(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback = {})
: Slave_(slave)
, QueueOverflowCallback_(std::move(queueOverflowCallback))
{
Queue_.Start(1, queuelen);
}
inline ~TImpl() {
Queue_.Stop();
}
inline void WriteData(const TLogRecord& rec) {
THolder<TRec> obj(new (rec.Len) TRec(this, rec));
if (Queue_.Add(obj.Get())) {
Y_UNUSED(obj.Release());
return;
}
if (QueueOverflowCallback_) {
QueueOverflowCallback_();
} else {
ythrow yexception() << "log queue exhausted";
}
}
// Write an emergency message when the memory allocator is corrupted.
// The TThreadedLogBackend object can't be used after this method is called.
inline void WriteEmergencyData(const TLogRecord& rec) noexcept {
Queue_.Stop();
Slave_->WriteData(rec);
}
inline void ReopenLog() {
TIntrusivePtr<TReopener> reopener(new TReopener(this));
if (!Queue_.Add(reopener.Get())) {
reopener->UnRef(); // Ref() was called in constructor
ythrow yexception() << "log queue exhausted";
}
reopener->Wait();
}
inline void ReopenLogNoFlush() {
Slave_->ReopenLogNoFlush();
}
inline size_t QueueSize() const {
return Queue_.Size();
}
private:
TLogBackend* Slave_;
TThreadPool Queue_{"ThreadedLogBack"};
const std::function<void()> QueueOverflowCallback_;
};
TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave)
: Impl_(new TImpl(slave, 0))
{
}
TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
: Impl_(new TImpl(slave, queuelen, std::move(queueOverflowCallback)))
{
}
TThreadedLogBackend::~TThreadedLogBackend() {
}
void TThreadedLogBackend::WriteData(const TLogRecord& rec) {
Impl_->WriteData(rec);
}
void TThreadedLogBackend::ReopenLog() {
Impl_->ReopenLog();
}
void TThreadedLogBackend::ReopenLogNoFlush() {
Impl_->ReopenLogNoFlush();
}
void TThreadedLogBackend::WriteEmergencyData(const TLogRecord& rec) {
Impl_->WriteEmergencyData(rec);
}
size_t TThreadedLogBackend::QueueSize() const {
return Impl_->QueueSize();
}
TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave)
: THolder<TLogBackend>(slave)
, TThreadedLogBackend(Get())
{
}
TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
: THolder<TLogBackend>(slave)
, TThreadedLogBackend(Get(), queuelen, std::move(queueOverflowCallback))
{
}
TOwningThreadedLogBackend::~TOwningThreadedLogBackend() {
}