diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/eventlog | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/eventlog')
22 files changed, 3245 insertions, 0 deletions
diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h new file mode 100644 index 0000000000..75c512c13e --- /dev/null +++ b/library/cpp/eventlog/common.h @@ -0,0 +1,10 @@ +#pragma once + +template <class T> +class TPacketInputStream { +public: + virtual bool Avail() const = 0; + virtual T operator*() const = 0; + virtual bool Next() = 0; + virtual ~TPacketInputStream() = default; +}; diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp new file mode 100644 index 0000000000..e4413a1b0e --- /dev/null +++ b/library/cpp/eventlog/evdecoder.cpp @@ -0,0 +1,112 @@ +#include <util/memory/tempbuf.h> +#include <util/string/cast.h> +#include <util/stream/output.h> + +#include "evdecoder.h" +#include "logparser.h" + +static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class"; + +static inline void LogError(ui64 frameAddr, const char* msg, bool strict) { + if (!strict) { + Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl; + } else { + ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg; + } +} + +static inline bool SkipData(IInputStream& s, size_t amount) { + return (amount == s.Skip(amount)); +} + +// There are 2 log formats: the one, that allows event skip without event decode (it has stored event length) +// and another, that requires each event decode just to seek over stream. needRead == true means the latter format. 
+static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) { + TEventTimestamp ts; + TEventClass c; + THolder<TEvent> e; + + ::Load(&s, ts); + ::Load(&s, c); + + bool needReturn = false; + + if (!filter || filter->EventAllowed(c)) { + needReturn = true; + } + + if (needRead || needReturn) { + e.Reset(fac->CreateLogEvent(c)); + + if (!!e) { + e->Timestamp = ts; + e->Load(s); + } else if (needReturn) { + e.Reset(new TUnknownEvent(ts, c)); + } + + if (!needReturn) { + e.Reset(nullptr); + } + } + + return e; +} + +THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + ui32 len; + ::Load(&inp, len); + + if (len < sizeof(ui32)) { + ythrow TEventDecoderError() << "invalid event length"; + } + + TLengthLimitedInput s(&inp, len - sizeof(ui32)); + + try { + THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac); + if (!!e) { + if (!s.Left()) { + return e; + } else if (e->Class == 0) { + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return e; + } + + LogError(frameAddr, "Event is not fully read", strict); + } + } catch (const TLoadEOF&) { + if (s.Left()) { + throw; + } + + LogError(frameAddr, "Unexpected event end", strict); + } + + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return nullptr; +} + +THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + try { + if (framed) { + return DecodeFramed(s, frameAddr, filter, fac, strict); + } else { + THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac); + // e(0) means event, skipped by filter. Not an error. 
+ if (!!e && !e->Class) { + ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS; + } + + return e; + } + } catch (const TLoadEOF&) { + ythrow TEventDecoderError() << "unexpected frame end"; + } +} diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h new file mode 100644 index 0000000000..eedfc82174 --- /dev/null +++ b/library/cpp/eventlog/evdecoder.h @@ -0,0 +1,16 @@ +#pragma once + +#include <util/generic/yexception.h> +#include <util/generic/ptr.h> + +#include "eventlog.h" + +class TEvent; +class IInputStream; +class TEventFilter; + +struct TEventDecoderError: public yexception { +}; + +THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false); +bool AcceptableContent(TEventLogFormat); diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp new file mode 100644 index 0000000000..f9d98dac9d --- /dev/null +++ b/library/cpp/eventlog/event_field_output.cpp @@ -0,0 +1,68 @@ +#include "event_field_output.h" + +#include <util/string/split.h> + +namespace { + TString MakeSeparators(EFieldOutputFlags flags) { + TString res; + res.reserve(3); + + if (flags & EFieldOutputFlag::EscapeTab) { + res.append('\t'); + } + if (flags & EFieldOutputFlag::EscapeNewLine) { + res.append('\n'); + res.append('\r'); + } + if (flags & EFieldOutputFlag::EscapeBackSlash) { + res.append('\\'); + } + + return res; + } +} + +TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags) + : Output(output) + , Flags(flags) + , Separators(MakeSeparators(flags)) +{ +} + +IOutputStream& TEventFieldOutput::GetOutputStream() { + return Output; +} + +EFieldOutputFlags TEventFieldOutput::GetFlags() const { + return Flags; +} + +void TEventFieldOutput::DoWrite(const void* buf, size_t len) { + if (!Flags) { + Output.Write(buf, len); + return; + } + + TStringBuf chunk{static_cast<const char*>(buf), len}; + + for (const 
auto part : StringSplitter(chunk).SplitBySet(Separators.data())) { + TStringBuf token = part.Token(); + TStringBuf delim = part.Delim(); + + if (!token.empty()) { + Output.Write(token); + } + if ("\n" == delim) { + Output.Write(TStringBuf("\\n")); + } else if ("\r" == delim) { + Output.Write(TStringBuf("\\r")); + } else if ("\t" == delim) { + Output.Write(TStringBuf("\\t")); + } else if ("\\" == delim) { + Output.Write(TStringBuf("\\\\")); + } else { + Y_ASSERT(delim.empty()); + } + } +} + diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h new file mode 100644 index 0000000000..ed9db0ae16 --- /dev/null +++ b/library/cpp/eventlog/event_field_output.h @@ -0,0 +1,29 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/flags.h> + +enum class EFieldOutputFlag { + EscapeTab = 0x1, // escape \t in field value + EscapeNewLine = 0x2, // escape \n in field value + EscapeBackSlash = 0x4 // escape \ in field value +}; + +Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag); +Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags); + +class TEventFieldOutput: public IOutputStream { +public: + TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags); + + IOutputStream& GetOutputStream(); + EFieldOutputFlags GetFlags() const; + +protected: + void DoWrite(const void* buf, size_t len) override; + +private: + IOutputStream& Output; + EFieldOutputFlags Flags; + TString Separators; +}; diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp new file mode 100644 index 0000000000..29c6b4b661 --- /dev/null +++ b/library/cpp/eventlog/event_field_printer.cpp @@ -0,0 +1,27 @@ +#include "event_field_printer.h" + +#include <library/cpp/protobuf/json/proto2json.h> + +namespace { + + const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() + .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) + 
.AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); + +} // namespace + +TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode) + : Mode(mode) +{} + +template <> +void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) { + switch (Mode) { + case EProtobufMessageFieldPrintMode::DEFAULT: + case EProtobufMessageFieldPrintMode::JSON: { + // Do not use field.PrintJSON() here: IGNIETFERRO-2002 + NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG); + break; + } + } +} diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h new file mode 100644 index 0000000000..835e8f4a85 --- /dev/null +++ b/library/cpp/eventlog/event_field_printer.h @@ -0,0 +1,38 @@ +#pragma once + +#include "event_field_output.h" + +#include <google/protobuf/message.h> + +// NB: For historical reasons print code for all primitive types/repeated fields/etc is generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp + +enum class EProtobufMessageFieldPrintMode { + // Use <TEventProtobufMessageFieldType>::Print method for fields that have it + // Print json for other fields + DEFAULT = 0, + + JSON = 1, +}; + +class TEventProtobufMessageFieldPrinter { +public: + explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode); + + template <typename TEventProtobufMessageFieldType, bool HasPrintFunction> + void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) { + if constexpr (HasPrintFunction) { + if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) { + field.Print(output.GetOutputStream(), output.GetFlags()); + return; + } + } + + PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output); + } + + template <> + void 
PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output); + +private: + EProtobufMessageFieldPrintMode Mode; +}; diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp new file mode 100644 index 0000000000..458a632b4a --- /dev/null +++ b/library/cpp/eventlog/eventlog.cpp @@ -0,0 +1,554 @@ +#include <util/datetime/base.h> +#include <util/stream/zlib.h> +#include <util/stream/length.h> +#include <util/generic/buffer.h> +#include <util/generic/yexception.h> +#include <util/digest/murmur.h> +#include <util/generic/singleton.h> +#include <util/generic/function.h> +#include <util/stream/output.h> +#include <util/stream/format.h> +#include <util/stream/null.h> + +#include <google/protobuf/messagext.h> + +#include "eventlog.h" +#include "events_extension.h" +#include "evdecoder.h" +#include "logparser.h" +#include <library/cpp/eventlog/proto/internal.pb.h> + +#include <library/cpp/json/json_writer.h> +#include <library/cpp/protobuf/json/proto2json.h> + + +TAtomic eventlogFrameCounter = 0; + +namespace { + + const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() + .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) + .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); + + ui32 GenerateFrameId() { + return ui32(AtomicAdd(eventlogFrameCounter, 1)); + } + + inline const NProtoBuf::Message* UnknownEventMessage() { + return Singleton<NEventLogInternal::TUnknownEvent>(); + } + +} // namespace + +void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { + if (options.OutputFormat == TOutputFormat::TabSeparatedRaw) { + PrintHeader(out, options, eventState); + DoPrint(out, {}); + } else if (options.OutputFormat == TOutputFormat::TabSeparated) { + PrintHeader(out, options, eventState); + DoPrint( + out, + EFieldOutputFlags{} | 
EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash); + } else if (options.OutputFormat == TOutputFormat::Json) { + NJson::TJsonWriterConfig jsonWriterConfig; + jsonWriterConfig.FormatOutput = 0; + NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig); + + jsonWriter.OpenMap(); + PrintJsonHeader(jsonWriter); + DoPrintJson(jsonWriter); + jsonWriter.CloseMap(); + } +} + +void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { + if (options.HumanReadable) { + out << TInstant::MicroSeconds(Timestamp).ToString() << "\t"; + if (Timestamp >= eventState.FrameStartTime) + out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime)); + else // a bug somewhere? anyway, let's handle it in a nice fashion + out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp)); + + if (Timestamp >= eventState.PrevEventTime) + out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")"; + // else: these events are async and out-of-order, relative time diff makes no sense, skip it + + out << "\tF# " << FrameId << '\t'; + } else { + out << static_cast<TEventTimestamp>(Timestamp); + out << '\t' << FrameId << '\t'; + } +} + +void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.Write("Timestamp", Timestamp); + jsonWriter.Write("FrameId", FrameId); +} + +class TProtobufEvent: public TEvent { +public: + TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg) + : TEvent(eventId, t) + , Message_(&msg) + , EventFactory_(NProtoBuf::TEventFactory::Instance()) + { + } + + TProtobufEvent() + : TEvent(0, 0) + , EventFactory_(NProtoBuf::TEventFactory::Instance()) + { + } + + explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance()) + : TEvent(id, 0) + , EventFactory_(eventFactory) + { + 
InnerMsg_.Reset(EventFactory_->CreateEvent(Class)); + Message_ = InnerMsg_.Get(); + } + + ui32 Id() const { + return Class; + } + + void Load(IInputStream& in) override { + if (!!InnerMsg_) { + InnerMsg_->ParseFromArcadiaStream(&in); + } else { + TransferData(&in, &Cnull); + } + } + + void Save(IOutputStream& out) const override { + Message_->SerializeToArcadiaStream(&out); + } + + void SaveToBuffer(TBufferOutput& buf) const override { + size_t messageSize = Message_->ByteSize(); + size_t before = buf.Buffer().Size(); + buf.Buffer().Advance(messageSize); + Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize); + } + + TStringBuf GetName() const override { + return EventFactory_->NameById(Id()); + } + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override { + EventFactory_->PrintEvent(Id(), Message_, out, flags); + } + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + + jsonWriter.Write("Fields"); + NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG); + + jsonWriter.CloseMap(); + } + + const NProtoBuf::Message* GetProto() const override { + if (Message_) { + return Message_; + } + + return UnknownEventMessage(); + } + +private: + const NProtoBuf::Message* Message_ = nullptr; + NProtoBuf::TEventFactory* EventFactory_; + THolder<NProtoBuf::Message> InnerMsg_; + + friend class TEventLogFrame; +}; + +void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) { + TProtobufEvent event(Now().MicroSeconds(), eventId, ev); + + LogEventImpl(event); +} + +void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) { + TProtobufEvent event(timestamp, eventId, ev); + + LogEventImpl(event); +} + +template <> +void TEventLogFrame::DebugDump(const TProtobufEvent& ev) { + static TMutex lock; + + with_lock (lock) { + Cerr << 
ev.Timestamp << "\t" << ev.GetName() << "\t"; + ev.GetProto()->PrintJSON(Cerr); + Cerr << Endl; + } +} + +#pragma pack(push, 1) +struct TFrameHeaderData { + char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()]; + TCompressedFrameBaseHeader Header; + TCompressedFrameHeader2 HeaderEx; +}; +#pragma pack(pop) + +TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + DoInit(); +} + +TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(parentLog) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + if (EvLog_ && EvLog_->HasNullBackend()) { + EvLog_ = nullptr; + } + + DoInit(); +} + +TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(nullptr) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + DoInit(); +} + +void TEventLogFrame::Flush() { + if (EvLog_ == nullptr) + return; + + TBuffer& buf = Buf_.Buffer(); + + if (buf.Empty()) { + return; + } + + EvLog_->WriteFrame(buf, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_)); + + DoInit(); + + return; +} + +void TEventLogFrame::SafeFlush() { + TGuard<TMutex> g(Mtx_); + Flush(); +} + +void TEventLogFrame::AddEvent(TEventTimestamp timestamp) { + if (timestamp < StartTimestamp_) { + StartTimestamp_ = timestamp; + } + + if (timestamp > EndTimestamp_) { + EndTimestamp_ = timestamp; + } +} + +void TEventLogFrame::DoInit() { + Buf_.Buffer().Clear(); + + StartTimestamp_ = (TEventTimestamp)-1; + EndTimestamp_ = 0; +} + +void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, 
IEventFactory* eventFactory) { + const auto doVisit = [this, &visitor, eventFactory]() { + TBuffer& buf = Buf_.Buffer(); + + TBufferInput bufferInput(buf); + TLengthLimitedInput limitedInput(&bufferInput, buf.size()); + + TEventFilter EventFilter(false); + + while (limitedInput.Left()) { + THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory); + + visitor.Visit(*event); + } + }; + if (NeedAlwaysSafeAdd_) { + TGuard<TMutex> g(Mtx_); + doVisit(); + } else { + doVisit(); + } +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::~TSelfFlushLogFrame() { + try { + Flush(); + } catch (...) { + } +} + +IEventLog::~IEventLog() { +} + +static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) { + try { + THolder<TLogBackend> backend; + if (backendOpts.UseSyncPageCacheBackend) { + backend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize); + } else { + backend = MakeHolder<TFileLogBackend>(fileName); + } + return MakeHolder<TReopenLogBackend>(std::move(backend)); + } catch (...) { + Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." 
<< Endl; + } + + return MakeHolder<TNullLogBackend>(); +} + +TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat) + : Log_(ConstructBackend(fileName, backendOpts)) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4) + , HasNullBackend_(Log_.IsNullLog()) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); + + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts) + : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4) +{ +} + +TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat) + : Log_(log) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat) + , HasNullBackend_(Log_.IsNullLog()) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat) + : Log_(MakeHolder<TNullLogBackend>()) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat) + , HasNullBackend_(true) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::~TEventLog() { +} + +void TEventLog::ReopenLog() { + Log_.ReopenLog(); +} + +void TEventLog::CloseLog() { + Log_.CloseLog(); +} + 
+void TEventLog::Flush() { +} + +namespace { + class TOnExceptionAction { + public: + TOnExceptionAction(std::function<void()>&& f) + : F_(std::move(f)) + { + } + + ~TOnExceptionAction() { + if (F_ && UncaughtException()) { + try { + F_(); + } catch (...) { + } + } + } + + private: + std::function<void()> F_; + }; +} + +void TEventLog::WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback, + TLogRecord::TMetaFlags metaFlags) { + Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); + + TBuffer& b1 = buffer; + + size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1); + + // Reserve enough memory to minimize reallocs + TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength); + TBuffer& b2 = outbuf.Buffer(); + b2.Proceed(sizeof(TFrameHeaderData)); + + { + TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); + + memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size()); + hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff); + hdr.Header.FrameId = GenerateFrameId(); + hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size(); + hdr.HeaderEx.StartTimestamp = startTimestamp; + hdr.HeaderEx.EndTimestamp = endTimestamp; + hdr.HeaderEx.PayloadChecksum = 0; + hdr.HeaderEx.CompressorVersion = 0; + } + + if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) { + TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256); + Lz4hcCodec_->Encode(b1, encoded); + + TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048); + compr.Write(encoded.data(), encoded.size()); + compr.Finish(); + } else { + b2.Advance(ZstdCodec_->Compress(b1, b2.Pos())); + } + + { + const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size(); + TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); + 
hdr.Header.Length = static_cast<ui32>(b2.size() - k); + hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData)); + + const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum)); + hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n); + } + + const TBuffer& frameData = outbuf.Buffer(); + + TOnExceptionAction actionCallback([this] { + if (ErrorCallback_) { + ErrorCallback_->OnWriteError(); + } + }); + + if (writeFrameCallback) { + writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp); + } + + Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags)); + if (SuccessCallback_) { + SuccessCallback_->OnWriteSuccess(frameData); + } +} + +TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) { + return new TProtobufEvent(c, EventFactory_); +} + +TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const { + return EventFactory_->IdByName(name); +} + +TEventClass TProtobufEventFactory::EventClassBegin() const { + const auto& items = EventFactory_->FactoryItems(); + + if (items.empty()) { + return static_cast<TEventClass>(0); + } + + return static_cast<TEventClass>(items.begin()->first); +} + +TEventClass TProtobufEventFactory::EventClassEnd() const { + const auto& items = EventFactory_->FactoryItems(); + + if (items.empty()) { + return static_cast<TEventClass>(0); + } + + return static_cast<TEventClass>(items.rbegin()->first + 1); +} + +namespace NEvClass { + IEventFactory* Factory() { + return Singleton<TProtobufEventFactory>(); + } + + IEventProcessor* Processor() { + return Singleton<TProtobufEventProcessor>(); + } +} + +const NProtoBuf::Message* TUnknownEvent::GetProto() const { + return UnknownEventMessage(); +} + +TStringBuf TUnknownEvent::GetName() const { + return TStringBuf("UnknownEvent"); +} + +void 
TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + jsonWriter.Write("EventId", (size_t)Class); + jsonWriter.CloseMap(); +} + +TStringBuf TEndOfFrameEvent::GetName() const { + return TStringBuf("EndOfFrame"); +} + +const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const { + return Singleton<NEventLogInternal::TEndOfFrameEvent>(); +} + +void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + jsonWriter.OpenMap("Fields"); + jsonWriter.CloseMap(); + jsonWriter.CloseMap(); +} + +THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) { + return MakeHolder<TProtobufEvent>(ts, eventId, ev); +} diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h new file mode 100644 index 0000000000..45c2dfb17f --- /dev/null +++ b/library/cpp/eventlog/eventlog.h @@ -0,0 +1,623 @@ +#pragma once + +#include "eventlog_int.h" +#include "event_field_output.h" +#include "events_extension.h" + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/logger/all.h> + +#include <google/protobuf/message.h> + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/stream/output.h> +#include <util/stream/buffer.h> +#include <util/stream/str.h> +#include <util/system/mutex.h> +#include <util/stream/output.h> +#include <util/system/env.h> +#include <util/system/unaligned_mem.h> +#include <util/ysaveload.h> + +#include <cstdlib> + +namespace NJson { + class TJsonWriter; +} + +class IEventLog; + +class TEvent : public TThrRefBase { +public: + enum class TOutputFormat { + TabSeparated, + TabSeparatedRaw, // disables escaping + Json + }; + + struct TOutputOptions { + TOutputFormat OutputFormat = TOutputFormat::TabSeparated; + // Dump some fields (e.g. 
timestamp) in more human-readable format + bool HumanReadable = false; + + TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated) + : OutputFormat(outputFormat) + { + } + + TOutputOptions(TOutputFormat outputFormat, bool humanReadable) + : OutputFormat(outputFormat) + , HumanReadable(humanReadable) + { + } + }; + + struct TEventState { + TEventTimestamp FrameStartTime = 0; + TEventTimestamp PrevEventTime = 0; + TEventState() { + } + }; + + TEvent(TEventClass c, TEventTimestamp t) + : Class(c) + , Timestamp(t) + { + } + + virtual ~TEvent() = default; + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Save(IOutputStream& out) const = 0; + virtual void SaveToBuffer(TBufferOutput& out) const { + Save(out); + } + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Load(IInputStream& i) = 0; + + virtual TStringBuf GetName() const = 0; + virtual const NProtoBuf::Message* GetProto() const = 0; + + void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const; + void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const; + + TString ToString() const { + TStringStream buff; + Print(buff); + return buff.Str(); + } + + void FullSaveToBuffer(TBufferOutput& buf) const { + SaveMessageHeader(buf); + this->SaveToBuffer(buf); + } + + void FullSave(IOutputStream& o) const { + SaveMessageHeader(o); + this->Save(o); + } + + void FullLoad(IInputStream& i) { + ::Load(&i, Timestamp); + ::Load(&i, Class); + this->Load(i); + } + + template <class T> + const T* Get() const { + return static_cast<const T*>(this->GetProto()); + } + + TEventClass Class; + TEventTimestamp Timestamp; + ui32 FrameId = 0; + +private: + void SaveMessageHeader(IOutputStream& out) const { + ::Save(&out, Timestamp); + 
::Save(&out, Class); + } + + virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0; + virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0; + + void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const; +}; + +using TEventPtr = TIntrusivePtr<TEvent>; +using TConstEventPtr = TIntrusiveConstPtr<TEvent>; + +class IEventProcessor { +public: + virtual void SetOptions(const TEvent::TOutputOptions& options) { + Options_ = options; + } + virtual void ProcessEvent(const TEvent* ev) = 0; + virtual bool CheckedProcessEvent(const TEvent* ev) { + ProcessEvent(ev); + return true; + } + virtual ~IEventProcessor() = default; + +protected: + TEvent::TOutputOptions Options_; +}; + +class IEventFactory { +public: + virtual TEvent* CreateLogEvent(TEventClass c) = 0; + virtual TEventLogFormat CurrentFormat() = 0; + virtual TEventClass ClassByName(TStringBuf name) const = 0; + virtual TEventClass EventClassBegin() const = 0; + virtual TEventClass EventClassEnd() const = 0; + virtual ~IEventFactory() = default; +}; + +class TUnknownEvent: public TEvent { +public: + TUnknownEvent(TEventTimestamp ts, TEventClass cls) + : TEvent(cls, ts) + { + } + + ~TUnknownEvent() override = default; + + void Save(IOutputStream& /* o */) const override { + ythrow yexception() << "TUnknownEvent cannot be saved"; + } + + void Load(IInputStream& /* i */) override { + ythrow yexception() << "TUnknownEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName() << "\t" << (size_t)Class; + } + + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class TEndOfFrameEvent: public TEvent { +public: + enum { + EventClass = 0 + }; + + TEndOfFrameEvent(TEventTimestamp ts) + : TEvent(TEndOfFrameEvent::EventClass, ts) + { + } + + ~TEndOfFrameEvent() override = default; + + void Save(IOutputStream& o) 
const override { + (void)o; + ythrow yexception() << "TEndOfFrameEvent cannot be saved"; + } + + void Load(IInputStream& i) override { + (void)i; + ythrow yexception() << "TEndOfFrameEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName(); + } + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class ILogFrameEventVisitor { +public: + virtual ~ILogFrameEventVisitor() = default; + + virtual void Visit(const TEvent& event) = 0; +}; + +class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> { +public: + virtual ~IWriteFrameCallback() = default; + + virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0; +}; + +using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>; + +class TEventLogFrame { +public: + TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TEventLogFrame() = default; + + void Flush(); + void SafeFlush(); + + void ForceDump() { + ForceDump_ = true; + } + + template <class T> + inline void LogEvent(const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(ev); + } else { + UnSafeLogEvent(ev); + } + } + + template <class T> + inline void LogEvent(TEventTimestamp timestamp, const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(timestamp, ev); + } else { + UnSafeLogEvent(timestamp, ev); + } + } + + template <class T> + inline void UnSafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(ev.ID, ev); + } + + template <class T> + inline void 
UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(timestamp, ev.ID, ev); + } + + template <class T> + inline void SafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(ev.ID, ev); + } + } + + template <class T> + inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(timestamp, ev.ID, ev); + } + } + + void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory); + + inline bool IsEventIgnored(size_t eventId) const { + Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages + return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_; + } + + void Enable(IEventLog& evLog) { + EvLog_ = &evLog; + } + + void Disable() { + EvLog_ = nullptr; + } + + void SetNeedAlwaysSafeAdd(bool val) { + NeedAlwaysSafeAdd_ = val; + } + + void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) { + WriteFrameCallback_ = writeFrameCallback; + } + + void AddMetaFlag(const TString& key, const TString& value) { + if (NeedAlwaysSafeAdd_) { + TGuard<TMutex> g(Mtx_); + MetaFlags_.emplace_back(key, value); + } else { + MetaFlags_.emplace_back(key, value); + } + } + +protected: + void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev); + void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev); + +private: + static bool IsDebugModeEnabled() { + static struct TSelector { + bool Flag; + + TSelector() + : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1")) + { + } + } selector; + + return selector.Flag; + } + + template <class T> + void DebugDump(const T& ev); + + // T must be a descendant of NEvClass::TEvent + template <class T> + inline void LogEventImpl(const T& ev) { + if (EvLog_ != nullptr || ForceDump_) { + TBuffer& b = Buf_.Buffer(); + size_t lastSize = b.size(); + ::Save(&Buf_, 
ui32(0)); + ev.FullSaveToBuffer(Buf_); + WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize)); + AddEvent(ev.Timestamp); + } + + if (IsDebugModeEnabled()) { + DebugDump(ev); + } + } + + void AddEvent(TEventTimestamp timestamp); + void DoInit(); + +private: + TBufferOutput Buf_; + TEventTimestamp StartTimestamp_, EndTimestamp_; + IEventLog* EvLog_; + TMutex Mtx_; + bool NeedAlwaysSafeAdd_; + bool ForceDump_; + TWriteFrameCallbackPtr WriteFrameCallback_; + TLogRecord::TMetaFlags MetaFlags_; + friend class TEventRecord; +}; + +class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> { +public: + TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TSelfFlushLogFrame(); +}; + +using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>; + +class IEventLog: public TAtomicRefCount<IEventLog> { +public: + class IErrorCallback { + public: + virtual ~IErrorCallback() { + } + + virtual void OnWriteError() = 0; + }; + + class ISuccessCallback { + public: + virtual ~ISuccessCallback() { + } + + virtual void OnWriteSuccess(const TBuffer& frameData) = 0; + }; + + virtual ~IEventLog(); + + virtual void ReopenLog() = 0; + virtual void CloseLog() = 0; + virtual void Flush() = 0; + virtual void SetErrorCallback(IErrorCallback*) { + } + virtual void SetSuccessCallback(ISuccessCallback*) { + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + virtual bool HasNullBackend() const = 0; + + virtual void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr 
writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) = 0; +}; + +struct TEventLogBackendOptions { + bool UseSyncPageCacheBackend = false; + size_t SyncPageCacheBackendBufferSize = 0; + size_t SyncPageCacheBackendMaxPendingSize = 0; +}; + +class TEventLog: public IEventLog { +public: + /* + * Параметр contentformat указывает формат контента лога, например какие могут в логе + * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра + * должен быть нулевым. + */ + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat); + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {}); + TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + + ~TEventLog() override; + + void ReopenLog() override; + void CloseLog() override; + void Flush() override; + void SetErrorCallback(IErrorCallback* errorCallback) override { + ErrorCallback_ = errorCallback; + } + void SetSuccessCallback(ISuccessCallback* successCallback) override { + SuccessCallback_ = successCallback; + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + bool HasNullBackend() const override { + return HasNullBackend_; + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override; + +private: + mutable TLog Log_; + TEventLogFormat ContentFormat_; + const TEventLogFormat LogFormat_; + bool HasNullBackend_; + const NBlockCodecs::ICodec* const Lz4hcCodec_; + const NBlockCodecs::ICodec* const ZstdCodec_; + IErrorCallback* ErrorCallback_ = 
nullptr; + ISuccessCallback* SuccessCallback_ = nullptr; +}; + +using TEventLogPtr = TIntrusivePtr<IEventLog>; + +class TEventLogWithSlave: public IEventLog { +public: + TEventLogWithSlave(IEventLog& parentLog) + : Slave_(&parentLog) + { + } + + TEventLogWithSlave(const TEventLogPtr& parentLog) + : SlavePtr_(parentLog) + , Slave_(SlavePtr_.Get()) + { + } + + ~TEventLogWithSlave() override { + try { + Slave().Flush(); + } catch (...) { + } + } + + void Flush() override { + Slave().Flush(); + } + + void ReopenLog() override { + return Slave().ReopenLog(); + } + void CloseLog() override { + return Slave().CloseLog(); + } + + bool HasNullBackend() const override { + return Slave().HasNullBackend(); + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override { + Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags)); + } + + void SetErrorCallback(IErrorCallback* errorCallback) override { + Slave().SetErrorCallback(errorCallback); + } + + void SetSuccessCallback(ISuccessCallback* successCallback) override { + Slave().SetSuccessCallback(successCallback); + } + +protected: + inline IEventLog& Slave() const { + return *Slave_; + } + +private: + TEventLogPtr SlavePtr_; + IEventLog* Slave_ = nullptr; +}; + +extern TAtomic eventlogFrameCounter; + +class TProtobufEventProcessor: public IEventProcessor { +public: + void ProcessEvent(const TEvent* ev) override final { + ProcessEvent(ev, &Cout); + } + + void ProcessEvent(const TEvent* ev, IOutputStream *out) { + UpdateEventState(ev); + DoProcessEvent(ev, out); + EventState_.PrevEventTime = ev->Timestamp; + } +protected: + virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) { + ev->Print(*out, Options_, EventState_); + (*out) << Endl; + } + ui32 CurrentFrameId_ = Max<ui32>(); + TEvent::TEventState EventState_; + +private: + 
void UpdateEventState(const TEvent *ev) { + if (ev->FrameId != CurrentFrameId_) { + EventState_.FrameStartTime = ev->Timestamp; + EventState_.PrevEventTime = ev->Timestamp; + CurrentFrameId_ = ev->FrameId; + } + } +}; + +class TProtobufEventFactory: public IEventFactory { +public: + TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance()) + : EventFactory_(factory) + { + } + + TEvent* CreateLogEvent(TEventClass c) override; + + TEventLogFormat CurrentFormat() override { + return 0; + } + + TEventClass ClassByName(TStringBuf name) const override; + + TEventClass EventClassBegin() const override; + + TEventClass EventClassEnd() const override; + + ~TProtobufEventFactory() override = default; + +private: + NProtoBuf::TEventFactory* EventFactory_; +}; + +THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev); + +namespace NEvClass { + IEventFactory* Factory(); + IEventProcessor* Processor(); +} diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp new file mode 100644 index 0000000000..faa8c42cbe --- /dev/null +++ b/library/cpp/eventlog/eventlog_int.cpp @@ -0,0 +1,12 @@ +#include "eventlog_int.h" + +#include <util/string/cast.h> + +TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) { + EEventLogFormat format; + if (TryFromString(str, format)) { + return static_cast<TEventLogFormat>(format); + } else { + return {}; + } +} diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h new file mode 100644 index 0000000000..eb00fecfab --- /dev/null +++ b/library/cpp/eventlog/eventlog_int.h @@ -0,0 +1,72 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/maybe.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> +#include <util/ysaveload.h> + +using TEventClass = ui32; +using TEventLogFormat = ui32; +using TEventTimestamp = ui64; + +constexpr TStringBuf 
COMPRESSED_LOG_FRAME_SYNC_DATA = + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00" + "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff" + "\xff\x00\x00\xff\xff\x00\x00\xff"sv; + +static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64); + +/* + * Коды форматов логов. Форматом лога считается формат служебных + * структур лога. К примеру формат заголовка, наличие компрессии, и т.д. + * Имеет значение только 1 младший байт. + */ + +enum EEventLogFormat : TEventLogFormat { + // Формат версии 1. Используется компрессор LZQ. + COMPRESSED_LOG_FORMAT_V1 = 1, + + // Формат версии 2. Используется компрессор ZLIB. Добавлены CRC заголовка и данных, + // поле типа компрессора. + COMPRESSED_LOG_FORMAT_V2 = 2, + + // Формат версии 3. Используется компрессор ZLIB. В начинке фреймов перед каждым событием добавлен его размер. + COMPRESSED_LOG_FORMAT_V3 = 3, + + // Lz4hc codec + zlib + COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */, + + // zstd + COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */, +}; + +TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str); + +#pragma pack(push, 1) + +struct TCompressedFrameBaseHeader { + TEventLogFormat Format; + ui32 Length; // Длина остатка фрейма в байтах, после этого заголовка + ui32 FrameId; +}; + +struct TCompressedFrameHeader { + TEventTimestamp StartTimestamp; + TEventTimestamp EndTimestamp; + ui32 UncompressedDatalen; // Длина данных, которые были закомпрессированы + ui32 PayloadChecksum; // В логе версии 1 поле не используется +}; + +struct TCompressedFrameHeader2: public TCompressedFrameHeader { + ui8 CompressorVersion; // Сейчас не используется + ui32 HeaderChecksum; +}; + +#pragma pack(pop) + +Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader); +Y_DECLARE_PODTYPE(TCompressedFrameHeader); +Y_DECLARE_PODTYPE(TCompressedFrameHeader2); diff --git a/library/cpp/eventlog/events_extension.h 
b/library/cpp/eventlog/events_extension.h new file mode 100644 index 0000000000..0cf062f959 --- /dev/null +++ b/library/cpp/eventlog/events_extension.h @@ -0,0 +1,161 @@ +#pragma once + +#include "event_field_output.h" + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/message.h> + +#include <library/cpp/threading/atomic/bool.h> +#include <library/cpp/string_utils/base64/base64.h> + +#include <util/generic/map.h> +#include <util/generic/deque.h> +#include <util/generic/singleton.h> +#include <util/string/hex.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> + +namespace NProtoBuf { + class TEventFactory { + public: + typedef ::google::protobuf::Message Message; + typedef void (*TEventSerializer)(const Message* event, IOutputStream& output, EFieldOutputFlags flags); + typedef void (*TRegistrationFunc)(); + + private: + class TFactoryItem { + public: + TFactoryItem(const Message* prototype, const TEventSerializer serializer) + : Prototype_(prototype) + , Serializer_(serializer) + { + } + + TStringBuf GetName() const { + return Prototype_->GetDescriptor()->name(); + } + + Message* Create() const { + return Prototype_->New(); + } + + void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const { + (*Serializer_)(event, out, flags); + } + + private: + const Message* Prototype_; + const TEventSerializer Serializer_; + }; + + typedef TMap<size_t, TFactoryItem> TFactoryMap; + + public: + TEventFactory() + : FactoryItems_() + { + } + + void ScheduleRegistration(TRegistrationFunc func) { + EventRegistrators_.push_back(func); + } + + void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) { + FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer))); + } + + size_t IdByName(TStringBuf eventname) { + DelayedRegistration(); + for (TFactoryMap::const_iterator it = FactoryItems_.begin(); it != FactoryItems_.end(); ++it) { + if (it->second.GetName() == 
eventname) + return it->first; + } + + ythrow yexception() << "do not know event '" << eventname << "'"; + } + + TStringBuf NameById(size_t id) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(id); + return it != FactoryItems_.end() ? it->second.GetName() : TStringBuf(); + } + + Message* CreateEvent(size_t eventId) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(eventId); + + if (it != FactoryItems_.end()) { + return it->second.Create(); + } + + return nullptr; + } + + const TMap<size_t, TFactoryItem>& FactoryItems() { + DelayedRegistration(); + return FactoryItems_; + } + + void PrintEvent( + size_t eventId, + const Message* event, + IOutputStream& output, + EFieldOutputFlags flags = {}) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(eventId); + + if (it != FactoryItems_.end()) { + it->second.PrintEvent(event, output, flags); + } + } + + static TEventFactory* Instance() { + return Singleton<TEventFactory>(); + } + + private: + void DelayedRegistration() { + if (!DelayedRegistrationDone_) { + TGuard<TMutex> guard(MutexEventRegistrators_); + Y_UNUSED(guard); + while (!EventRegistrators_.empty()) { + EventRegistrators_.front()(); + EventRegistrators_.pop_front(); + } + DelayedRegistrationDone_ = true; + } + } + + private: + TMap<size_t, TFactoryItem> FactoryItems_; + TDeque<TRegistrationFunc> EventRegistrators_; + NAtomic::TBool DelayedRegistrationDone_ = false; + TMutex MutexEventRegistrators_; + }; + + template <typename T> + void PrintAsBytes(const T& obj, IOutputStream& output) { + const ui8* b = reinterpret_cast<const ui8*>(&obj); + const ui8* e = b + sizeof(T); + const char* delim = ""; + + while (b != e) { + output << delim; + output << (int)*b++; + delim = "."; + } + } + + template <typename T> + void PrintAsHex(const T& obj, IOutputStream& output) { + output << "0x"; + output << HexEncode(&obj, sizeof(T)); + } + + inline void PrintAsBase64(TStringBuf data, 
IOutputStream& output) { + if (!data.empty()) { + output << Base64Encode(data); + } + } + +} diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp new file mode 100644 index 0000000000..71f955bca8 --- /dev/null +++ b/library/cpp/eventlog/iterator.cpp @@ -0,0 +1,88 @@ +#include "iterator.h" + +#include <library/cpp/streams/growing_file_input/growing_file_input.h> + +#include <util/string/cast.h> +#include <util/string/split.h> +#include <util/string/type.h> +#include <util/stream/file.h> + +using namespace NEventLog; + +namespace { + inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) { + if (evList.empty()) { + return nullptr; + } + + TVector<TString> events; + + StringSplitter(evList).Split(',').SkipEmpty().Collect(&events); + if (events.empty()) { + return nullptr; + } + + TIntrusivePtr<TEventFilter> filter(new TEventFilter(enableEvents)); + + for (const auto& event : events) { + if (IsNumber(event)) + filter->AddEventClass(FromString<size_t>(event)); + else + filter->AddEventClass(fac->ClassByName(event)); + } + + return filter; + } + + struct TIterator: public IIterator { + inline TIterator(const TOptions& o, IEventFactory* fac) + : First(true) + { + if (o.FileName.size()) { + if (o.ForceStreamMode || o.TailFMode) { + FileInput.Reset(o.TailFMode ? 
(IInputStream*)new TGrowingFileInput(o.FileName) : (IInputStream*)new TUnbufferedFileInput(o.FileName)); + FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter)); + } else { + FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter)); + } + } else { + FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter)); + } + + EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac); + EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering)); + } + + TConstEventPtr Next() override { + if (First) { + First = false; + + if (!EventStream->Avail()) { + return nullptr; + } + } else { + if (!EventStream->Next()) { + return nullptr; + } + } + + return **EventStream; + } + + THolder<IInputStream> FileInput; + THolder<TFrameStreamer> FrameStream; + TIntrusivePtr<TEventFilter> EvFilter; + THolder<TEventStreamer> EventStream; + bool First; + }; +} + +IIterator::~IIterator() = default; + +THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) { + return MakeHolder<TIterator>(o, fac); +} + +THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) { + return MakeHolder<TIterator>(o, NEvClass::Factory()); +} diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h new file mode 100644 index 0000000000..71a61ed549 --- /dev/null +++ b/library/cpp/eventlog/iterator.h @@ -0,0 +1,51 @@ +#pragma once + +#include <util/stream/input.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/iterator.h> + +#include "eventlog.h" +#include "logparser.h" + +namespace NEventLog { + struct TOptions { + inline TOptions& SetFileName(const TString& fileName) { + FileName = fileName; + + return *this; + } + + inline TOptions& SetForceStrongOrdering(bool v) { + if(!ForceLosslessStrongOrdering) { + ForceStrongOrdering = v; + } + + return *this; + } + 
+ ui64 StartTime = MIN_START_TIME; + ui64 EndTime = MAX_END_TIME; + ui64 MaxRequestDuration = MAX_REQUEST_DURATION; + TString FileName; + bool ForceStrongOrdering = false; + bool ForceWeakOrdering = false; + bool EnableEvents = true; + TString EvList; + bool ForceStreamMode = false; + bool ForceLosslessStrongOrdering = false; + bool TailFMode = false; + IInputStream* Input = &Cin; + IFrameFilterRef FrameFilter; + }; + + class IIterator: public TInputRangeAdaptor<IIterator> { + public: + virtual ~IIterator(); + + virtual TConstEventPtr Next() = 0; + }; + + THolder<IIterator> CreateIterator(const TOptions& o); + THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac); +} diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp new file mode 100644 index 0000000000..6f8959f788 --- /dev/null +++ b/library/cpp/eventlog/logparser.cpp @@ -0,0 +1,814 @@ +#include "logparser.h" +#include "evdecoder.h" + +#include <util/stream/output.h> +#include <util/stream/zlib.h> +#include <util/digest/murmur.h> +#include <util/generic/algorithm.h> +#include <util/generic/scope.h> +#include <util/generic/hash_set.h> +#include <util/string/split.h> +#include <util/string/cast.h> +#include <util/string/escape.h> +#include <util/string/builder.h> + +#include <contrib/libs/re2/re2/re2.h> + +#include <algorithm> +#include <array> + +namespace { + bool FastforwardUntilSyncHeader(IInputStream* in) { + // Usually this function finds the correct header at the first hit + std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer; + if (in->Load(buffer.data(), buffer.size()) != buffer.size()) { + return false; + } + + auto begin = buffer.begin(); + + for (;;) { + if (std::mismatch( + begin, buffer.end(), + COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() && + std::mismatch( + buffer.begin(), begin, + COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) { + return true; + } + if (!in->ReadChar(*begin)) { + 
return false; + } + ++begin; + if (begin == buffer.end()) { + begin = buffer.begin(); + } + } + } + + bool HasCorrectChecksum(const TFrameHeader& header) { + // Calculating hash over all the fields of the read header except for the field with the hash of the header itself. + const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32); + const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize); + return checksum == header.Framehdr.HeaderChecksum; + } + + TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) { + for (;;) { + if (FastforwardUntilSyncHeader(in)) { + try { + return TFrameHeader(*in); + } catch (const TFrameLoadError& err) { + Cdbg << err.what() << Endl; + in->Skip(err.SkipAfter); + } + } else { + return Nothing(); + } + } + } + + std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) { + for (;;) { + auto iter = std::search( + span.begin(), span.end(), + COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end()); + const size_t offset = iter - span.begin(); + + if (offset != span.size()) { + span = span.substr(offset); + try { + TMemoryInput in( + span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), + span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size()); + return {TFrameHeader(in), span}; + } catch (const TFrameLoadError& err) { + Cdbg << err.what() << Endl; + span = span.substr(err.SkipAfter); + } + } else { + return {Nothing(), {}}; + } + } + } + + size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) { + Y_ENSURE(start <= end); + + const auto leftTimeBound = start - Min(start, maxRequestDuration); + const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end); + + TStringBuf subspan = span; + TMaybe<TFrameHeader> maybeLeftFrame; + std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan); + + if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) { + return span.size(); + } + + 
if (maybeLeftFrame->StartTime() > leftTimeBound) { + return 0; + } + + while (subspan.size() > maybeLeftFrame->FullLength()) { + const auto mid = subspan.data() + subspan.size() / 2; + auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()}); + if (!midFrame) { + // If mid is in the middle of the last frame, here we will lose it meaning that + // we will find previous frame as the result. + // This is fine because we will iterate frames starting from that. + subspan = subspan.substr(0, subspan.size() / 2); + continue; + } + if (midFrame->StartTime() <= leftTimeBound) { + maybeLeftFrame = midFrame; + subspan = rightHalfSpan; + } else { + subspan = subspan.substr(0, subspan.size() / 2); + } + } + + return subspan.data() - span.data(); + } +} + +TFrameHeader::TFrameHeader(IInputStream& in) { + try { + ::Load(&in, Basehdr); + + Y_ENSURE(Basehdr.Length, "Empty frame additional data"); + + ::Load(&in, Framehdr); + switch (LogFormat()) { + case COMPRESSED_LOG_FORMAT_V1: + break; + + case COMPRESSED_LOG_FORMAT_V2: + case COMPRESSED_LOG_FORMAT_V3: + case COMPRESSED_LOG_FORMAT_V4: + case COMPRESSED_LOG_FORMAT_V5: + Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor"); + + Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum"); + break; + + default: + ythrow yexception() << "Unsupported log structure format"; + }; + + Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps"); + + // Each frame must contain at least one event. + Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload"); + } catch (...) 
{ + TString location = ""; + if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) { + location = "@ " + ToString(cnt->Counter()); + } + ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage(); + } +} + +TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac) + : TFrameHeader(header) + , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength())) + , Fac_(fac) +{ + if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) { + Address_ = cnt->Counter() - sizeof(TFrameHeader); + } else { + Address_ = 0; + } +} + +TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const { + if (EventsCache_.empty()) { + for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) { + EventsCache_.emplace_back(*decoder); + } + } + + return TIterator(*this, eventFilter); +} + +void TFrame::ClearEventsCache() const { + EventsCache_.clear(); +} + +TString TFrame::GetCompressedFrame() const { + const auto left = Limiter_->Left(); + TString payload = Limiter_->ReadAll(); + Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream"); + const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size()); + Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum"); + + return payload; +} + +TString TFrame::GetRawFrame() const { + TString frameBuf = GetCompressedFrame(); + TStringInput sin(frameBuf); + return TZLibDecompress{&sin}.ReadAll(); +} + +TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter) + : Frame_(frame) + , Size_(frame.EventsCache_.size()) + , Filter_(filter) + , Index_(0) +{ + SkipToValidEvent(); +} + +TConstEventPtr TFrame::TIterator::operator*() const { + return Frame_.GetEvent(Index_); +} + +bool TFrame::TIterator::Next() { + Index_++; + SkipToValidEvent(); + return Index_ < Size_; +} + +void TFrame::TIterator::SkipToValidEvent() { + if (!Filter_) { + 
return; + } + + for (; Index_ < Size_; ++Index_) { + if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) { + break; + } + } +} + +TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) { + if (auto header = FindNextFrameHeader(in)) { + return TFrame{*in, *header, eventFactory}; + } else { + return Nothing(); + } +} + +TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) { + TVector<TStringBuf> tokens; + + SplitWithEscaping(tokens, unparsedMatchGroups, "/"); + + // Amount of match groups + size_t size = tokens.size(); + MatchGroups.resize(size); + + for (size_t i = 0; i < size; i++) { + TMatchGroup& group = MatchGroups[i]; + TVector<TStringBuf> groupTokens; + SplitWithEscaping(groupTokens, tokens[i], ":"); + + Y_ENSURE(groupTokens.size() == 3); + + try { + group.EventID = eventFactory->ClassByName(groupTokens[0]); + } catch (yexception& e) { + if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) { + e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0]; + throw e; + } + } + + group.FieldName = groupTokens[1]; + group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:"); + } +} + +bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const { + THashSet<size_t> toMatchSet; + for (size_t i = 0; i < MatchGroups.size(); i++) { + toMatchSet.insert(i); + } + + for (auto it = frame.GetIterator(); it.Avail(); it.Next()) { + TConstEventPtr event(*it); + TVector<size_t> indicesToErase; + + if (!toMatchSet.empty()) { + const NProtoBuf::Message* message = event->GetProto(); + const google::protobuf::Descriptor* descriptor = message->GetDescriptor(); + const google::protobuf::Reflection* reflection = message->GetReflection(); + + Y_ENSURE(descriptor); + Y_ENSURE(reflection); + + for (size_t groupIndex : toMatchSet) { + const TMatchGroup& group = MatchGroups[groupIndex]; + + if (event->Class == group.EventID) { + TVector<TString> 
parts = StringSplitter(group.FieldName).Split('.').ToList<TString>(); + TString lastPart = std::move(parts.back()); + parts.pop_back(); + + for (auto part : parts) { + auto fieldDescriptor = descriptor->FindFieldByName(part); + Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\"."); + + message = &reflection->GetMessage(*message, fieldDescriptor); + descriptor = message->GetDescriptor(); + reflection = message->GetReflection(); + + Y_ENSURE(descriptor); + Y_ENSURE(reflection); + } + + const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart); + Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\"."); + + TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection); + if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) { + indicesToErase.push_back(groupIndex); + } + } + } + + for (size_t idx : indicesToErase) { + toMatchSet.erase(idx); + } + + if (toMatchSet.empty()) { + return true; + } + } + } + + return toMatchSet.empty(); +} + +void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) { + size_t tokenStart = 0; + const TString characterSet = TString::Join("\\", externalCharacterSet); + + for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) { + if (stringToSplit[position] == '\\') { + position++; + } else { + if (tokenStart != position) { + tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart)); + } + tokenStart = position + 1; + } + } + + if (tokenStart < stringToSplit.size()) { + tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart)); + } +} + +TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) { + 
TStringBuilder stringBuilder; + size_t tokenStart = 0; + + for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) { + if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) { + stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart); + tokenStart = position + 1; + } + } + + if (tokenStart < stringToUnescape.size()) { + stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart); + } + + return stringBuilder; +} + +TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) { + Y_ENSURE(message); + Y_ENSURE(fieldDescriptor); + Y_ENSURE(reflection); + + TString result; + switch (fieldDescriptor->type()) { + case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + result = ToString(reflection->GetDouble(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT: + result = ToString(reflection->GetFloat(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_BOOL: + result = ToString(reflection->GetBool(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_INT32: + result = ToString(reflection->GetInt32(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_UINT32: + result = ToString(reflection->GetUInt32(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_INT64: + result = ToString(reflection->GetInt64(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_UINT64: + result = ToString(reflection->GetUInt64(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_STRING: + result = 
ToString(reflection->GetString(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_ENUM: + { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor); + result = ToString(enumValueDescriptor->name()); + } + break; + default: + throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented."; + } + return result; +} + +TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff) + : In_(&s) + , FrameFilter_(ff) + , EventFactory_(fac) +{ + Frame_ = FindNextFrame(&In_, EventFactory_); + + SkipToAllowedFrame(); +} + +TFrameStreamer::TFrameStreamer( + const TString& fileName, + ui64 startTime, + ui64 endTime, + ui64 maxRequestDuration, + IEventFactory* fac, + IFrameFilterRef ff) + : File_(TBlob::FromFile(fileName)) + , MemoryIn_(File_.Data(), File_.Size()) + , In_(&MemoryIn_) + , StartTime_(startTime) + , EndTime_(endTime) + , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime)) + , FrameFilter_(ff) + , EventFactory_(fac) +{ + In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration)); + Frame_ = FindNextFrame(&In_, fac); + SkipToAllowedFrame(); +} + +TFrameStreamer::~TFrameStreamer() = default; + +bool TFrameStreamer::Avail() const { + return Frame_.Defined(); +} + +const TFrame& TFrameStreamer::operator*() const { + Y_ENSURE(Frame_, "Frame streamer depleted"); + + return *Frame_; +} + +bool TFrameStreamer::Next() { + DoNext(); + SkipToAllowedFrame(); + + return Frame_.Defined(); +} + +bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const { + const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_)); + const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_)); + return allowedStartTime || allowedEndTime; +} + +bool 
TFrameStreamer::DoNext() { + if (!Frame_) { + return false; + } + In_.Skip(Frame_->Limiter_->Left()); + Frame_ = FindNextFrame(&In_, EventFactory_); + + if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) { + Frame_.Clear(); + } + + return Frame_.Defined(); +} + +namespace { + struct TDecodeBuffer { + TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) { + TBuffer from(bs); + + { + TBufferOutput b(from); + TransferData(&src, &b); + } + + NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer); + } + + explicit TDecodeBuffer(IInputStream& src) { + TBufferOutput b(DecodeBuffer); + TransferData(&src, &b); + } + + TBuffer DecodeBuffer; + }; + + class TBlockCodecStream: private TDecodeBuffer, public TBufferInput { + public: + TBlockCodecStream(const TString codec, IInputStream& src, size_t bs) + : TDecodeBuffer(codec, src, bs) + , TBufferInput(DecodeBuffer) + {} + + explicit TBlockCodecStream(IInputStream& src) + : TDecodeBuffer(src) + , TBufferInput(DecodeBuffer) + {} + }; +} + +TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData) + : Frame_(fr) + , Event_(nullptr) + , Flt_(filter) + , Fac_(fr.Fac_) + , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime())) + , Strict_(strict) + , WithRawData_(withRawData) +{ + switch (fr.LogFormat()) { + case COMPRESSED_LOG_FORMAT_V2: + case COMPRESSED_LOG_FORMAT_V3: + case COMPRESSED_LOG_FORMAT_V4: + case COMPRESSED_LOG_FORMAT_V5: { + const auto payload = fr.GetCompressedFrame(); + TMemoryInput payloadInput{payload}; + + if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) { + Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size())); + } else { + TZLibDecompress zlib(&payloadInput); + Decompressor_.Reset(new TBlockCodecStream(zlib)); + if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) { + Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size())); + } + } + + break; + } + + default: + ythrow yexception() << 
"unsupported log format: " << fr.LogFormat() << Endl; + break; + }; + + if (WithRawData_) { + TBufferOutput out(UncompressedData_); + TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen); + + TransferData(&limiter, &out); + Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size())); + } + + Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen)); + + Decode(); +} + +TFrameDecoder::~TFrameDecoder() = default; + +bool TFrameDecoder::Avail() const { + return HaveData(); +} + +TConstEventPtr TFrameDecoder::operator*() const { + Y_ENSURE(HaveData(), "Decoder depleted"); + + return Event_; +} + +bool TFrameDecoder::Next() { + if (HaveData()) { + Decode(); + } + + return HaveData(); +} + +void TFrameDecoder::Decode() { + Event_ = nullptr; + const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5); + + size_t evBegin = 0; + size_t evEnd = 0; + if (WithRawData_) + evBegin = UncompressedData_.Size() - Limiter_->Left(); + + while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) { + } + + if (WithRawData_) { + evEnd = UncompressedData_.Size() - Limiter_->Left(); + RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd); + } + + if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) { + Event_ = EndOfFrame_.Release(); + } + + if (!!Event_) { + Event_->FrameId = Frame_.FrameId(); + } +} + +const TStringBuf TFrameDecoder::GetRawEvent() const { + return RawEventData_; +} + +TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering) + : Frames_(fs) + , Start_(s) + , End_(e) + , MaxEndTimestamp_(0) + , Frontier_(0) + , StrongOrdering_(strongOrdering) + , 
LosslessStrongOrdering_(losslessStrongOrdering) + , EventFilter_(filter) +{ + + if (Start_ > End_) { + ythrow yexception() << "Wrong main interval"; + } + + TEventStreamer::Next(); +} + +TEventStreamer::~TEventStreamer() = default; + +bool TEventStreamer::Avail() const { + return Events_.Avail() && (*Events_)->Timestamp <= Frontier_; +} + +TConstEventPtr TEventStreamer::operator*() const { + Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted"); + + return *Events_; +} + +bool TEventStreamer::Next() { + if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) { + return true; + } + + for (;;) { + if (!LoadMoreEvents()) { + return false; + } + + if (TEventStreamer::Avail()) { + return true; + } + } +} + +/* +Two parameters are used in the function: +Frontier - the moment of time up to which inclusively all the log events made their way + into the buffer (and might have been already extracted out of it). +Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION. +In order to get all the log events up to the Frontier inclusively, + frames need to be read until "end time" of the current frame exceeds the Horizon. +*/ +bool TEventStreamer::LoadMoreEvents() { + if (!Frames_.Avail()) { + return false; + } + + const TFrame& fr1 = *Frames_; + const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0); + + if (fr1.EndTime() <= Frontier_ + maxRequestDuration) { + ythrow yexception() << "Wrong frame stream state"; + } + + if (Frontier_ >= End_) { + return false; + } + + const ui64 old_frontier = Frontier_; + Frontier_ = fr1.EndTime(); + + { + Y_DEFER { + Events_.Reorder(StrongOrdering_); + }; + + for (; Frames_.Avail(); Frames_.Next()) { + const TFrame& fr2 = *Frames_; + + // Frames need to start later than the Frontier. 
+ if (StrongOrdering_ && fr2.StartTime() <= old_frontier) { + Cdbg << "Invalid frame encountered" << Endl; + continue; + } + + if (fr2.EndTime() > MaxEndTimestamp_) { + MaxEndTimestamp_ = fr2.EndTime(); + } + + if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) { + return true; + } + + // Checking for the frame to be within the main time borders. + if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) { + TransferEvents(fr2); + } + } + } + + Frontier_ = MaxEndTimestamp_; + + return true; +} + +void TEventStreamer::TransferEvents(const TFrame& fr) { + Events_.SetCheckpoint(); + + try { + for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) { + TConstEventPtr ev = *it; + + if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) { + ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range"; + } + + if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) { + Events_.Append(ev, StrongOrdering_); + } + } + } catch (const TInvalidEventTimestamps& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl; + } catch (const TFrameLoadError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: " << err.what() << Endl; + } catch (const TEventDecoderError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl; + } catch (const TZLibDecompressorError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl; + } catch (...) 
{ + Events_.Rollback(); + throw; + } +} + +void TEventStreamer::TEventBuffer::SetCheckpoint() { + BufLen_ = Buffer_.size(); +} + +void TEventStreamer::TEventBuffer::Rollback() { + Buffer_.resize(BufLen_); +} + +void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) { + SetCheckpoint(); + + std::reverse(Buffer_.begin(), Buffer_.end()); + + if (strongOrdering) { + StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) { + return (a->Timestamp > b->Timestamp) || + ((a->Timestamp == b->Timestamp) && !a->Class && b->Class); + }); + } +} + +void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) { + // Events in buffer output must be in an ascending order. + Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event"); + + Buffer_.push_back(std::move(ev)); +} + +bool TEventStreamer::TEventBuffer::Avail() const { + return !Buffer_.empty(); +} + +TConstEventPtr TEventStreamer::TEventBuffer::operator*() const { + Y_ENSURE(!Buffer_.empty(), "Event buffer is empty"); + + return Buffer_.back(); +} + +bool TEventStreamer::TEventBuffer::Next() { + if (!Buffer_.empty()) { + LastTimestamp_ = Buffer_.back()->Timestamp; + Buffer_.pop_back(); + return !Buffer_.empty(); + } else { + return false; + } +} diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h new file mode 100644 index 0000000000..f819e72589 --- /dev/null +++ b/library/cpp/eventlog/logparser.h @@ -0,0 +1,343 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/yexception.h> +#include <util/generic/vector.h> +#include <util/generic/set.h> +#include <util/generic/maybe.h> +#include <util/memory/blob.h> +#include <util/stream/length.h> +#include <util/stream/mem.h> + +#include "eventlog_int.h" +#include "eventlog.h" +#include "common.h" + +class IInputStream; + +static const ui64 MAX_REQUEST_DURATION = 60'000'000; +static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION; +static 
const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION; + +class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> { +public: + TEventFilter(bool enableEvents) + : Enable_(enableEvents) + { + } + + void AddEventClass(TEventClass cls) { + insert(cls); + } + + bool EventAllowed(TEventClass cls) const { + bool found = (find(cls) != end()); + + return Enable_ == found; + } + +private: + bool Enable_; +}; + +using TEventStream = TPacketInputStream<TConstEventPtr>; + +struct TFrameHeader { + // Reads header from the stream. The caller must make sure that the + // sync data is present just before the stream position. + explicit TFrameHeader(IInputStream& in); + + ui64 StartTime() const { + return Framehdr.StartTimestamp; + } + + ui64 EndTime() const { + return Framehdr.EndTimestamp; + } + + ui32 FrameId() const { + return Basehdr.FrameId; + } + + ui64 Duration() const { + return EndTime() - StartTime(); + } + + TEventLogFormat ContentFormat() const { + return Basehdr.Format & 0xffffff; + } + + TEventLogFormat LogFormat() const { + return Basehdr.Format >> 24; + } + + ui64 FrameLength() const { + return Basehdr.Length - sizeof(TCompressedFrameHeader2); + } + + // Length including the header + ui64 FullLength() const { + return sizeof(*this) + FrameLength(); + } + + TCompressedFrameBaseHeader Basehdr; + TCompressedFrameHeader2 Framehdr; +}; + +struct TFrameLoadError: public yexception { + explicit TFrameLoadError(size_t skipAfter) + : SkipAfter(skipAfter) + {} + + size_t SkipAfter; +}; + +class TFrame : public TFrameHeader { +public: + // Reads the frame after the header has been read. 
+ TFrame(IInputStream& in, TFrameHeader header, IEventFactory*); + + TString GetRawFrame() const; + TString GetCompressedFrame() const; + + ui64 Address() const { return Address_; } + +private: + const TConstEventPtr& GetEvent(size_t index) const { + return EventsCache_[index]; + } + + void ClearEventsCache() const; + + THolder<TLengthLimitedInput> Limiter_; + mutable TVector<TConstEventPtr> EventsCache_; + + IEventFactory* Fac_; + ui64 Address_; + + friend class TFrameDecoder; + friend class TFrameStreamer; + +private: + class TIterator: TEventStream { + public: + TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter); + ~TIterator() override = default; + + bool Avail() const override { + return Index_ < Size_; + } + + TConstEventPtr operator*() const override; + bool Next() override; + + private: + void SkipToValidEvent(); + + const TFrame& Frame_; + size_t Size_; + TIntrusiveConstPtr<TEventFilter> Filter_; + size_t Index_; + }; + +public: + TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const; +}; + +// If `in` is derived from TCountingInput, Frame's address will +// be set according to the in->Counter(). 
Otherwise it will be zero. +TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*); + +using TFrameStream = TPacketInputStream<const TFrame&>; + +class IFrameFilter: public TSimpleRefCount<IFrameFilter> { +public: + IFrameFilter() { + } + + virtual ~IFrameFilter() = default; + + virtual bool FrameAllowed(const TFrame& frame) const = 0; +}; + +using IFrameFilterRef = TIntrusivePtr<IFrameFilter>; + +class TDurationFrameFilter: public IFrameFilter { +public: + TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>()) + : MinDuration_(minFrameDuration) + , MaxDuration_(maxFrameDuration) + { + } + + bool FrameAllowed(const TFrame& frame) const override { + return frame.Duration() >= MinDuration_ && frame.Duration() <= MaxDuration_; + } + +private: + const ui64 MinDuration_; + const ui64 MaxDuration_; +}; + +class TFrameIdFrameFilter: public IFrameFilter { +public: + TFrameIdFrameFilter(ui32 frameId) + : FrameId_(frameId) + { + } + + bool FrameAllowed(const TFrame& frame) const override { + return frame.FrameId() == FrameId_; + } + +private: + const ui32 FrameId_; +}; + +class TContainsEventFrameFilter: public IFrameFilter { +public: + TContainsEventFrameFilter(const TString& args, const IEventFactory* fac); + + bool FrameAllowed(const TFrame& frame) const override; + +private: + struct TMatchGroup { + TEventClass EventID; + TString FieldName; + TString ValueToMatch; + }; + + TVector<TMatchGroup> MatchGroups; +}; + +void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet); + +TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet); + +TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection); + +class TFrameStreamer: public TFrameStream { +public: + TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = 
nullptr); + TFrameStreamer( + const TString& fileName, + ui64 startTime, + ui64 endTime, + ui64 maxRequestDuration, + IEventFactory* fac, + IFrameFilterRef ff = nullptr); + ~TFrameStreamer() override; + + bool Avail() const override; + const TFrame& operator*() const override; + bool Next() override; + +private: + bool DoNext(); + bool AllowedTimeRange(const TFrame& frame) const; + + bool AllowedFrame(const TFrame& frame) const { + return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame)); + } + + void SkipToAllowedFrame() { + if (Frame_) { + while (!AllowedFrame(*Frame_) && DoNext()) { + //do nothing + } + } + } + + TBlob File_; + TMemoryInput MemoryIn_; + TCountingInput In_; + THolder<IInputStream> Stream_; + ui64 StartTime_ = 0; + ui64 EndTime_ = 0; + ui64 CutoffTime_ = 0; + TMaybe<TFrame> Frame_; + IFrameFilterRef FrameFilter_; + IEventFactory* EventFactory_; +}; + +class TFrameDecoder: TEventStream { +public: + TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false); + ~TFrameDecoder() override; + + bool Avail() const override; + + TConstEventPtr operator*() const override; + bool Next() override; + + const TStringBuf GetRawEvent() const; + +private: + TFrameDecoder(const TFrameDecoder&); + void operator=(const TFrameDecoder&); + + inline bool HaveData() const { + return Event_ != nullptr; + } + + void Decode(); + +private: + const TFrame& Frame_; + THolder<IInputStream> Decompressor_; + THolder<TLengthLimitedInput> Limiter_; + TEventPtr Event_; + const TEventFilter* const Flt_; + IEventFactory* Fac_; + THolder<TEvent> EndOfFrame_; + bool Strict_; + TBuffer UncompressedData_; + TStringBuf RawEventData_; + bool WithRawData_; +}; + +class TEventStreamer: public TEventStream { +public: + TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false); + ~TEventStreamer() override; + + bool Avail() 
const override; + TConstEventPtr operator*() const override; + bool Next() override; + +private: + class TEventBuffer: public TEventStream { + public: + void SetCheckpoint(); + void Rollback(); + void Reorder(bool strongOrdering); + void Append(TConstEventPtr event, bool strongOrdering); + + bool Avail() const override; + TConstEventPtr operator*() const override; + bool Next() override; + + private: + TVector<TConstEventPtr> Buffer_; + size_t BufLen_ = 0; + ui64 LastTimestamp_ = 0; + }; + +private: + struct TInvalidEventTimestamps: public yexception { + }; + + bool LoadMoreEvents(); + void TransferEvents(const TFrame&); + +private: + TFrameStream& Frames_; + TEventBuffer Events_; + + ui64 Start_, End_; + ui64 MaxEndTimestamp_; + ui64 Frontier_; + bool StrongOrdering_; + bool LosslessStrongOrdering_; + TIntrusivePtr<TEventFilter> EventFilter_; +}; diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto new file mode 100644 index 0000000000..7db1af3a59 --- /dev/null +++ b/library/cpp/eventlog/proto/events_extension.proto @@ -0,0 +1,22 @@ +import "google/protobuf/descriptor.proto"; + +option go_package = "github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions"; +option java_package = "NEventLogEventsExtension"; + +extend google.protobuf.MessageOptions { + optional uint32 message_id = 50001; + optional string realm_name = 50002; +} + +message Repr { + enum ReprType { + none = 0; + as_bytes = 1; // Only for primitive types + as_hex = 2; // Only for primitive types + as_base64 = 3; // Only for 'string' and 'bytes' fields + }; +} + +extend google.protobuf.FieldOptions { + optional Repr.ReprType repr = 55003 [default = none]; +} diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto new file mode 100644 index 0000000000..8070a09685 --- /dev/null +++ b/library/cpp/eventlog/proto/internal.proto @@ -0,0 +1,9 @@ +option go_package = 
"github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions"; + +package NEventLogInternal; + +message TUnknownEvent { +}; + +message TEndOfFrameEvent { +}; diff --git a/library/cpp/eventlog/proto/ya.make b/library/cpp/eventlog/proto/ya.make new file mode 100644 index 0000000000..fbf5a6c619 --- /dev/null +++ b/library/cpp/eventlog/proto/ya.make @@ -0,0 +1,12 @@ +PROTO_LIBRARY() + +IF (NOT PY_PROTOS_FOR) + INCLUDE_TAGS(GO_PROTO) +ENDIF() + +SRCS( + events_extension.proto + internal.proto +) + +END() diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp new file mode 100644 index 0000000000..67839063fb --- /dev/null +++ b/library/cpp/eventlog/threaded_eventlog.cpp @@ -0,0 +1 @@ +#include "threaded_eventlog.h" diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h new file mode 100644 index 0000000000..52382b856d --- /dev/null +++ b/library/cpp/eventlog/threaded_eventlog.h @@ -0,0 +1,154 @@ +#pragma once + +#include "eventlog.h" + +#include <util/generic/string.h> +#include <util/thread/pool.h> + +class TThreadedEventLog: public TEventLogWithSlave { +public: + class TWrapper; + using TOverflowCallback = std::function<void(TWrapper& wrapper)>; + + enum class EDegradationResult { + ShouldWrite, + ShouldDrop, + }; + using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>; + +public: + TThreadedEventLog( + IEventLog& parentLog, + size_t threadCount, + size_t queueSize, + TOverflowCallback cb, + TDegradationCallback degradationCallback = {}) + : TEventLogWithSlave(parentLog) + , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) + , ThreadCount(threadCount) + , QueueSize(queueSize) + , OverflowCallback(std::move(cb)) + , DegradationCallback(std::move(degradationCallback)) + { + Init(); + } + + TThreadedEventLog( + const TEventLogPtr& parentLog, + size_t threadCount, + size_t queueSize, + TOverflowCallback cb, + TDegradationCallback 
degradationCallback = {}) + : TEventLogWithSlave(parentLog) + , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) + , ThreadCount(threadCount) + , QueueSize(queueSize) + , OverflowCallback(std::move(cb)) + , DegradationCallback(std::move(degradationCallback)) + { + Init(); + } + + TThreadedEventLog(IEventLog& parentLog) + : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) + { + } + + TThreadedEventLog(const TEventLogPtr& parentLog) + : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) + { + } + + ~TThreadedEventLog() override { + try { + LogSaver.Stop(); + } catch (...) { + } + } + + void ReopenLog() override { + TEventLogWithSlave::ReopenLog(); + } + + void CloseLog() override { + LogSaver.Stop(); + TEventLogWithSlave::CloseLog(); + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override { + float fillFactor = 0.0f; + if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) { + fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize(); + } + + EDegradationResult status = EDegradationResult::ShouldWrite; + if (DegradationCallback) { + status = DegradationCallback(fillFactor); + } + if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) { + return; + } + + THolder<TWrapper> wrapped; + wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags))); + + if (LogSaver.Add(wrapped.Get())) { + Y_UNUSED(wrapped.Release()); + } else if (OverflowCallback) { + OverflowCallback(*wrapped); + } + } + +private: + void Init() { + LogSaver.Start(ThreadCount, QueueSize); + } + +public: + class TWrapper: public IObjectInQueue { + public: + TWrapper(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + IEventLog& slave, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = 
{}) + : StartTimestamp(startTimestamp) + , EndTimestamp(endTimestamp) + , Slave(&slave) + , WriteFrameCallback(writeFrameCallback) + , MetaFlags(std::move(metaFlags)) + { + Buffer.Swap(buffer); + } + + void Process(void*) override { + THolder<TWrapper> holder(this); + + WriteFrame(); + } + + void WriteFrame() { + Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags)); + } + + private: + TBuffer Buffer; + TEventTimestamp StartTimestamp; + TEventTimestamp EndTimestamp; + IEventLog* Slave; + TWriteFrameCallbackPtr WriteFrameCallback; + TLogRecord::TMetaFlags MetaFlags; + }; + +private: + TThreadPool LogSaver; + const size_t ThreadCount; + const size_t QueueSize; + const TOverflowCallback OverflowCallback; + const TDegradationCallback DegradationCallback; +}; diff --git a/library/cpp/eventlog/ya.make b/library/cpp/eventlog/ya.make new file mode 100644 index 0000000000..fbbc1eff00 --- /dev/null +++ b/library/cpp/eventlog/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +PEERDIR( + library/cpp/blockcodecs + library/cpp/eventlog/proto + library/cpp/json + library/cpp/logger + library/cpp/protobuf/json + library/cpp/streams/growing_file_input + library/cpp/string_utils/base64 + contrib/libs/re2 +) + +SRCS( + common.h + evdecoder.cpp + event_field_output.cpp + event_field_printer.cpp + eventlog.cpp + eventlog_int.cpp + iterator.cpp + logparser.cpp + threaded_eventlog.cpp +) + +GENERATE_ENUM_SERIALIZATION(eventlog.h) +GENERATE_ENUM_SERIALIZATION(eventlog_int.h) + +END() |