diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/eventlog/eventlog.h | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/eventlog/eventlog.h')
-rw-r--r-- | library/cpp/eventlog/eventlog.h | 623 |
1 files changed, 623 insertions, 0 deletions
diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h new file mode 100644 index 0000000000..45c2dfb17f --- /dev/null +++ b/library/cpp/eventlog/eventlog.h @@ -0,0 +1,623 @@ +#pragma once + +#include "eventlog_int.h" +#include "event_field_output.h" +#include "events_extension.h" + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/logger/all.h> + +#include <google/protobuf/message.h> + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/stream/output.h> +#include <util/stream/buffer.h> +#include <util/stream/str.h> +#include <util/system/mutex.h> +#include <util/stream/output.h> +#include <util/system/env.h> +#include <util/system/unaligned_mem.h> +#include <util/ysaveload.h> + +#include <cstdlib> + +namespace NJson { + class TJsonWriter; +} + +class IEventLog; + +class TEvent : public TThrRefBase { +public: + enum class TOutputFormat { + TabSeparated, + TabSeparatedRaw, // disables escaping + Json + }; + + struct TOutputOptions { + TOutputFormat OutputFormat = TOutputFormat::TabSeparated; + // Dump some fields (e.g. timestamp) in more human-readable format + bool HumanReadable = false; + + TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated) + : OutputFormat(outputFormat) + { + } + + TOutputOptions(TOutputFormat outputFormat, bool humanReadable) + : OutputFormat(outputFormat) + , HumanReadable(humanReadable) + { + } + }; + + struct TEventState { + TEventTimestamp FrameStartTime = 0; + TEventTimestamp PrevEventTime = 0; + TEventState() { + } + }; + + TEvent(TEventClass c, TEventTimestamp t) + : Class(c) + , Timestamp(t) + { + } + + virtual ~TEvent() = default; + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Save(IOutputStream& out) const = 0; + virtual void SaveToBuffer(TBufferOutput& out) const { + Save(out); + } + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Load(IInputStream& i) = 0; + + virtual TStringBuf GetName() const = 0; + virtual const NProtoBuf::Message* GetProto() const = 0; + + void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const; + void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const; + + TString ToString() const { + TStringStream buff; + Print(buff); + return buff.Str(); + } + + void FullSaveToBuffer(TBufferOutput& buf) const { + SaveMessageHeader(buf); + this->SaveToBuffer(buf); + } + + void FullSave(IOutputStream& o) const { + SaveMessageHeader(o); + this->Save(o); + } + + void FullLoad(IInputStream& i) { + ::Load(&i, Timestamp); + ::Load(&i, Class); + this->Load(i); + } + + template <class T> + const T* Get() const { + return static_cast<const T*>(this->GetProto()); + } + + TEventClass Class; + TEventTimestamp Timestamp; + ui32 FrameId = 0; + +private: + void SaveMessageHeader(IOutputStream& out) const { + ::Save(&out, Timestamp); + ::Save(&out, Class); + } + + virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0; + virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0; + + void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const; +}; + +using TEventPtr = TIntrusivePtr<TEvent>; +using TConstEventPtr = TIntrusiveConstPtr<TEvent>; + +class IEventProcessor { +public: + virtual void SetOptions(const TEvent::TOutputOptions& options) { + Options_ = options; + } + virtual void ProcessEvent(const TEvent* ev) = 0; + virtual bool CheckedProcessEvent(const TEvent* ev) { + ProcessEvent(ev); + return true; + } + virtual ~IEventProcessor() = default; + +protected: + TEvent::TOutputOptions Options_; +}; + +class IEventFactory { +public: + virtual TEvent* CreateLogEvent(TEventClass c) = 0; + virtual TEventLogFormat CurrentFormat() = 0; + virtual TEventClass ClassByName(TStringBuf name) const = 0; + virtual TEventClass EventClassBegin() const = 0; + virtual TEventClass EventClassEnd() const = 0; + virtual ~IEventFactory() = default; +}; + +class TUnknownEvent: public TEvent { +public: + TUnknownEvent(TEventTimestamp ts, TEventClass cls) + : TEvent(cls, ts) + { + } + + ~TUnknownEvent() override = default; + + void Save(IOutputStream& /* o */) const override { + ythrow yexception() << "TUnknownEvent cannot be saved"; + } + + void Load(IInputStream& /* i */) override { + ythrow yexception() << "TUnknownEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName() << "\t" << (size_t)Class; + } + + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class TEndOfFrameEvent: public TEvent { +public: + enum { + EventClass = 0 + }; + + TEndOfFrameEvent(TEventTimestamp ts) + : TEvent(TEndOfFrameEvent::EventClass, ts) + { + } + + ~TEndOfFrameEvent() override = default; + + void Save(IOutputStream& o) const override { + (void)o; + ythrow yexception() << "TEndOfFrameEvent cannot be saved"; + } + + void Load(IInputStream& i) override { + (void)i; + ythrow yexception() << "TEndOfFrameEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName(); + } + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class ILogFrameEventVisitor { +public: + virtual ~ILogFrameEventVisitor() = default; + + virtual void Visit(const TEvent& event) = 0; +}; + +class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> { +public: + virtual ~IWriteFrameCallback() = default; + + virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0; +}; + +using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>; + +class TEventLogFrame { +public: + TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TEventLogFrame() = default; + + void Flush(); + void SafeFlush(); + + void ForceDump() { + ForceDump_ = true; + } + + template <class T> + inline void LogEvent(const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(ev); + } else { + UnSafeLogEvent(ev); + } + } + + template <class T> + inline void LogEvent(TEventTimestamp timestamp, const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(timestamp, ev); + } else { + UnSafeLogEvent(timestamp, ev); + } + } + + template <class T> + inline void UnSafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(ev.ID, ev); + } + + template <class T> + inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(timestamp, ev.ID, ev); + } + + template <class T> + inline void SafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(ev.ID, ev); + } + } + + template <class T> + inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(timestamp, ev.ID, ev); + } + } + + void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory); + + inline bool IsEventIgnored(size_t eventId) const { + Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages + return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_; + } + + void Enable(IEventLog& evLog) { + EvLog_ = &evLog; + } + + void Disable() { + EvLog_ = nullptr; + } + + void SetNeedAlwaysSafeAdd(bool val) { + NeedAlwaysSafeAdd_ = val; + } + + void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) { + WriteFrameCallback_ = writeFrameCallback; + } + + void AddMetaFlag(const TString& key, const TString& value) { + if (NeedAlwaysSafeAdd_) { + TGuard<TMutex> g(Mtx_); + MetaFlags_.emplace_back(key, value); + } else { + MetaFlags_.emplace_back(key, value); + } + } + +protected: + void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev); + void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev); + +private: + static bool IsDebugModeEnabled() { + static struct TSelector { + bool Flag; + + TSelector() + : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1")) + { + } + } selector; + + return selector.Flag; + } + + template <class T> + void DebugDump(const T& ev); + + // T must be a descendant of NEvClass::TEvent + template <class T> + inline void LogEventImpl(const T& ev) { + if (EvLog_ != nullptr || ForceDump_) { + TBuffer& b = Buf_.Buffer(); + size_t lastSize = b.size(); + ::Save(&Buf_, ui32(0)); + ev.FullSaveToBuffer(Buf_); + WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize)); + AddEvent(ev.Timestamp); + } + + if (IsDebugModeEnabled()) { + DebugDump(ev); + } + } + + void AddEvent(TEventTimestamp timestamp); + void DoInit(); + +private: + TBufferOutput Buf_; + TEventTimestamp StartTimestamp_, EndTimestamp_; + IEventLog* EvLog_; + TMutex Mtx_; + bool NeedAlwaysSafeAdd_; + bool ForceDump_; + TWriteFrameCallbackPtr WriteFrameCallback_; + TLogRecord::TMetaFlags MetaFlags_; + friend class TEventRecord; +}; + +class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> { +public: + TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TSelfFlushLogFrame(); +}; + +using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>; + +class IEventLog: public TAtomicRefCount<IEventLog> { +public: + class IErrorCallback { + public: + virtual ~IErrorCallback() { + } + + virtual void OnWriteError() = 0; + }; + + class ISuccessCallback { + public: + virtual ~ISuccessCallback() { + } + + virtual void OnWriteSuccess(const TBuffer& frameData) = 0; + }; + + virtual ~IEventLog(); + + virtual void ReopenLog() = 0; + virtual void CloseLog() = 0; + virtual void Flush() = 0; + virtual void SetErrorCallback(IErrorCallback*) { + } + virtual void SetSuccessCallback(ISuccessCallback*) { + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + virtual bool HasNullBackend() const = 0; + + virtual void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) = 0; +}; + +struct TEventLogBackendOptions { + bool UseSyncPageCacheBackend = false; + size_t SyncPageCacheBackendBufferSize = 0; + size_t SyncPageCacheBackendMaxPendingSize = 0; +}; + +class TEventLog: public IEventLog { +public: + /* + * Параметр contentformat указывает формат контента лога, например какие могут в логе + * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра + * должен быть нулевым. + */ + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat); + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {}); + TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + + ~TEventLog() override; + + void ReopenLog() override; + void CloseLog() override; + void Flush() override; + void SetErrorCallback(IErrorCallback* errorCallback) override { + ErrorCallback_ = errorCallback; + } + void SetSuccessCallback(ISuccessCallback* successCallback) override { + SuccessCallback_ = successCallback; + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + bool HasNullBackend() const override { + return HasNullBackend_; + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override; + +private: + mutable TLog Log_; + TEventLogFormat ContentFormat_; + const TEventLogFormat LogFormat_; + bool HasNullBackend_; + const NBlockCodecs::ICodec* const Lz4hcCodec_; + const NBlockCodecs::ICodec* const ZstdCodec_; + IErrorCallback* ErrorCallback_ = nullptr; + ISuccessCallback* SuccessCallback_ = nullptr; +}; + +using TEventLogPtr = TIntrusivePtr<IEventLog>; + +class TEventLogWithSlave: public IEventLog { +public: + TEventLogWithSlave(IEventLog& parentLog) + : Slave_(&parentLog) + { + } + + TEventLogWithSlave(const TEventLogPtr& parentLog) + : SlavePtr_(parentLog) + , Slave_(SlavePtr_.Get()) + { + } + + ~TEventLogWithSlave() override { + try { + Slave().Flush(); + } catch (...) { + } + } + + void Flush() override { + Slave().Flush(); + } + + void ReopenLog() override { + return Slave().ReopenLog(); + } + void CloseLog() override { + return Slave().CloseLog(); + } + + bool HasNullBackend() const override { + return Slave().HasNullBackend(); + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override { + Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags)); + } + + void SetErrorCallback(IErrorCallback* errorCallback) override { + Slave().SetErrorCallback(errorCallback); + } + + void SetSuccessCallback(ISuccessCallback* successCallback) override { + Slave().SetSuccessCallback(successCallback); + } + +protected: + inline IEventLog& Slave() const { + return *Slave_; + } + +private: + TEventLogPtr SlavePtr_; + IEventLog* Slave_ = nullptr; +}; + +extern TAtomic eventlogFrameCounter; + +class TProtobufEventProcessor: public IEventProcessor { +public: + void ProcessEvent(const TEvent* ev) override final { + ProcessEvent(ev, &Cout); + } + + void ProcessEvent(const TEvent* ev, IOutputStream *out) { + UpdateEventState(ev); + DoProcessEvent(ev, out); + EventState_.PrevEventTime = ev->Timestamp; + } +protected: + virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) { + ev->Print(*out, Options_, EventState_); + (*out) << Endl; + } + ui32 CurrentFrameId_ = Max<ui32>(); + TEvent::TEventState EventState_; + +private: + void UpdateEventState(const TEvent *ev) { + if (ev->FrameId != CurrentFrameId_) { + EventState_.FrameStartTime = ev->Timestamp; + EventState_.PrevEventTime = ev->Timestamp; + CurrentFrameId_ = ev->FrameId; + } + } +}; + +class TProtobufEventFactory: public IEventFactory { +public: + TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance()) + : EventFactory_(factory) + { + } + + TEvent* CreateLogEvent(TEventClass c) override; + + TEventLogFormat CurrentFormat() override { + return 0; + } + + TEventClass ClassByName(TStringBuf name) const override; + + TEventClass EventClassBegin() const override; + + TEventClass EventClassEnd() const override; + + ~TProtobufEventFactory() override = default; + +private: + NProtoBuf::TEventFactory* EventFactory_; +}; + +THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev); + +namespace NEvClass { + IEventFactory* Factory(); + IEventProcessor* Processor(); +} |