aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/eventlog
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-30 13:26:22 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-30 15:44:45 +0300
commit0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch)
tree291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/eventlog
parentcb2c8d75065e5b3c47094067cb4aa407d4813298 (diff)
downloadydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/eventlog')
-rw-r--r--library/cpp/eventlog/common.h10
-rw-r--r--library/cpp/eventlog/evdecoder.cpp112
-rw-r--r--library/cpp/eventlog/evdecoder.h16
-rw-r--r--library/cpp/eventlog/event_field_output.cpp68
-rw-r--r--library/cpp/eventlog/event_field_output.h29
-rw-r--r--library/cpp/eventlog/event_field_printer.cpp27
-rw-r--r--library/cpp/eventlog/event_field_printer.h38
-rw-r--r--library/cpp/eventlog/eventlog.cpp554
-rw-r--r--library/cpp/eventlog/eventlog.h623
-rw-r--r--library/cpp/eventlog/eventlog_int.cpp12
-rw-r--r--library/cpp/eventlog/eventlog_int.h72
-rw-r--r--library/cpp/eventlog/events_extension.h161
-rw-r--r--library/cpp/eventlog/iterator.cpp88
-rw-r--r--library/cpp/eventlog/iterator.h51
-rw-r--r--library/cpp/eventlog/logparser.cpp814
-rw-r--r--library/cpp/eventlog/logparser.h343
-rw-r--r--library/cpp/eventlog/proto/events_extension.proto22
-rw-r--r--library/cpp/eventlog/proto/internal.proto9
-rw-r--r--library/cpp/eventlog/proto/ya.make12
-rw-r--r--library/cpp/eventlog/threaded_eventlog.cpp1
-rw-r--r--library/cpp/eventlog/threaded_eventlog.h154
-rw-r--r--library/cpp/eventlog/ya.make29
22 files changed, 3245 insertions, 0 deletions
diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h
new file mode 100644
index 0000000000..75c512c13e
--- /dev/null
+++ b/library/cpp/eventlog/common.h
@@ -0,0 +1,10 @@
+#pragma once
+
+template <typename T>
+class TPacketInputStream { // abstract source of packets of type T
+public:
+    virtual ~TPacketInputStream() = default; // polymorphic base: safe deletion through a base pointer
+    virtual bool Avail() const = 0;   // presumably: whether a packet is currently available - confirm against implementations
+    virtual T operator*() const = 0;  // presumably: the current packet - confirm against implementations
+    virtual bool Next() = 0;          // presumably: advance to the next packet, reporting success - confirm
+};
diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp
new file mode 100644
index 0000000000..e4413a1b0e
--- /dev/null
+++ b/library/cpp/eventlog/evdecoder.cpp
@@ -0,0 +1,112 @@
+#include <util/memory/tempbuf.h>
+#include <util/string/cast.h>
+#include <util/stream/output.h>
+
+#include "evdecoder.h"
+#include "logparser.h"
+
+static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class";
+
+static inline void LogError(ui64 frameAddr, const char* msg, bool strict) {
+    // Strict mode escalates the problem to an exception; otherwise it is only reported on stderr.
+    if (strict) {
+        ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg;
+    }
+    Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl;
+}
+
+static inline bool SkipData(IInputStream& s, size_t amount) {
+    return s.Skip(amount) == amount; // true only when the whole requested amount was skipped
+}
+
+// There are 2 log formats: one that allows skipping an event without decoding it (it stores the event length),
+// and another that requires decoding each event just to seek over the stream. needRead == true means the latter format.
+static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) {
+    TEventTimestamp ts;
+    TEventClass c;
+
+    ::Load(&s, ts);
+    ::Load(&s, c);
+
+    // The event is handed back to the caller only when no filter is set or the filter accepts its class.
+    const bool needReturn = !filter || filter->EventAllowed(c);
+
+    if (!needRead && !needReturn) {
+        // Neither the stream format nor the caller requires the body to be decoded.
+        return nullptr;
+    }
+
+    THolder<TEvent> e(fac->CreateLogEvent(c));
+
+    if (e) {
+        e->Timestamp = ts;
+        e->Load(s);
+    } else if (needReturn) {
+        e.Reset(new TUnknownEvent(ts, c));
+    }
+
+    if (!needReturn) {
+        // The body was decoded only to advance the stream; drop the event.
+        e.Reset(nullptr);
+    }
+
+    return e;
+}
+
+THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { // Decodes one length-prefixed event; returns null when the event is filtered out or skipped as malformed.
+ ui32 len;
+ ::Load(&inp, len);
+ // The stored length has to cover at least the 4-byte length field itself.
+ if (len < sizeof(ui32)) {
+ ythrow TEventDecoderError() << "invalid event length";
+ }
+
+ TLengthLimitedInput s(&inp, len - sizeof(ui32)); // confines decoding to the remaining bytes of this event
+
+ try {
+ THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac); // needRead == false: unwanted events are not decoded
+ if (!!e) {
+ if (!s.Left()) { // the event consumed its body exactly
+ return e;
+ } else if (e->Class == 0) { // class 0: leftover bytes are tolerated and skipped
+ if (!SkipData(s, s.Left())) {
+ ythrow TEventDecoderError() << "cannot skip bad event";
+ }
+
+ return e;
+ }
+
+ LogError(frameAddr, "Event is not fully read", strict); // throws in strict mode; otherwise warns and the event is dropped below
+ }
+ } catch (const TLoadEOF&) {
+ if (s.Left()) { // the limit was not reached - presumably the underlying input itself ended; let the caller handle it
+ throw;
+ }
+
+ LogError(frameAddr, "Unexpected event end", strict);
+ }
+
+ if (!SkipData(s, s.Left())) { // move past whatever is left of this event so the next one can be read
+ ythrow TEventDecoderError() << "cannot skip bad event";
+ }
+
+ return nullptr;
+}
+
+THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) {
+    try {
+        if (framed) {
+            return DecodeFramed(s, frameAddr, filter, fac, strict);
+        }
+
+        // Unframed format: every event has to be decoded in order to advance the stream.
+        THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac);
+        // A null holder means the event was skipped by the filter, which is not an error.
+        if (e && e->Class == 0) {
+            ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS;
+        }
+
+        return e;
+    } catch (const TLoadEOF&) {
+        ythrow TEventDecoderError() << "unexpected frame end";
+    }
+}
diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h
new file mode 100644
index 0000000000..eedfc82174
--- /dev/null
+++ b/library/cpp/eventlog/evdecoder.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+#include <util/generic/ptr.h>
+
+#include "eventlog.h"
+
+class TEvent;
+class IInputStream;
+class TEventFilter;
+
+struct TEventDecoderError: public yexception { // thrown by the event decoder for invalid lengths, unskippable or truncated events
+};
+
+THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false); // decodes one event from s; may return null for a skipped event; throws TEventDecoderError on a truncated frame
+bool AcceptableContent(TEventLogFormat);
diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp
new file mode 100644
index 0000000000..f9d98dac9d
--- /dev/null
+++ b/library/cpp/eventlog/event_field_output.cpp
@@ -0,0 +1,68 @@
+#include "event_field_output.h"
+
+#include <util/string/split.h>
+
+namespace {
+    // Collects every character that has to be escaped under the given flags.
+    TString MakeSeparators(EFieldOutputFlags flags) {
+        TString separators;
+        separators.reserve(3);
+
+        if (flags & EFieldOutputFlag::EscapeTab) {
+            separators.append('\t');
+        }
+        if (flags & EFieldOutputFlag::EscapeNewLine) {
+            separators.append("\n\r"); // the new-line flag covers both line-break characters
+        }
+        if (flags & EFieldOutputFlag::EscapeBackSlash) {
+            separators.append('\\');
+        }
+
+        return separators;
+    }
+}
+
+TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags)
+ : Output(output)
+ , Flags(flags)
+ , Separators(MakeSeparators(flags)) // characters to escape, derived once from the flags
+{
+}
+
+IOutputStream& TEventFieldOutput::GetOutputStream() { // the wrapped stream; writing to it directly bypasses escaping
+ return Output;
+}
+
+EFieldOutputFlags TEventFieldOutput::GetFlags() const { // the flags supplied at construction
+ return Flags;
+}
+
+void TEventFieldOutput::DoWrite(const void* buf, size_t len) {
+    // Without any escaping flags the data is forwarded untouched.
+    if (!Flags) {
+        Output.Write(buf, len);
+        return;
+    }
+
+    TStringBuf chunk{static_cast<const char*>(buf), len};
+    // Split on the characters to escape: tokens are copied verbatim, delimiters are replaced by escape sequences.
+    for (const auto part : StringSplitter(chunk).SplitBySet(Separators.data())) {
+        const TStringBuf token = part.Token();
+        const TStringBuf delim = part.Delim();
+        if (!token.empty()) {
+            Output.Write(token);
+        }
+        if (delim == "\n") {
+            Output.Write(TStringBuf("\\n"));
+        } else if (delim == "\r") {
+            Output.Write(TStringBuf("\\r"));
+        } else if (delim == "\t") {
+            Output.Write(TStringBuf("\\t"));
+        } else if (delim == "\\") {
+            Output.Write(TStringBuf("\\\\"));
+        } else {
+            Y_ASSERT(delim.empty());
+        }
+    }
+}
+
diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h
new file mode 100644
index 0000000000..ed9db0ae16
--- /dev/null
+++ b/library/cpp/eventlog/event_field_output.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/flags.h>
+
+enum class EFieldOutputFlag {
+ EscapeTab = 0x1, // escape \t in field value
+ EscapeNewLine = 0x2, // escape \n and \r in field value
+ EscapeBackSlash = 0x4 // escape \ in field value
+};
+
+Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag);
+Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags);
+
+class TEventFieldOutput: public IOutputStream { // output stream that escapes selected characters before forwarding to another stream
+public:
+ TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags); // output is held by reference and must outlive this object
+
+ IOutputStream& GetOutputStream(); // the wrapped stream
+ EFieldOutputFlags GetFlags() const; // the escaping flags given at construction
+
+protected:
+ void DoWrite(const void* buf, size_t len) override; // writes buf to the wrapped stream, escaping per the flags
+
+private:
+ IOutputStream& Output;
+ EFieldOutputFlags Flags;
+ TString Separators; // the set of characters that get escaped
+};
diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp
new file mode 100644
index 0000000000..29c6b4b661
--- /dev/null
+++ b/library/cpp/eventlog/event_field_printer.cpp
@@ -0,0 +1,27 @@
+#include "event_field_printer.h"
+
+#include <library/cpp/protobuf/json/proto2json.h>
+
+namespace {
+ // Shared proto-to-JSON settings used when a message field is printed as JSON.
+ const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
+ .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); // presumably base64-encodes bytes fields - confirm in proto2json docs
+
+} // namespace
+
+TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode) // remembers the print mode used by later print calls
+ : Mode(mode)
+{}
+
+template <>
+void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) { // generic fallback: prints any protobuf message as JSON
+ switch (Mode) {
+ case EProtobufMessageFieldPrintMode::DEFAULT: // no custom Print available here, so DEFAULT behaves like JSON
+ case EProtobufMessageFieldPrintMode::JSON: {
+ // Do not use field.PrintJSON() here: IGNIETFERRO-2002
+ NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG); // written through output, so its escaping applies
+ break;
+ }
+ }
+}
diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h
new file mode 100644
index 0000000000..835e8f4a85
--- /dev/null
+++ b/library/cpp/eventlog/event_field_printer.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "event_field_output.h"
+
+#include <google/protobuf/message.h>
+
+// NB: For historical reasons print code for all primitive types/repeated fields/etc generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp
+
+enum class EProtobufMessageFieldPrintMode {
+ // Use the <TEventProtobufMessageFieldType>::Print method for fields that have it
+ // Print json for other fields
+ DEFAULT = 0,
+ // Print json for every field, even when a Print method exists
+ JSON = 1,
+};
+
+class TEventProtobufMessageFieldPrinter { // prints protobuf message fields either via their own Print method or as JSON
+public:
+ explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode);
+ // HasPrintFunction tells at compile time whether the field type provides Print(stream, flags).
+ template <typename TEventProtobufMessageFieldType, bool HasPrintFunction>
+ void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) {
+ if constexpr (HasPrintFunction) {
+ if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) {
+ field.Print(output.GetOutputStream(), output.GetFlags()); // Print gets the raw stream plus the flags and handles escaping itself
+ return;
+ }
+ }
+ // No Print method, or JSON mode requested: use the generic JSON fallback.
+ PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output);
+ }
+ // Generic JSON fallback; defined in event_field_printer.cpp.
+ template <>
+ void PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output);
+
+private:
+ EProtobufMessageFieldPrintMode Mode;
+};
diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp
new file mode 100644
index 0000000000..458a632b4a
--- /dev/null
+++ b/library/cpp/eventlog/eventlog.cpp
@@ -0,0 +1,554 @@
+#include <util/datetime/base.h>
+#include <util/stream/zlib.h>
+#include <util/stream/length.h>
+#include <util/generic/buffer.h>
+#include <util/generic/yexception.h>
+#include <util/digest/murmur.h>
+#include <util/generic/singleton.h>
+#include <util/generic/function.h>
+#include <util/stream/output.h>
+#include <util/stream/format.h>
+#include <util/stream/null.h>
+
+#include <google/protobuf/messagext.h>
+
+#include "eventlog.h"
+#include "events_extension.h"
+#include "evdecoder.h"
+#include "logparser.h"
+#include <library/cpp/eventlog/proto/internal.pb.h>
+
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+
+TAtomic eventlogFrameCounter = 0; // global counter from which frame ids are drawn
+
+namespace {
+ // Proto-to-JSON settings used for the "Fields" part of JSON event output.
+ const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
+ .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
+
+ ui32 GenerateFrameId() { // atomically bumps the global counter; the result is narrowed to 32 bits
+ return ui32(AtomicAdd(eventlogFrameCounter, 1));
+ }
+
+ inline const NProtoBuf::Message* UnknownEventMessage() { // shared placeholder message for events without a known protobuf
+ return Singleton<NEventLogInternal::TUnknownEvent>();
+ }
+
+} // namespace
+
+void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
+    if (options.OutputFormat == TOutputFormat::Json) {
+        NJson::TJsonWriterConfig jsonWriterConfig;
+        jsonWriterConfig.FormatOutput = 0;
+        NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig);
+        jsonWriter.OpenMap();
+        PrintJsonHeader(jsonWriter);
+        DoPrintJson(jsonWriter);
+        jsonWriter.CloseMap();
+        return;
+    }
+    // Both tab-separated formats share the header and differ only in the escaping flags.
+    const bool raw = options.OutputFormat == TOutputFormat::TabSeparatedRaw;
+    if (!raw && options.OutputFormat != TOutputFormat::TabSeparated) {
+        return; // unrecognized format: nothing is printed
+    }
+    const EFieldOutputFlags flags = raw ? EFieldOutputFlags{} : (EFieldOutputFlags{} | EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash);
+    PrintHeader(out, options, eventState);
+    DoPrint(out, flags);
+}
+
+void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
+    if (!options.HumanReadable) {
+        out << static_cast<TEventTimestamp>(Timestamp) << '\t' << FrameId << '\t';
+        return;
+    }
+    out << TInstant::MicroSeconds(Timestamp).ToString() << "\t";
+    // Offset from the frame start; a negative offset is unexpected but still printed gracefully.
+    if (Timestamp < eventState.FrameStartTime) {
+        out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp));
+    } else {
+        out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime));
+    }
+    // The delta to the previous event is omitted for out-of-order (async) events, where it makes no sense.
+    if (Timestamp >= eventState.PrevEventTime) {
+        out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")";
+    }
+    out << "\tF# " << FrameId << '\t';
+}
+
+void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const { // writes the common per-event keys into the currently open JSON map
+ jsonWriter.Write("Timestamp", Timestamp);
+ jsonWriter.Write("FrameId", FrameId);
+}
+
+class TProtobufEvent: public TEvent { // TEvent whose payload is a protobuf message
+public:
+ TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg) // wraps a caller-owned message: only its address is stored, so msg must outlive the event
+ : TEvent(eventId, t)
+ , Message_(&msg)
+ , EventFactory_(NProtoBuf::TEventFactory::Instance())
+ {
+ }
+
+ TProtobufEvent() // class 0, timestamp 0, no message attached (Message_ stays null)
+ : TEvent(0, 0)
+ , EventFactory_(NProtoBuf::TEventFactory::Instance())
+ {
+ }
+
+ explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance()) // owns a message created by the factory for this id; used when loading events
+ : TEvent(id, 0)
+ , EventFactory_(eventFactory)
+ {
+ InnerMsg_.Reset(EventFactory_->CreateEvent(Class));
+ Message_ = InnerMsg_.Get(); // stays null if the factory produced no message for this class
+ }
+
+ ui32 Id() const { // the event class, used as the id in factory lookups
+ return Class;
+ }
+
+ void Load(IInputStream& in) override {
+ if (!!InnerMsg_) {
+ InnerMsg_->ParseFromArcadiaStream(&in); // NOTE(review): the parse result is ignored
+ } else {
+ TransferData(&in, &Cnull); // no owned message to parse into: drain the input into the null stream
+ }
+ }
+
+ void Save(IOutputStream& out) const override { // NOTE(review): dereferences Message_ without a null check
+ Message_->SerializeToArcadiaStream(&out);
+ }
+
+ void SaveToBuffer(TBufferOutput& buf) const override { // serializes straight into the buffer's tail instead of going through the stream interface
+ size_t messageSize = Message_->ByteSize();
+ size_t before = buf.Buffer().Size();
+ buf.Buffer().Advance(messageSize); // grow the buffer by the serialized size first
+ Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize);
+ }
+
+ TStringBuf GetName() const override { // the event name is looked up in the factory by id
+ return EventFactory_->NameById(Id());
+ }
+
+private:
+ void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override { // printing is delegated to the factory
+ EventFactory_->PrintEvent(Id(), Message_, out, flags);
+ }
+ void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override { // emits "EventBody": {"Type": <name>, "Fields": <message as JSON>}
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+
+ jsonWriter.Write("Fields");
+ NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG);
+
+ jsonWriter.CloseMap();
+ }
+
+ const NProtoBuf::Message* GetProto() const override { // never null: falls back to the shared unknown-event message
+ if (Message_) {
+ return Message_;
+ }
+
+ return UnknownEventMessage();
+ }
+
+private:
+ const NProtoBuf::Message* Message_ = nullptr; // points at the caller's message or at InnerMsg_; may be null
+ NProtoBuf::TEventFactory* EventFactory_;
+ THolder<NProtoBuf::Message> InnerMsg_; // set only by the id-based constructor
+
+ friend class TEventLogFrame;
+};
+
+void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) {
+    // Same as the timestamped overload, stamped with the current time in microseconds.
+    LogProtobufEvent(Now().MicroSeconds(), eventId, ev);
+}
+
+void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) {
+    // Wrap the message in a temporary event; the message itself is not copied.
+    TProtobufEvent wrapped(timestamp, eventId, ev);
+
+    LogEventImpl(wrapped);
+}
+
+template <>
+void TEventLogFrame::DebugDump(const TProtobufEvent& ev) {
+    // A single function-local lock keeps dumps from different threads from interleaving on stderr.
+    static TMutex lock;
+    const TGuard<TMutex> guard(lock);
+
+    Cerr << ev.Timestamp << "\t" << ev.GetName() << "\t";
+    ev.GetProto()->PrintJSON(Cerr);
+    Cerr << Endl;
+}
+
+#pragma pack(push, 1) // byte-packed: the struct is overlaid on the start of the output buffer
+struct TFrameHeaderData {
+ char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()]; // filled with COMPRESSED_LOG_FRAME_SYNC_DATA
+ TCompressedFrameBaseHeader Header;
+ TCompressedFrameHeader2 HeaderEx;
+};
+#pragma pack(pop)
+
+TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog) // a log with a null backend is treated as no log at all
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ DoInit();
+}
+
+TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) // pointer variant: parentLog may be null
+ : EvLog_(parentLog)
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ if (EvLog_ && EvLog_->HasNullBackend()) { // same normalization as in the reference overload
+ EvLog_ = nullptr;
+ }
+
+ DoInit();
+}
+
+TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) // frame without a parent log; Flush() is then a no-op
+ : EvLog_(nullptr)
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ DoInit();
+}
+
+void TEventLogFrame::Flush() {
+    // Nothing to do without a real log behind the frame.
+    if (!EvLog_) {
+        return;
+    }
+
+    TBuffer& pending = Buf_.Buffer();
+    if (pending.Empty()) {
+        return;
+    }
+
+    EvLog_->WriteFrame(pending, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_));
+
+    // Reset the buffer and the timestamp range for the next frame.
+    DoInit();
+}
+
+void TEventLogFrame::SafeFlush() { // Flush() performed while holding Mtx_
+ TGuard<TMutex> g(Mtx_);
+ Flush();
+}
+
+void TEventLogFrame::AddEvent(TEventTimestamp timestamp) {
+    // Widen the [StartTimestamp_, EndTimestamp_] range so that it covers the new event.
+    if (EndTimestamp_ < timestamp) {
+        EndTimestamp_ = timestamp;
+    }
+    if (StartTimestamp_ > timestamp) {
+        StartTimestamp_ = timestamp;
+    }
+}
+
+void TEventLogFrame::DoInit() { // resets the frame to its empty state
+ Buf_.Buffer().Clear();
+ // Start from an inverted range so that the first AddEvent() sets both ends.
+ StartTimestamp_ = (TEventTimestamp)-1; // the largest value, provided TEventTimestamp is unsigned
+ EndTimestamp_ = 0;
+}
+
+void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory) {
+    // Decodes the events buffered in this frame one by one and hands each decoded event to the visitor.
+    const auto doVisit = [this, &visitor, eventFactory]() {
+        TBuffer& buf = Buf_.Buffer();
+        TBufferInput bufferInput(buf);
+        TLengthLimitedInput limitedInput(&bufferInput, buf.size());
+        TEventFilter EventFilter(false);
+        while (limitedInput.Left()) {
+            THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory);
+            // DecodeEvent returns a null holder for events it skipped (filtered out or malformed); never dereference it.
+            if (event) {
+                visitor.Visit(*event);
+            }
+        }
+    };
+    if (NeedAlwaysSafeAdd_) {
+        TGuard<TMutex> g(Mtx_); // serialize with other holders of Mtx_ (e.g. SafeFlush) while the buffer is read
+        doVisit();
+    } else {
+        doVisit();
+    }
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) // forwards to the matching TEventLogFrame constructor
+ : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) // forwards to the matching TEventLogFrame constructor
+ : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) // forwards to the matching TEventLogFrame constructor
+ : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::~TSelfFlushLogFrame() { // flushes whatever is still buffered when the frame goes away
+ try {
+ Flush();
+ } catch (...) { // best effort: a destructor must not let exceptions escape
+ }
+}
+
+IEventLog::~IEventLog() { // out-of-line empty destructor
+}
+
+static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) {
+    try {
+        THolder<TLogBackend> fileBackend; // the concrete file backend, chosen by the options
+        if (!backendOpts.UseSyncPageCacheBackend) {
+            fileBackend = MakeHolder<TFileLogBackend>(fileName);
+        } else {
+            fileBackend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize);
+        }
+        return MakeHolder<TReopenLogBackend>(std::move(fileBackend));
+    } catch (...) {
+        Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." << Endl;
+    }
+    // Opening failed: the warning above was emitted, fall back to TNullLogBackend.
+    return MakeHolder<TNullLogBackend>();
+}
+
+TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat)
+ : Log_(ConstructBackend(fileName, backendOpts)) // falls back to a null backend if the file cannot be opened
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4) // V4 unless a format is given
+ , HasNullBackend_(Log_.IsNullLog())
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5);
+ // The content format must fit into the low 24 bits: WriteFrame() packs the log format into the top byte.
+ if (contentFormat & 0xff000000) {
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts) // delegates with the V4 log format
+ : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4)
+{
+}
+
+TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat) // writes through an existing TLog; logFormat is only validated later, in WriteFrame()
+ : Log_(log)
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat)
+ , HasNullBackend_(Log_.IsNullLog())
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ if (contentFormat & 0xff000000) { // the content format must fit into the low 24 bits
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat) // event log on top of a null backend
+ : Log_(MakeHolder<TNullLogBackend>())
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat)
+ , HasNullBackend_(true)
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ if (contentFormat & 0xff000000) { // the content format must fit into the low 24 bits
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::~TEventLog() { // nothing to release explicitly
+}
+
+void TEventLog::ReopenLog() { // forwarded to the underlying log
+ Log_.ReopenLog();
+}
+
+void TEventLog::CloseLog() { // forwarded to the underlying log
+ Log_.CloseLog();
+}
+
+void TEventLog::Flush() { // intentionally empty: this class keeps no buffer of its own
+}
+
+namespace {
+ class TOnExceptionAction { // scope guard: runs the callback from its destructor only when an exception is in flight
+ public:
+ TOnExceptionAction(std::function<void()>&& f)
+ : F_(std::move(f))
+ {
+ }
+
+ ~TOnExceptionAction() {
+ if (F_ && UncaughtException()) { // NOTE(review): presumably also true when the guard is destroyed during unrelated unwinding - confirm
+ try {
+ F_();
+ } catch (...) { // the callback must not throw out of a destructor
+ }
+ }
+ }
+
+ private:
+ std::function<void()> F_;
+ };
+}
+
+void TEventLog::WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback,
+ TLogRecord::TMetaFlags metaFlags) { // Compresses the frame, prepends a checksummed header and writes the result to the log.
+ Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); // only these two formats can be written
+
+ TBuffer& b1 = buffer; // b1: uncompressed frame data
+
+ size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1);
+
+ // Reserve enough memory to minimize reallocs
+ TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength);
+ TBuffer& b2 = outbuf.Buffer(); // b2: header followed by the compressed payload
+ b2.Proceed(sizeof(TFrameHeaderData)); // make room for the header, which is filled in place below
+
+ {
+ TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
+
+ memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size());
+ hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff); // log format in the top byte, content format in the low 24 bits
+ hdr.Header.FrameId = GenerateFrameId();
+ hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size(); // NOTE(review): sizes that do not fit in 32 bits would be truncated here
+ hdr.HeaderEx.StartTimestamp = startTimestamp;
+ hdr.HeaderEx.EndTimestamp = endTimestamp;
+ hdr.HeaderEx.PayloadChecksum = 0; // placeholder; the real value is computed after compression
+ hdr.HeaderEx.CompressorVersion = 0;
+ }
+
+ if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) { // V4: lz4hc-encode first, then zlib-compress the encoded data
+ TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256);
+ Lz4hcCodec_->Encode(b1, encoded);
+
+ TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048);
+ compr.Write(encoded.data(), encoded.size());
+ compr.Finish();
+ } else { // V5: a single zstd pass straight into the output buffer
+ b2.Advance(ZstdCodec_->Compress(b1, b2.Pos()));
+ }
+
+ { // the header reference is re-taken here because writing the payload may have reallocated b2
+ const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size();
+ TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
+ hdr.Header.Length = static_cast<ui32>(b2.size() - k); // everything after the sync field and the base header
+ hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData));
+ // The header checksum covers the header after the sync field, minus sizeof(HeaderChecksum) trailing bytes (presumably the checksum field itself - confirm layout).
+ const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum));
+ hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n);
+ }
+
+ const TBuffer& frameData = outbuf.Buffer();
+ // From here on, leaving the function via an exception triggers ErrorCallback_ (if one is set).
+ TOnExceptionAction actionCallback([this] {
+ if (ErrorCallback_) {
+ ErrorCallback_->OnWriteError();
+ }
+ });
+
+ if (writeFrameCallback) {
+ writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp);
+ }
+
+ Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags));
+ if (SuccessCallback_) { // reached only when the write did not throw
+ SuccessCallback_->OnWriteSuccess(frameData);
+ }
+}
+
+TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) { // returns a raw pointer to a newly allocated event; the caller takes ownership
+ return new TProtobufEvent(c, EventFactory_);
+}
+
+TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const { // name-to-id lookup, delegated to the protobuf event factory
+ return EventFactory_->IdByName(name);
+}
+
+TEventClass TProtobufEventFactory::EventClassBegin() const {
+    // First key of FactoryItems(), or 0 when the factory has no events.
+    const auto& items = EventFactory_->FactoryItems();
+    if (!items.empty()) {
+        return static_cast<TEventClass>(items.begin()->first);
+    }
+
+    return static_cast<TEventClass>(0);
+}
+
+TEventClass TProtobufEventFactory::EventClassEnd() const {
+    // One past the last key of FactoryItems(), or 0 when the factory has no events.
+    const auto& items = EventFactory_->FactoryItems();
+    if (!items.empty()) {
+        return static_cast<TEventClass>(items.rbegin()->first + 1);
+    }
+
+    return static_cast<TEventClass>(0);
+}
+
+namespace NEvClass {
+ IEventFactory* Factory() { // shared singleton instance of the protobuf-backed event factory
+ return Singleton<TProtobufEventFactory>();
+ }
+
+ IEventProcessor* Processor() { // shared singleton instance of the protobuf-backed event processor
+ return Singleton<TProtobufEventProcessor>();
+ }
+}
+
+const NProtoBuf::Message* TUnknownEvent::GetProto() const { // all unknown events share one placeholder message
+ return UnknownEventMessage();
+}
+
+TStringBuf TUnknownEvent::GetName() const {
+ return TStringBuf("UnknownEvent");
+}
+
+void TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { // emits "EventBody": {"Type": ..., "EventId": <class>}; there is no "Fields" key
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+ jsonWriter.Write("EventId", (size_t)Class);
+ jsonWriter.CloseMap();
+}
+
+TStringBuf TEndOfFrameEvent::GetName() const {
+ return TStringBuf("EndOfFrame");
+}
+
+const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const { // shared singleton message
+ return Singleton<NEventLogInternal::TEndOfFrameEvent>();
+}
+
+void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { // emits "EventBody": {"Type": ..., "Fields": {}}
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+ jsonWriter.OpenMap("Fields"); // always an empty map
+ jsonWriter.CloseMap();
+ jsonWriter.CloseMap();
+}
+
+THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) { // the event stores only the address of ev, so ev must outlive the returned event
+ return MakeHolder<TProtobufEvent>(ts, eventId, ev);
+}
diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h
new file mode 100644
index 0000000000..45c2dfb17f
--- /dev/null
+++ b/library/cpp/eventlog/eventlog.h
@@ -0,0 +1,623 @@
+#pragma once
+
+#include "eventlog_int.h"
+#include "event_field_output.h"
+#include "events_extension.h"
+
+#include <library/cpp/blockcodecs/codecs.h>
+#include <library/cpp/logger/all.h>
+
+#include <google/protobuf/message.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/stream/output.h>
+#include <util/stream/buffer.h>
+#include <util/stream/str.h>
+#include <util/system/mutex.h>
+#include <util/stream/output.h>
+#include <util/system/env.h>
+#include <util/system/unaligned_mem.h>
+#include <util/ysaveload.h>
+
+#include <cstdlib>
+
+// Forward declaration: TJsonWriter is only used by reference in this header.
+namespace NJson {
+    class TJsonWriter;
+}
+
+class IEventLog;
+
+// Base class of every event stored in an event log. It carries the event class
+// id, the timestamp and the frame id, and declares the serialization and
+// printing interface that concrete event types implement.
+class TEvent : public TThrRefBase {
+public:
+    // Textual representation selected for Print().
+    enum class TOutputFormat {
+        TabSeparated,
+        TabSeparatedRaw, // disables escaping
+        Json
+    };
+
+    // Printing options; implicitly constructible from a bare TOutputFormat.
+    struct TOutputOptions {
+        TOutputFormat OutputFormat = TOutputFormat::TabSeparated;
+        // Dump some fields (e.g. timestamp) in more human-readable format
+        bool HumanReadable = false;
+
+        TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated)
+            : OutputFormat(outputFormat)
+        {
+        }
+
+        TOutputOptions(TOutputFormat outputFormat, bool humanReadable)
+            : OutputFormat(outputFormat)
+            , HumanReadable(humanReadable)
+        {
+        }
+    };
+
+    // Timestamps of the enclosing frame start and of the previously printed
+    // event; passed to Print()/PrintHeader() by the caller.
+    struct TEventState {
+        TEventTimestamp FrameStartTime = 0;
+        TEventTimestamp PrevEventTime = 0;
+        TEventState() {
+        }
+    };
+
+    // Note the argument order: class id first, then timestamp.
+    TEvent(TEventClass c, TEventTimestamp t)
+        : Class(c)
+        , Timestamp(t)
+    {
+    }
+
+    virtual ~TEvent() = default;
+
+    // Note, that descendants MUST have Save() & Load() methods to alter
+    // only its new variables, not the base class!
+    virtual void Save(IOutputStream& out) const = 0;
+    // Buffer-specific variant of Save(); by default it simply forwards to Save().
+    virtual void SaveToBuffer(TBufferOutput& out) const {
+        Save(out);
+    }
+
+    // Note, that descendants MUST have Save() & Load() methods to alter
+    // only its new variables, not the base class!
+    virtual void Load(IInputStream& i) = 0;
+
+    virtual TStringBuf GetName() const = 0;
+    virtual const NProtoBuf::Message* GetProto() const = 0;
+
+    void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const;
+    void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const;
+
+    // Prints the event with default options into a string.
+    TString ToString() const {
+        TStringStream buff;
+        Print(buff);
+        return buff.Str();
+    }
+
+    // Writes the header (Timestamp, then Class) followed by the payload.
+    void FullSaveToBuffer(TBufferOutput& buf) const {
+        SaveMessageHeader(buf);
+        this->SaveToBuffer(buf);
+    }
+
+    // Same as FullSaveToBuffer(), but for an arbitrary output stream.
+    void FullSave(IOutputStream& o) const {
+        SaveMessageHeader(o);
+        this->Save(o);
+    }
+
+    // Reads what FullSave() wrote: Timestamp, Class, then the payload.
+    void FullLoad(IInputStream& i) {
+        ::Load(&i, Timestamp);
+        ::Load(&i, Class);
+        this->Load(i);
+    }
+
+    // Unchecked downcast of GetProto(); the caller must know that the
+    // underlying message really is a T.
+    template <class T>
+    const T* Get() const {
+        return static_cast<const T*>(this->GetProto());
+    }
+
+    TEventClass Class;
+    TEventTimestamp Timestamp;
+    ui32 FrameId = 0;
+
+private:
+    // Serializes the fixed header: Timestamp first, then Class.
+    void SaveMessageHeader(IOutputStream& out) const {
+        ::Save(&out, Timestamp);
+        ::Save(&out, Class);
+    }
+
+    // Format-specific body printers implemented by concrete events.
+    virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0;
+    virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0;
+
+    void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const;
+};
+
+using TEventPtr = TIntrusivePtr<TEvent>;
+using TConstEventPtr = TIntrusiveConstPtr<TEvent>;
+
+// Consumer of decoded events. Implementations override ProcessEvent();
+// the output options are stored in Options_ for their use.
+class IEventProcessor {
+public:
+    virtual void SetOptions(const TEvent::TOutputOptions& options) {
+        Options_ = options;
+    }
+    virtual void ProcessEvent(const TEvent* ev) = 0;
+    // Default implementation processes the event and always reports true;
+    // the meaning of a false result is defined by overriding classes.
+    virtual bool CheckedProcessEvent(const TEvent* ev) {
+        ProcessEvent(ev);
+        return true;
+    }
+    virtual ~IEventProcessor() = default;
+
+protected:
+    TEvent::TOutputOptions Options_;
+};
+
+// Abstract factory that creates event objects by class id and maps event
+// names to class ids.
+class IEventFactory {
+public:
+    // NOTE(review): returns a raw pointer; ownership presumably passes to the
+    // caller - confirm against the implementations.
+    virtual TEvent* CreateLogEvent(TEventClass c) = 0;
+    virtual TEventLogFormat CurrentFormat() = 0;
+    virtual TEventClass ClassByName(TStringBuf name) const = 0;
+    // Range of known class ids as [EventClassBegin(), EventClassEnd()).
+    virtual TEventClass EventClassBegin() const = 0;
+    virtual TEventClass EventClassEnd() const = 0;
+    virtual ~IEventFactory() = default;
+};
+
+// Event object for a class id that has no concrete event type. It can be
+// printed, but attempts to save or load it throw.
+class TUnknownEvent: public TEvent {
+public:
+    TUnknownEvent(TEventTimestamp ts, TEventClass cls)
+        : TEvent(cls, ts)
+    {
+    }
+
+    ~TUnknownEvent() override = default;
+
+    // Serialization is not supported for unknown events.
+    void Save(IOutputStream& /* o */) const override {
+        ythrow yexception() << "TUnknownEvent cannot be saved";
+    }
+
+    // Deserialization is not supported for unknown events.
+    void Load(IInputStream& /* i */) override {
+        ythrow yexception() << "TUnknownEvent cannot be loaded";
+    }
+
+    TStringBuf GetName() const override;
+
+private:
+    // Prints "<name>\t<numeric class id>".
+    void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
+        out << GetName();
+        out << "\t";
+        out << static_cast<size_t>(Class);
+    }
+
+    void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
+
+    const NProtoBuf::Message* GetProto() const override;
+};
+
+// Marker event with the reserved class id 0. It can be printed, but attempts
+// to save or load it throw.
+class TEndOfFrameEvent: public TEvent {
+public:
+    enum {
+        EventClass = 0
+    };
+
+    TEndOfFrameEvent(TEventTimestamp ts)
+        : TEvent(TEndOfFrameEvent::EventClass, ts)
+    {
+    }
+
+    ~TEndOfFrameEvent() override = default;
+
+    // Serialization is not supported for the end-of-frame marker.
+    void Save(IOutputStream& /* o */) const override {
+        ythrow yexception() << "TEndOfFrameEvent cannot be saved";
+    }
+
+    // Deserialization is not supported for the end-of-frame marker.
+    void Load(IInputStream& /* i */) override {
+        ythrow yexception() << "TEndOfFrameEvent cannot be loaded";
+    }
+
+    TStringBuf GetName() const override;
+
+private:
+    // Prints only the event name; the marker has no fields.
+    void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
+        const TStringBuf name = GetName();
+        out << name;
+    }
+
+    void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
+
+    const NProtoBuf::Message* GetProto() const override;
+};
+
+// Visitor interface accepted by TEventLogFrame::VisitEvents().
+class ILogFrameEventVisitor {
+public:
+    virtual ~ILogFrameEventVisitor() = default;
+
+    virtual void Visit(const TEvent& event) = 0;
+};
+
+// Reference-counted callback that receives a frame's compressed bytes together
+// with the frame's start and end timestamps.
+// NOTE(review): the exact call site is in WriteFrame() implementations, which
+// are not visible here.
+class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> {
+public:
+    virtual ~IWriteFrameCallback() = default;
+
+    virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0;
+};
+
+using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>;
+
+// Accumulates serialized events in an in-memory buffer (Buf_) so they can be
+// written to an IEventLog as one frame. Events are dropped early when there is
+// no parent log, ForceDump() was not called and debug mode is off.
+class TEventLogFrame {
+public:
+    TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+    TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+    TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+
+    virtual ~TEventLogFrame() = default;
+
+    void Flush();
+    void SafeFlush();
+
+    // Makes events be recorded even when no parent log is attached.
+    void ForceDump() {
+        ForceDump_ = true;
+    }
+
+    // Logs an event, taking Mtx_ only when NeedAlwaysSafeAdd_ is set.
+    template <class T>
+    inline void LogEvent(const T& ev) {
+        if (NeedAlwaysSafeAdd_) {
+            SafeLogEvent(ev);
+        } else {
+            UnSafeLogEvent(ev);
+        }
+    }
+
+    // Same as above, with an explicit timestamp.
+    template <class T>
+    inline void LogEvent(TEventTimestamp timestamp, const T& ev) {
+        if (NeedAlwaysSafeAdd_) {
+            SafeLogEvent(timestamp, ev);
+        } else {
+            UnSafeLogEvent(timestamp, ev);
+        }
+    }
+
+    // Logs without taking Mtx_. T must expose an event id as `ev.ID`.
+    template <class T>
+    inline void UnSafeLogEvent(const T& ev) {
+        if (!IsEventIgnored(ev.ID))
+            LogProtobufEvent(ev.ID, ev);
+    }
+
+    template <class T>
+    inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) {
+        if (!IsEventIgnored(ev.ID))
+            LogProtobufEvent(timestamp, ev.ID, ev);
+    }
+
+    // Logs while holding Mtx_. The IsEventIgnored() check itself happens
+    // before the lock is taken.
+    template <class T>
+    inline void SafeLogEvent(const T& ev) {
+        if (!IsEventIgnored(ev.ID)) {
+            TGuard<TMutex> g(Mtx_);
+            LogProtobufEvent(ev.ID, ev);
+        }
+    }
+
+    template <class T>
+    inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) {
+        if (!IsEventIgnored(ev.ID)) {
+            TGuard<TMutex> g(Mtx_);
+            LogProtobufEvent(timestamp, ev.ID, ev);
+        }
+    }
+
+    void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory);
+
+    // True only when debug mode is off, no log is attached and ForceDump()
+    // has not been requested. The event id is currently not consulted.
+    inline bool IsEventIgnored(size_t eventId) const {
+        Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages
+        return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_;
+    }
+
+    // Attaches the frame to a log; the frame keeps a non-owning pointer.
+    void Enable(IEventLog& evLog) {
+        EvLog_ = &evLog;
+    }
+
+    // Detaches the frame from its log.
+    void Disable() {
+        EvLog_ = nullptr;
+    }
+
+    void SetNeedAlwaysSafeAdd(bool val) {
+        NeedAlwaysSafeAdd_ = val;
+    }
+
+    void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) {
+        WriteFrameCallback_ = writeFrameCallback;
+    }
+
+    // Appends a key/value pair to MetaFlags_, under Mtx_ when
+    // NeedAlwaysSafeAdd_ is set.
+    void AddMetaFlag(const TString& key, const TString& value) {
+        if (NeedAlwaysSafeAdd_) {
+            TGuard<TMutex> g(Mtx_);
+            MetaFlags_.emplace_back(key, value);
+        } else {
+            MetaFlags_.emplace_back(key, value);
+        }
+    }
+
+protected:
+    void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev);
+    void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev);
+
+private:
+    // Reads the EVLOG_DEBUG environment variable once (function-local static)
+    // and reports whether it equals "1".
+    static bool IsDebugModeEnabled() {
+        static struct TSelector {
+            bool Flag;
+
+            TSelector()
+                : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1"))
+            {
+            }
+        } selector;
+
+        return selector.Flag;
+    }
+
+    template <class T>
+    void DebugDump(const T& ev);
+
+    // T must be a descendant of NEvClass::TEvent
+    template <class T>
+    inline void LogEventImpl(const T& ev) {
+        if (EvLog_ != nullptr || ForceDump_) {
+            TBuffer& b = Buf_.Buffer();
+            size_t lastSize = b.size();
+            // Reserve a ui32 placeholder for the record size, serialize the
+            // event after it, then patch the placeholder with the number of
+            // bytes appended (placeholder included).
+            ::Save(&Buf_, ui32(0));
+            ev.FullSaveToBuffer(Buf_);
+            WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize));
+            AddEvent(ev.Timestamp);
+        }
+
+        if (IsDebugModeEnabled()) {
+            DebugDump(ev);
+        }
+    }
+
+    void AddEvent(TEventTimestamp timestamp);
+    void DoInit();
+
+private:
+    TBufferOutput Buf_;
+    TEventTimestamp StartTimestamp_, EndTimestamp_;
+    IEventLog* EvLog_; // non-owning; nullptr when the frame is disabled
+    TMutex Mtx_;
+    bool NeedAlwaysSafeAdd_;
+    bool ForceDump_;
+    TWriteFrameCallbackPtr WriteFrameCallback_;
+    TLogRecord::TMetaFlags MetaFlags_;
+    friend class TEventRecord;
+};
+
+// Reference-counted TEventLogFrame with a user-provided destructor.
+// NOTE(review): judging by the name, the destructor presumably flushes the
+// frame; its definition is not visible here - confirm.
+class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> {
+public:
+    TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+    TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+    TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+
+    virtual ~TSelfFlushLogFrame();
+};
+
+using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>;
+
+// Abstract, reference-counted event log: a sink that frames are written to.
+class IEventLog: public TAtomicRefCount<IEventLog> {
+public:
+    // Notification interface for failed writes.
+    class IErrorCallback {
+    public:
+        virtual ~IErrorCallback() {
+        }
+
+        virtual void OnWriteError() = 0;
+    };
+
+    // Notification interface for successful writes; receives the frame data.
+    class ISuccessCallback {
+    public:
+        virtual ~ISuccessCallback() {
+        }
+
+        virtual void OnWriteSuccess(const TBuffer& frameData) = 0;
+    };
+
+    virtual ~IEventLog();
+
+    virtual void ReopenLog() = 0;
+    virtual void CloseLog() = 0;
+    virtual void Flush() = 0;
+    // Callbacks are ignored by default; implementations may override.
+    virtual void SetErrorCallback(IErrorCallback*) {
+    }
+    virtual void SetSuccessCallback(ISuccessCallback*) {
+    }
+
+    // Convenience helper: logs a single event in its own temporary frame and
+    // flushes that frame immediately.
+    template <class T>
+    void LogEvent(const T& ev) {
+        TEventLogFrame frame(*this);
+        frame.LogEvent(ev);
+        frame.Flush();
+    }
+
+    virtual bool HasNullBackend() const = 0;
+
+    virtual void WriteFrame(TBuffer& buffer,
+                            TEventTimestamp startTimestamp,
+                            TEventTimestamp endTimestamp,
+                            TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+                            TLogRecord::TMetaFlags metaFlags = {}) = 0;
+};
+
+// Backend tuning knobs passed to the file-name based TEventLog constructors.
+// NOTE(review): the exact meaning of each field is defined by the TEventLog
+// implementation, which is not visible here - confirm before relying on it.
+struct TEventLogBackendOptions {
+    bool UseSyncPageCacheBackend = false;
+    size_t SyncPageCacheBackendBufferSize = 0;
+    size_t SyncPageCacheBackendMaxPendingSize = 0;
+};
+
+// Concrete IEventLog that writes frames through a TLog.
+class TEventLog: public IEventLog {
+public:
+    /*
+     * The contentFormat parameter specifies the format of the log content, e.g. which
+     * event classes may occur in the log, what parameters those events have, etc.
+     * The high byte of the parameter must be zero.
+     */
+    TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat);
+    TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {});
+    TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
+    TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
+
+    ~TEventLog() override;
+
+    void ReopenLog() override;
+    void CloseLog() override;
+    void Flush() override;
+    // The callbacks are stored as raw, non-owning pointers.
+    void SetErrorCallback(IErrorCallback* errorCallback) override {
+        ErrorCallback_ = errorCallback;
+    }
+    void SetSuccessCallback(ISuccessCallback* successCallback) override {
+        SuccessCallback_ = successCallback;
+    }
+
+    // Logs a single event in its own temporary frame and flushes it at once.
+    template <class T>
+    void LogEvent(const T& ev) {
+        TEventLogFrame frame(*this);
+        frame.LogEvent(ev);
+        frame.Flush();
+    }
+
+    bool HasNullBackend() const override {
+        return HasNullBackend_;
+    }
+
+    void WriteFrame(TBuffer& buffer,
+                    TEventTimestamp startTimestamp,
+                    TEventTimestamp endTimestamp,
+                    TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+                    TLogRecord::TMetaFlags metaFlags = {}) override;
+
+private:
+    mutable TLog Log_;
+    TEventLogFormat ContentFormat_;
+    const TEventLogFormat LogFormat_;
+    bool HasNullBackend_;
+    const NBlockCodecs::ICodec* const Lz4hcCodec_;
+    const NBlockCodecs::ICodec* const ZstdCodec_;
+    IErrorCallback* ErrorCallback_ = nullptr;
+    ISuccessCallback* SuccessCallback_ = nullptr;
+};
+
+using TEventLogPtr = TIntrusivePtr<IEventLog>;
+
+// IEventLog decorator that forwards every operation to another ("slave") log.
+// The slave is either borrowed (reference constructor) or kept alive through
+// an intrusive pointer (TEventLogPtr constructor).
+class TEventLogWithSlave: public IEventLog {
+public:
+    // Borrows parentLog; the caller must keep it alive.
+    TEventLogWithSlave(IEventLog& parentLog)
+        : Slave_(&parentLog)
+    {
+    }
+
+    // Shares ownership of parentLog.
+    TEventLogWithSlave(const TEventLogPtr& parentLog)
+        : SlavePtr_(parentLog)
+        , Slave_(SlavePtr_.Get())
+    {
+    }
+
+    ~TEventLogWithSlave() override {
+        // Best-effort flush: a destructor must not let exceptions escape.
+        try {
+            Slave().Flush();
+        } catch (...) {
+        }
+    }
+
+    void Flush() override {
+        Slave().Flush();
+    }
+
+    void ReopenLog() override {
+        Slave().ReopenLog();
+    }
+
+    void CloseLog() override {
+        Slave().CloseLog();
+    }
+
+    bool HasNullBackend() const override {
+        const bool isNull = Slave().HasNullBackend();
+        return isNull;
+    }
+
+    void WriteFrame(TBuffer& buffer,
+                    TEventTimestamp startTimestamp,
+                    TEventTimestamp endTimestamp,
+                    TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+                    TLogRecord::TMetaFlags metaFlags = {}) override {
+        IEventLog& target = Slave();
+        target.WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags));
+    }
+
+    void SetErrorCallback(IErrorCallback* errorCallback) override {
+        Slave().SetErrorCallback(errorCallback);
+    }
+
+    void SetSuccessCallback(ISuccessCallback* successCallback) override {
+        Slave().SetSuccessCallback(successCallback);
+    }
+
+protected:
+    // Access to the wrapped log for derived classes.
+    inline IEventLog& Slave() const {
+        return *Slave_;
+    }
+
+private:
+    // SlavePtr_ must be declared before Slave_: the second constructor
+    // initializes Slave_ from it.
+    TEventLogPtr SlavePtr_;
+    IEventLog* Slave_ = nullptr;
+};
+
+extern TAtomic eventlogFrameCounter;
+
+// Event processor that prints each event to an output stream (Cout by
+// default) and tracks per-frame timing state for the printer.
+class TProtobufEventProcessor: public IEventProcessor {
+public:
+    void ProcessEvent(const TEvent* ev) override final {
+        ProcessEvent(ev, &Cout);
+    }
+
+    // Updates the frame state, prints the event, then remembers its timestamp
+    // as the "previous event" time for the next call.
+    void ProcessEvent(const TEvent* ev, IOutputStream *out) {
+        UpdateEventState(ev);
+        DoProcessEvent(ev, out);
+        EventState_.PrevEventTime = ev->Timestamp;
+    }
+protected:
+    // Customization point; the default prints the event followed by Endl.
+    virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) {
+        ev->Print(*out, Options_, EventState_);
+        (*out) << Endl;
+    }
+    // Starts at Max<ui32>() so that the first event resets the frame state
+    // (unless its FrameId happens to equal that value).
+    ui32 CurrentFrameId_ = Max<ui32>();
+    TEvent::TEventState EventState_;
+
+private:
+    // When the event belongs to a different frame than the previous one,
+    // resets both state timestamps to this event's timestamp.
+    void UpdateEventState(const TEvent *ev) {
+        if (ev->FrameId != CurrentFrameId_) {
+            EventState_.FrameStartTime = ev->Timestamp;
+            EventState_.PrevEventTime = ev->Timestamp;
+            CurrentFrameId_ = ev->FrameId;
+        }
+    }
+};
+
+// IEventFactory backed by an NProtoBuf::TEventFactory registry (the global
+// instance by default).
+class TProtobufEventFactory: public IEventFactory {
+public:
+    // The registry pointer is stored as-is and is not owned.
+    TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance())
+        : EventFactory_(factory)
+    {
+    }
+
+    TEvent* CreateLogEvent(TEventClass c) override;
+
+    // Always reports content format 0.
+    TEventLogFormat CurrentFormat() override {
+        return 0;
+    }
+
+    TEventClass ClassByName(TStringBuf name) const override;
+
+    TEventClass EventClassBegin() const override;
+
+    TEventClass EventClassEnd() const override;
+
+    ~TProtobufEventFactory() override = default;
+
+private:
+    NProtoBuf::TEventFactory* EventFactory_;
+};
+
+// Builds an owning TEvent from a timestamp, an event class id and a protobuf message.
+THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev);
+
+// Accessors for the default event factory and event processor.
+namespace NEvClass {
+    IEventFactory* Factory();
+    IEventProcessor* Processor();
+}
diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp
new file mode 100644
index 0000000000..faa8c42cbe
--- /dev/null
+++ b/library/cpp/eventlog/eventlog_int.cpp
@@ -0,0 +1,12 @@
+#include "eventlog_int.h"
+
+#include <util/string/cast.h>
+
+// Parses a textual log format name into its numeric code.
+// Returns an empty TMaybe when the string is not a known EEventLogFormat value.
+TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) {
+    EEventLogFormat parsed;
+    if (!TryFromString(str, parsed)) {
+        return {};
+    }
+
+    return static_cast<TEventLogFormat>(parsed);
+}
diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h
new file mode 100644
index 0000000000..eb00fecfab
--- /dev/null
+++ b/library/cpp/eventlog/eventlog_int.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/maybe.h>
+#include <util/generic/utility.h>
+#include <util/generic/yexception.h>
+#include <util/ysaveload.h>
+
+// Basic scalar types used throughout the event log library.
+using TEventClass = ui32;      // numeric id of an event type
+using TEventLogFormat = ui32;  // log / content format code
+using TEventTimestamp = ui64;  // event time (units are not specified in this header)
+
+// Fixed 64-byte marker sequence (size enforced by the static_assert below)
+// that a reader can scan for to locate frame boundaries in a log stream.
+constexpr TStringBuf COMPRESSED_LOG_FRAME_SYNC_DATA =
+    "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+    "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+    "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00"
+    "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff"
+    "\xff\x00\x00\xff\xff\x00\x00\xff"sv;
+
+static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64);
+
+/*
+ * Log format codes. A log's format is the format of the log's service
+ * structures: for example, the header layout, the presence of compression, etc.
+ * Only the lowest byte is significant.
+ */
+
+enum EEventLogFormat : TEventLogFormat {
+    // Format version 1. Uses the LZQ compressor.
+    COMPRESSED_LOG_FORMAT_V1 = 1,
+
+    // Format version 2. Uses the ZLIB compressor. Adds CRCs of the header and of the data,
+    // and a compressor type field.
+    COMPRESSED_LOG_FORMAT_V2 = 2,
+
+    // Format version 3. Uses the ZLIB compressor. Inside frames, each event is preceded by its size.
+    COMPRESSED_LOG_FORMAT_V3 = 3,
+
+    // Lz4hc codec + zlib
+    COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */,
+
+    // zstd
+    COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */,
+};
+
+TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str);
+
+#pragma pack(push, 1)
+
+// Leading, format-independent part of a frame header (byte-packed).
+struct TCompressedFrameBaseHeader {
+    TEventLogFormat Format;
+    ui32 Length; // Length of the rest of the frame in bytes, after this header
+    ui32 FrameId;
+};
+
+// Frame header fields that follow the base header (byte-packed).
+struct TCompressedFrameHeader {
+    TEventTimestamp StartTimestamp;
+    TEventTimestamp EndTimestamp;
+    ui32 UncompressedDatalen; // Length of the data that was compressed
+    ui32 PayloadChecksum; // Not used in version 1 logs
+};
+
+// Extended frame header: adds a compressor version byte and a checksum of the header.
+struct TCompressedFrameHeader2: public TCompressedFrameHeader {
+    ui8 CompressorVersion; // Currently unused
+    ui32 HeaderChecksum;
+};
+
+#pragma pack(pop)
+
+Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader);
+Y_DECLARE_PODTYPE(TCompressedFrameHeader);
+Y_DECLARE_PODTYPE(TCompressedFrameHeader2);
diff --git a/library/cpp/eventlog/events_extension.h b/library/cpp/eventlog/events_extension.h
new file mode 100644
index 0000000000..0cf062f959
--- /dev/null
+++ b/library/cpp/eventlog/events_extension.h
@@ -0,0 +1,161 @@
+#pragma once
+
+#include "event_field_output.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
+
+#include <library/cpp/threading/atomic/bool.h>
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <util/generic/map.h>
+#include <util/generic/deque.h>
+#include <util/generic/singleton.h>
+#include <util/string/hex.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+namespace NProtoBuf {
+    // Registry that maps numeric event ids to protobuf prototypes and their
+    // text serializers. Registration functions queued via ScheduleRegistration()
+    // are run lazily, on the first lookup, by DelayedRegistration().
+    // Note: only DelayedRegistration() takes the mutex; ScheduleRegistration()
+    // and RegisterEvent() modify the containers without locking.
+    class TEventFactory {
+    public:
+        using Message = ::google::protobuf::Message;
+        using TEventSerializer = void (*)(const Message* event, IOutputStream& output, EFieldOutputFlags flags);
+        using TRegistrationFunc = void (*)();
+
+    private:
+        // One registered event type: its prototype message and its printer.
+        class TFactoryItem {
+        public:
+            TFactoryItem(const Message* prototype, const TEventSerializer serializer)
+                : Prototype_(prototype)
+                , Serializer_(serializer)
+            {
+            }
+
+            // Short (unqualified) message name taken from the descriptor.
+            TStringBuf GetName() const {
+                const auto* descriptor = Prototype_->GetDescriptor();
+                return descriptor->name();
+            }
+
+            // Creates a fresh message of the prototype's type.
+            Message* Create() const {
+                return Prototype_->New();
+            }
+
+            void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const {
+                Serializer_(event, out, flags);
+            }
+
+        private:
+            const Message* Prototype_;
+            const TEventSerializer Serializer_;
+        };
+
+        using TFactoryMap = TMap<size_t, TFactoryItem>;
+
+    public:
+        TEventFactory()
+            : FactoryItems_()
+        {
+        }
+
+        // Queues a registration function to be run on first use.
+        void ScheduleRegistration(TRegistrationFunc func) {
+            EventRegistrators_.push_back(func);
+        }
+
+        // Registers an event type; an already registered id is left untouched
+        // (map insert semantics).
+        void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) {
+            FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer)));
+        }
+
+        // Linear search by message name; throws yexception when not found.
+        size_t IdByName(TStringBuf eventname) {
+            DelayedRegistration();
+            for (const auto& [id, item] : FactoryItems_) {
+                if (item.GetName() == eventname) {
+                    return id;
+                }
+            }
+
+            ythrow yexception() << "do not know event '" << eventname << "'";
+        }
+
+        // Returns the message name for the id, or an empty TStringBuf.
+        TStringBuf NameById(size_t id) {
+            DelayedRegistration();
+            const auto found = FactoryItems_.find(id);
+            if (found == FactoryItems_.end()) {
+                return TStringBuf();
+            }
+            return found->second.GetName();
+        }
+
+        // Creates a new message for the id, or returns nullptr when unknown.
+        Message* CreateEvent(size_t eventId) {
+            DelayedRegistration();
+            const auto found = FactoryItems_.find(eventId);
+            if (found == FactoryItems_.end()) {
+                return nullptr;
+            }
+
+            return found->second.Create();
+        }
+
+        const TFactoryMap& FactoryItems() {
+            DelayedRegistration();
+            return FactoryItems_;
+        }
+
+        // Prints the event with its registered serializer; unknown ids are
+        // silently ignored.
+        void PrintEvent(
+            size_t eventId,
+            const Message* event,
+            IOutputStream& output,
+            EFieldOutputFlags flags = {}) {
+            DelayedRegistration();
+            const auto found = FactoryItems_.find(eventId);
+            if (found == FactoryItems_.end()) {
+                return;
+            }
+
+            found->second.PrintEvent(event, output, flags);
+        }
+
+        static TEventFactory* Instance() {
+            return Singleton<TEventFactory>();
+        }
+
+    private:
+        // Runs and drops all queued registration functions once. The flag is
+        // checked before locking; a thread that loses the race finds the
+        // queue empty after acquiring the mutex.
+        void DelayedRegistration() {
+            if (DelayedRegistrationDone_) {
+                return;
+            }
+
+            TGuard<TMutex> guard(MutexEventRegistrators_);
+            Y_UNUSED(guard);
+            while (!EventRegistrators_.empty()) {
+                // Call first, pop afterwards: a throwing registrator stays queued.
+                EventRegistrators_.front()();
+                EventRegistrators_.pop_front();
+            }
+            DelayedRegistrationDone_ = true;
+        }
+
+    private:
+        TFactoryMap FactoryItems_;
+        TDeque<TRegistrationFunc> EventRegistrators_;
+        NAtomic::TBool DelayedRegistrationDone_ = false;
+        TMutex MutexEventRegistrators_;
+    };
+
+    // Prints the object's raw bytes as decimal numbers separated by dots,
+    // e.g. "127.0.0.1" for a 4-byte value.
+    template <typename T>
+    void PrintAsBytes(const T& obj, IOutputStream& output) {
+        const ui8* const bytes = reinterpret_cast<const ui8*>(&obj);
+        const char* separator = "";
+
+        for (size_t i = 0; i < sizeof(T); ++i) {
+            output << separator;
+            output << static_cast<int>(bytes[i]);
+            separator = ".";
+        }
+    }
+
+    // Prints "0x" followed by the hex encoding of the object's raw bytes.
+    template <typename T>
+    void PrintAsHex(const T& obj, IOutputStream& output) {
+        output << "0x" << HexEncode(&obj, sizeof(T));
+    }
+
+    // Prints the data Base64-encoded; nothing is written for empty input.
+    inline void PrintAsBase64(TStringBuf data, IOutputStream& output) {
+        if (data.empty()) {
+            return;
+        }
+
+        output << Base64Encode(data);
+    }
+
+}
diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp
new file mode 100644
index 0000000000..71f955bca8
--- /dev/null
+++ b/library/cpp/eventlog/iterator.cpp
@@ -0,0 +1,88 @@
+#include "iterator.h"
+
+#include <library/cpp/streams/growing_file_input/growing_file_input.h>
+
+#include <util/string/cast.h>
+#include <util/string/split.h>
+#include <util/string/type.h>
+#include <util/stream/file.h>
+
+using namespace NEventLog;
+
+namespace {
+    // Builds an event filter from a comma-separated list of event names and/or
+    // numeric class ids. Returns nullptr when the list contains no entries.
+    // Names are resolved through fac->ClassByName().
+    inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) {
+        if (evList.empty()) {
+            return nullptr;
+        }
+
+        TVector<TString> entries;
+        StringSplitter(evList).Split(',').SkipEmpty().Collect(&entries);
+
+        if (entries.empty()) {
+            return nullptr;
+        }
+
+        TIntrusivePtr<TEventFilter> result(new TEventFilter(enableEvents));
+
+        for (const auto& entry : entries) {
+            if (IsNumber(entry)) {
+                // Numeric entries are taken as class ids directly.
+                result->AddEventClass(FromString<size_t>(entry));
+            } else {
+                result->AddEventClass(fac->ClassByName(entry));
+            }
+        }
+
+        return result;
+    }
+
+    // IIterator implementation: wires an input source to a frame streamer and
+    // an event streamer according to TOptions, and yields events one by one.
+    struct TIterator: public IIterator {
+        inline TIterator(const TOptions& o, IEventFactory* fac)
+            : First(true)
+        {
+            const bool hasFileName = o.FileName.size();
+
+            if (!hasFileName) {
+                // No file given: read frames from the supplied input stream.
+                FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter));
+            } else if (o.ForceStreamMode || o.TailFMode) {
+                // Stream mode: open the file ourselves; tail -f mode uses a growing input.
+                if (o.TailFMode) {
+                    FileInput.Reset((IInputStream*)new TGrowingFileInput(o.FileName));
+                } else {
+                    FileInput.Reset((IInputStream*)new TUnbufferedFileInput(o.FileName));
+                }
+                FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter));
+            } else {
+                // Regular file mode: the streamer opens the file by name and
+                // receives the time range.
+                FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter));
+            }
+
+            EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac);
+            EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering));
+        }
+
+        // Returns the next event, or nullptr when the stream is exhausted.
+        // The first call only checks availability; later calls advance.
+        TConstEventPtr Next() override {
+            const bool isFirstCall = First;
+            First = false;
+
+            const bool hasEvent = isFirstCall ? EventStream->Avail() : EventStream->Next();
+            if (!hasEvent) {
+                return nullptr;
+            }
+
+            return **EventStream;
+        }
+
+        // Declaration order matters: later members reference earlier ones.
+        THolder<IInputStream> FileInput;
+        THolder<TFrameStreamer> FrameStream;
+        TIntrusivePtr<TEventFilter> EvFilter;
+        THolder<TEventStreamer> EventStream;
+        bool First;
+    };
+}
+
+IIterator::~IIterator() = default;
+
+// Creates an event iterator that uses the given event factory.
+THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) {
+    THolder<IIterator> iterator = MakeHolder<TIterator>(o, fac);
+    return iterator;
+}
+
+// Creates an event iterator that uses the default factory NEvClass::Factory().
+THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) {
+    IEventFactory* const defaultFactory = NEvClass::Factory();
+    THolder<IIterator> iterator = MakeHolder<TIterator>(o, defaultFactory);
+    return iterator;
+}
diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h
new file mode 100644
index 0000000000..71a61ed549
--- /dev/null
+++ b/library/cpp/eventlog/iterator.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <util/stream/input.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/iterator.h>
+
+#include "eventlog.h"
+#include "logparser.h"
+
+namespace NEventLog {
+    // Options consumed by CreateIterator().
+    struct TOptions {
+        inline TOptions& SetFileName(const TString& fileName) {
+            FileName = fileName;
+
+            return *this;
+        }
+
+        // Sets ForceStrongOrdering unless ForceLosslessStrongOrdering is
+        // already enabled, in which case the call has no effect.
+        inline TOptions& SetForceStrongOrdering(bool v) {
+            if(!ForceLosslessStrongOrdering) {
+                ForceStrongOrdering = v;
+            }
+
+            return *this;
+        }
+
+        ui64 StartTime = MIN_START_TIME;
+        ui64 EndTime = MAX_END_TIME;
+        ui64 MaxRequestDuration = MAX_REQUEST_DURATION;
+        // Log file to read; when empty, Input is used instead.
+        TString FileName;
+        bool ForceStrongOrdering = false;
+        bool ForceWeakOrdering = false;
+        // Passed to the event filter built from EvList.
+        bool EnableEvents = true;
+        // Comma-separated event names and/or numeric class ids; empty means no filter.
+        TString EvList;
+        // Read FileName as a plain stream instead of by file name and time range.
+        bool ForceStreamMode = false;
+        bool ForceLosslessStrongOrdering = false;
+        // Read FileName as a growing file (implies stream mode).
+        bool TailFMode = false;
+        // Input stream used when FileName is empty (not owned).
+        IInputStream* Input = &Cin;
+        IFrameFilterRef FrameFilter;
+    };
+
+    // Pull-style iterator over log events; TInputRangeAdaptor builds range
+    // support on top of Next().
+    class IIterator: public TInputRangeAdaptor<IIterator> {
+    public:
+        virtual ~IIterator();
+
+        // Returns the next event, or a null pointer when there are no more.
+        virtual TConstEventPtr Next() = 0;
+    };
+
+    // Create an event iterator; the one-argument overload uses the default event factory.
+    THolder<IIterator> CreateIterator(const TOptions& o);
+    THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac);
+}
diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp
new file mode 100644
index 0000000000..6f8959f788
--- /dev/null
+++ b/library/cpp/eventlog/logparser.cpp
@@ -0,0 +1,814 @@
+#include "logparser.h"
+#include "evdecoder.h"
+
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/digest/murmur.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/scope.h>
+#include <util/generic/hash_set.h>
+#include <util/string/split.h>
+#include <util/string/cast.h>
+#include <util/string/escape.h>
+#include <util/string/builder.h>
+
+#include <contrib/libs/re2/re2/re2.h>
+
+#include <algorithm>
+#include <array>
+
+namespace {
+ // Consumes bytes from `in` until the most recently read
+ // COMPRESSED_LOG_FRAME_SYNC_DATA.size() bytes equal COMPRESSED_LOG_FRAME_SYNC_DATA.
+ // Returns true on a match and false if the stream ends before one is found.
+ bool FastforwardUntilSyncHeader(IInputStream* in) {
+ // Usually this function finds the correct header at the first hit
+ std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer;
+ if (in->Load(buffer.data(), buffer.size()) != buffer.size()) {
+ return false;
+ }
+
+ // `buffer` is used as a ring: `begin` points at the oldest byte of the window.
+ auto begin = buffer.begin();
+
+ for (;;) {
+ // The window is [begin, end) followed by [buffer.begin(), begin); each part is
+ // compared with the matching slice of the sync pattern.
+ if (std::mismatch(
+ begin, buffer.end(),
+ COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() &&
+ std::mismatch(
+ buffer.begin(), begin,
+ COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) {
+ return true;
+ }
+ // No match: overwrite the oldest byte with the next stream byte and advance the ring.
+ if (!in->ReadChar(*begin)) {
+ return false;
+ }
+ ++begin;
+ if (begin == buffer.end()) {
+ begin = buffer.begin();
+ }
+ }
+ }
+
+    // Recomputes the header hash and compares it with the value stored in the header.
+    bool HasCorrectChecksum(const TFrameHeader& header) {
+        // The hash covers every header field except the one that stores the header hash itself.
+        const size_t hashedBytes = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32);
+        return MurmurHash<ui32>(&header.Basehdr, hashedBytes) == header.Framehdr.HeaderChecksum;
+    }
+
+    // Scans `in` for the next sync marker followed by a header that loads successfully.
+    // A header that fails to load is logged to Cdbg, err.SkipAfter bytes are skipped and
+    // the scan continues. Returns Nothing() once no further sync marker can be found.
+    TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) {
+        while (FastforwardUntilSyncHeader(in)) {
+            try {
+                return TFrameHeader(*in);
+            } catch (const TFrameLoadError& err) {
+                Cdbg << err.what() << Endl;
+                in->Skip(err.SkipAfter);
+            }
+        }
+        return Nothing();
+    }
+
+    // In-memory variant: searches `span` for the next sync marker followed by a header
+    // that loads successfully.
+    // Returns {header, sub-span starting at the sync marker} on success and
+    // {Nothing(), empty span} when no valid header remains.
+    std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) {
+        for (;;) {
+            const auto iter = std::search(
+                span.begin(), span.end(),
+                COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end());
+            if (iter == span.end()) {
+                return {Nothing(), {}};
+            }
+
+            // Re-anchor the span at the sync marker that was just found.
+            span = span.substr(iter - span.begin());
+            try {
+                TMemoryInput in(
+                    span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(),
+                    span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size());
+                return {TFrameHeader(in), span};
+            } catch (const TFrameLoadError& err) {
+                Cdbg << err.what() << Endl;
+                // SkipAfter may be zero for a broken header. Always move at least one byte
+                // forward, otherwise the same sync marker would be found again and the
+                // loop would never terminate.
+                span = span.substr(std::max<size_t>(err.SkipAfter, 1));
+            }
+        }
+    }
+
+ // Returns a byte offset into `span` from which frame iteration should start for the
+ // time window [start, end] widened by `maxRequestDuration` on both sides.
+ // Returns span.size() when there is nothing to read and 0 when reading must start
+ // at the beginning. Throws (Y_ENSURE) if start > end.
+ size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) {
+ Y_ENSURE(start <= end);
+
+ // Both bounds saturate instead of wrapping around.
+ const auto leftTimeBound = start - Min(start, maxRequestDuration);
+ const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end);
+
+ TStringBuf subspan = span;
+ TMaybe<TFrameHeader> maybeLeftFrame;
+ std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan);
+
+ // No frame at all, or the very first frame already ends after the right bound.
+ if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) {
+ return span.size();
+ }
+
+ // The first frame already starts after the left bound: start from the beginning.
+ if (maybeLeftFrame->StartTime() > leftTimeBound) {
+ return 0;
+ }
+
+ // Bisection: `subspan` always begins at a frame whose start time is <= leftTimeBound.
+ while (subspan.size() > maybeLeftFrame->FullLength()) {
+ const auto mid = subspan.data() + subspan.size() / 2;
+ auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()});
+ if (!midFrame) {
+ // If mid is in the middle of the last frame, here we will lose it meaning that
+ // we will find previous frame as the result.
+ // This is fine because we will iterate frames starting from that.
+ subspan = subspan.substr(0, subspan.size() / 2);
+ continue;
+ }
+ if (midFrame->StartTime() <= leftTimeBound) {
+ // The frame found in the right half is still early enough: continue from it.
+ maybeLeftFrame = midFrame;
+ subspan = rightHalfSpan;
+ } else {
+ // Too late: keep only the left half.
+ subspan = subspan.substr(0, subspan.size() / 2);
+ }
+ }
+
+ return subspan.data() - span.data();
+ }
+}
+
+// Loads the base header and the frame header from `in` and validates them.
+// Every failure (load error or failed check) is rethrown as TFrameLoadError whose
+// SkipAfter is FrameLength() and whose message includes the original error text.
+TFrameHeader::TFrameHeader(IInputStream& in) {
+ try {
+ ::Load(&in, Basehdr);
+
+ Y_ENSURE(Basehdr.Length, "Empty frame additional data");
+
+ ::Load(&in, Framehdr);
+ switch (LogFormat()) {
+ case COMPRESSED_LOG_FORMAT_V1:
+ break;
+
+ case COMPRESSED_LOG_FORMAT_V2:
+ case COMPRESSED_LOG_FORMAT_V3:
+ case COMPRESSED_LOG_FORMAT_V4:
+ case COMPRESSED_LOG_FORMAT_V5:
+ Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor");
+
+ Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum");
+ break;
+
+ default:
+ ythrow yexception() << "Unsupported log structure format";
+ };
+
+ Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps");
+
+ // Each frame must contain at least one event.
+ Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload");
+ } catch (...) {
+ // When the input is a TCountingInput, its current counter is added to the message.
+ // NOTE(review): `location` has no leading space, so the message reads
+ // "Frame Load Error@ N: ..." — confirm whether that is intended.
+ TString location = "";
+ if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) {
+ location = "@ " + ToString(cnt->Counter());
+ }
+ // NOTE(review): if loading Basehdr itself failed, FrameLength() is evaluated on a
+ // header that was not read completely — confirm this is acceptable.
+ ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage();
+ }
+}
+
+// Builds a frame from an already parsed `header`. The payload is not read here:
+// Limiter_ wraps `in` and is limited to header.FrameLength() bytes.
+// Address_ is derived from the stream counter when `in` is a TCountingInput, otherwise it is 0.
+TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac)
+    : TFrameHeader(header)
+    , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength()))
+    , Fac_(fac)
+    , Address_(0)
+{
+    if (auto* counting = dynamic_cast<TCountingInput *>(&in)) {
+        Address_ = counting->Counter() - sizeof(TFrameHeader);
+    }
+}
+
+// Returns an iterator over the events of this frame.
+// While EventsCache_ is empty the frame is decoded with TFrameDecoder (using
+// `eventFilter`) and the decoded events are stored in EventsCache_; the returned
+// iterator walks the cache and applies `eventFilter` itself.
+// NOTE(review): the cache is filled with the filter of the first call; later calls
+// with a different filter iterate that same cache — confirm this is intended.
+TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const {
+ if (EventsCache_.empty()) {
+ for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) {
+ EventsCache_.emplace_back(*decoder);
+ }
+ }
+
+ return TIterator(*this, eventFilter);
+}
+
+// Drops all cached events (EventsCache_ is mutable, hence the const method).
+void TFrame::ClearEventsCache() const {
+ EventsCache_.clear();
+}
+
+// Reads everything that is left in Limiter_ and returns it as the frame payload.
+// Throws if fewer bytes than Limiter_->Left() could be read or if the MurmurHash of
+// the payload differs from Framehdr.PayloadChecksum.
+// Note that the call consumes the underlying input.
+TString TFrame::GetCompressedFrame() const {
+ const auto left = Limiter_->Left();
+ TString payload = Limiter_->ReadAll();
+ Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream");
+ const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size());
+ Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum");
+
+ return payload;
+}
+
+// Returns the frame payload after zlib decompression.
+TString TFrame::GetRawFrame() const {
+    TString compressed = GetCompressedFrame();
+    TStringInput compressedInput(compressed);
+    TZLibDecompress inflater{&compressedInput};
+    return inflater.ReadAll();
+}
+
+// Captures the current size of the frame's event cache and positions the iterator
+// on the first event accepted by `filter` (or on index 0 when no filter is given).
+TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter)
+ : Frame_(frame)
+ , Size_(frame.EventsCache_.size())
+ , Filter_(filter)
+ , Index_(0)
+{
+ SkipToValidEvent();
+}
+
+// Returns the cached event at the current index; no bounds check is performed here.
+TConstEventPtr TFrame::TIterator::operator*() const {
+ return Frame_.GetEvent(Index_);
+}
+
+// Advances to the next event accepted by the filter.
+// Returns true while the iterator still points at a cached event.
+bool TFrame::TIterator::Next() {
+ Index_++;
+ SkipToValidEvent();
+ return Index_ < Size_;
+}
+
+// Moves Index_ forward until it refers to an event whose class is accepted by
+// Filter_, or until the end of the cache. Does nothing when there is no filter.
+void TFrame::TIterator::SkipToValidEvent() {
+    if (!Filter_) {
+        return;
+    }
+
+    while (Index_ < Size_ && !Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) {
+        ++Index_;
+    }
+}
+
+// Locates the next valid frame header in `in` and wraps it, together with the
+// stream, into a TFrame. Returns Nothing() when no further header is found.
+TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) {
+    auto header = FindNextFrameHeader(in);
+    if (!header) {
+        return Nothing();
+    }
+    return TFrame{*in, *header, eventFactory};
+}
+
+// Parses `unparsedMatchGroups` into MatchGroups.
+// Groups are separated by '/', each group must consist of exactly three ':'-separated
+// parts: event type, field name and value to match. '\' escapes the separators.
+// The event type is resolved through eventFactory->ClassByName(); if that throws, the
+// token is parsed as a numeric event class instead. When both attempts fail, the
+// original exception is rethrown with an explanatory suffix appended.
+TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) {
+    TVector<TStringBuf> tokens;
+
+    SplitWithEscaping(tokens, unparsedMatchGroups, "/");
+
+    // Amount of match groups
+    size_t size = tokens.size();
+    MatchGroups.resize(size);
+
+    for (size_t i = 0; i < size; i++) {
+        TMatchGroup& group = MatchGroups[i];
+        TVector<TStringBuf> groupTokens;
+        SplitWithEscaping(groupTokens, tokens[i], ":");
+
+        Y_ENSURE(groupTokens.size() == 3);
+
+        try {
+            group.EventID = eventFactory->ClassByName(groupTokens[0]);
+        } catch (yexception& e) {
+            if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) {
+                e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0];
+                // Rethrow the caught object itself: `throw e;` would copy it as a plain
+                // yexception and lose the dynamic type of the original exception.
+                throw;
+            }
+        }
+
+        group.FieldName = groupTokens[1];
+        group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:");
+    }
+}
+
+// Returns true when every match group is satisfied by at least one event of `frame`.
+// A group is satisfied by an event whose class equals group.EventID and whose field
+// group.FieldName (a '.'-separated path through nested messages) fully matches the
+// regular expression group.ValueToMatch.
+// Throws (Y_ENSURE) when a path component cannot be found in the message descriptor.
+bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const {
+    // Indices of the groups that have not been matched yet.
+    THashSet<size_t> toMatchSet;
+    for (size_t i = 0; i < MatchGroups.size(); i++) {
+        toMatchSet.insert(i);
+    }
+
+    for (auto it = frame.GetIterator(); it.Avail(); it.Next()) {
+        TConstEventPtr event(*it);
+        // Collected separately because toMatchSet must not be modified while iterating it.
+        TVector<size_t> indicesToErase;
+
+        if (!toMatchSet.empty()) {
+            const NProtoBuf::Message* const rootMessage = event->GetProto();
+            const google::protobuf::Descriptor* const rootDescriptor = rootMessage->GetDescriptor();
+            const google::protobuf::Reflection* const rootReflection = rootMessage->GetReflection();
+
+            Y_ENSURE(rootDescriptor);
+            Y_ENSURE(rootReflection);
+
+            for (size_t groupIndex : toMatchSet) {
+                const TMatchGroup& group = MatchGroups[groupIndex];
+
+                if (event->Class != group.EventID) {
+                    continue;
+                }
+
+                // Every group resolves its field path from the root of the event.
+                // Sharing these pointers between groups would make a group start from
+                // the nested message reached by the previous one.
+                const NProtoBuf::Message* message = rootMessage;
+                const google::protobuf::Descriptor* descriptor = rootDescriptor;
+                const google::protobuf::Reflection* reflection = rootReflection;
+
+                TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>();
+                TString lastPart = std::move(parts.back());
+                parts.pop_back();
+
+                // Descend through the intermediate (message-typed) path components.
+                for (const auto& part : parts) {
+                    auto fieldDescriptor = descriptor->FindFieldByName(part);
+                    Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\".");
+
+                    message = &reflection->GetMessage(*message, fieldDescriptor);
+                    descriptor = message->GetDescriptor();
+                    reflection = message->GetReflection();
+
+                    Y_ENSURE(descriptor);
+                    Y_ENSURE(reflection);
+                }
+
+                const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart);
+                Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\".");
+
+                TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection);
+                if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) {
+                    indicesToErase.push_back(groupIndex);
+                }
+            }
+
+            for (size_t idx : indicesToErase) {
+                toMatchSet.erase(idx);
+            }
+
+            if (toMatchSet.empty()) {
+                return true;
+            }
+        }
+    }
+
+    return toMatchSet.empty();
+}
+
+// Splits `stringToSplit` on any character of `externalCharacterSet` and appends the
+// pieces to `tokens`. A backslash makes the character after it be skipped by the
+// separator search. Empty pieces are not added. The pieces are views into
+// `stringToSplit` and still contain the backslashes.
+void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) {
+ size_t tokenStart = 0;
+ // Search for the separators and for the escape character in one pass.
+ const TString characterSet = TString::Join("\\", externalCharacterSet);
+
+ for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) {
+ if (stringToSplit[position] == '\\') {
+ // Step over the escaped character; the loop increment then resumes after it.
+ position++;
+ } else {
+ if (tokenStart != position) {
+ tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart));
+ }
+ tokenStart = position + 1;
+ }
+ }
+
+ // Trailing piece after the last separator.
+ if (tokenStart < stringToSplit.size()) {
+ tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart));
+ }
+}
+
+// Returns a copy of `stringToUnescape` in which every backslash that directly
+// precedes a character of `characterSet` is removed. Other backslashes are kept.
+TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) {
+ TStringBuilder stringBuilder;
+ size_t tokenStart = 0;
+
+ // The search continues two positions after each backslash, so the character that
+ // follows a backslash is never treated as an escape character itself.
+ for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) {
+ if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) {
+ // Copy the text before the backslash and restart the pending piece right after it.
+ stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart);
+ tokenStart = position + 1;
+ }
+ }
+
+ if (tokenStart < stringToUnescape.size()) {
+ stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart);
+ }
+
+ return stringBuilder;
+}
+
+// Converts the value of the field `fieldDescriptor` of `message` to a string.
+// The getter is chosen by the C++ representation of the field, so all wire variants
+// that share a representation are supported (e.g. int32, sint32 and sfixed32 all go
+// through GetInt32; string and bytes go through GetString). Enum fields yield the
+// name of the enum value.
+// Throws yexception for fields whose representation is a message, and (Y_ENSURE)
+// when any of the arguments is null.
+TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) {
+    Y_ENSURE(message);
+    Y_ENSURE(fieldDescriptor);
+    Y_ENSURE(reflection);
+
+    switch (fieldDescriptor->cpp_type()) {
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_DOUBLE:
+            return ToString(reflection->GetDouble(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_FLOAT:
+            return ToString(reflection->GetFloat(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_BOOL:
+            return ToString(reflection->GetBool(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_INT32:
+            return ToString(reflection->GetInt32(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_UINT32:
+            return ToString(reflection->GetUInt32(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_INT64:
+            return ToString(reflection->GetInt64(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_UINT64:
+            return ToString(reflection->GetUInt64(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_STRING:
+            return ToString(reflection->GetString(*message, fieldDescriptor));
+        case google::protobuf::FieldDescriptor::CppType::CPPTYPE_ENUM: {
+            const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor);
+            return ToString(enumValueDescriptor->name());
+        }
+        default:
+            throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented.";
+    }
+}
+
+// Stream mode: reads frames from `s` through the counting wrapper In_.
+// StartTime_, EndTime_ and CutoffTime_ keep their in-class default of 0.
+// The constructor already positions the streamer on the first allowed frame.
+TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff)
+ : In_(&s)
+ , FrameFilter_(ff)
+ , EventFactory_(fac)
+{
+ Frame_ = FindNextFrame(&In_, EventFactory_);
+
+ SkipToAllowedFrame();
+}
+
+// File mode: loads `fileName` through TBlob::FromFile and reads frames from that
+// memory. FindFrames() supplies the offset to start from for the requested time
+// window; the constructor then positions the streamer on the first allowed frame.
+TFrameStreamer::TFrameStreamer(
+ const TString& fileName,
+ ui64 startTime,
+ ui64 endTime,
+ ui64 maxRequestDuration,
+ IEventFactory* fac,
+ IFrameFilterRef ff)
+ : File_(TBlob::FromFile(fileName))
+ , MemoryIn_(File_.Data(), File_.Size())
+ , In_(&MemoryIn_)
+ , StartTime_(startTime)
+ , EndTime_(endTime)
+ // endTime + maxRequestDuration, saturated at Max<ui64>().
+ , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime))
+ , FrameFilter_(ff)
+ , EventFactory_(fac)
+{
+ In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration));
+ Frame_ = FindNextFrame(&In_, fac);
+ SkipToAllowedFrame();
+}
+
+// Out-of-line definition of the destructor declared in logparser.h.
+TFrameStreamer::~TFrameStreamer() = default;
+
+// True while the streamer holds a current frame.
+bool TFrameStreamer::Avail() const {
+ return Frame_.Defined();
+}
+
+// Returns the current frame; throws when there is none.
+const TFrame& TFrameStreamer::operator*() const {
+ Y_ENSURE(Frame_, "Frame streamer depleted");
+
+ return *Frame_;
+}
+
+// Moves to the next frame that passes AllowedFrame().
+// Returns true when such a frame exists.
+bool TFrameStreamer::Next() {
+ DoNext();
+ SkipToAllowedFrame();
+
+ return Frame_.Defined();
+}
+
+// Tells whether `frame` overlaps the requested window [StartTime_, EndTime_].
+// A zero StartTime_ or EndTime_ disables the check.
+// Frames that start before StartTime_ and end after EndTime_ are accepted as well:
+// they enclose the window and therefore may carry events that belong to it.
+bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const {
+    if (StartTime_ == 0 || EndTime_ == 0) {
+        return true;
+    }
+    return frame.StartTime() <= EndTime_ && StartTime_ <= frame.EndTime();
+}
+
+// Skips the unread rest of the current frame and loads the following one, without
+// applying the frame filter. A frame ending after a non-zero CutoffTime_ ends the
+// stream. Returns true when a new current frame is available.
+bool TFrameStreamer::DoNext() {
+    if (!Frame_) {
+        return false;
+    }
+
+    In_.Skip(Frame_->Limiter_->Left());
+    Frame_ = FindNextFrame(&In_, EventFactory_);
+
+    const bool beyondCutoff = Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_;
+    if (beyondCutoff) {
+        Frame_.Clear();
+    }
+
+    return Frame_.Defined();
+}
+
+namespace {
+ // Owns a buffer that is filled completely at construction time.
+ struct TDecodeBuffer {
+ // Reads all of `src` into a temporary buffer (initial capacity `bs`) and decodes
+ // it with the block codec named `codec` into DecodeBuffer.
+ TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) {
+ TBuffer from(bs);
+
+ {
+ TBufferOutput b(from);
+ TransferData(&src, &b);
+ }
+
+ NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer);
+ }
+
+ // Copies all of `src` into DecodeBuffer without decoding.
+ explicit TDecodeBuffer(IInputStream& src) {
+ TBufferOutput b(DecodeBuffer);
+ TransferData(&src, &b);
+ }
+
+ TBuffer DecodeBuffer;
+ };
+
+ // Input stream over the contents of a TDecodeBuffer. TDecodeBuffer is listed as the
+ // first base so that DecodeBuffer is filled before TBufferInput is built on top of it.
+ class TBlockCodecStream: private TDecodeBuffer, public TBufferInput {
+ public:
+ TBlockCodecStream(const TString codec, IInputStream& src, size_t bs)
+ : TDecodeBuffer(codec, src, bs)
+ , TBufferInput(DecodeBuffer)
+ {}
+
+ explicit TBlockCodecStream(IInputStream& src)
+ : TDecodeBuffer(src)
+ , TBufferInput(DecodeBuffer)
+ {}
+ };
+}
+
+// Reads and decompresses the payload of `fr` and decodes the first event.
+// Formats V2-V5 are handled; any other format makes the constructor throw.
+// With `withRawData` the whole uncompressed payload is kept in UncompressedData_ so
+// that GetRawEvent() can return the bytes of the current event.
+TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData)
+ : Frame_(fr)
+ , Event_(nullptr)
+ , Flt_(filter)
+ , Fac_(fr.Fac_)
+ , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime()))
+ , Strict_(strict)
+ , WithRawData_(withRawData)
+{
+ switch (fr.LogFormat()) {
+ case COMPRESSED_LOG_FORMAT_V2:
+ case COMPRESSED_LOG_FORMAT_V3:
+ case COMPRESSED_LOG_FORMAT_V4:
+ case COMPRESSED_LOG_FORMAT_V5: {
+ const auto payload = fr.GetCompressedFrame();
+ TMemoryInput payloadInput{payload};
+
+ if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) {
+ Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size()));
+ } else {
+ // TBlockCodecStream drains `zlib` in its constructor, so the local
+ // decompressor and `payload` are not needed after this block.
+ TZLibDecompress zlib(&payloadInput);
+ Decompressor_.Reset(new TBlockCodecStream(zlib));
+ if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) {
+ // V4 additionally passes the zlib output through the "lz4hc" codec.
+ Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size()));
+ }
+ }
+
+ break;
+ }
+
+ default:
+ ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl;
+ break;
+ };
+
+ if (WithRawData_) {
+ // Materialise the uncompressed payload and re-read it from memory.
+ TBufferOutput out(UncompressedData_);
+ TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen);
+
+ TransferData(&limiter, &out);
+ Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size()));
+ }
+
+ Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen));
+
+ Decode();
+}
+
+// Out-of-line definition of the destructor declared in logparser.h.
+TFrameDecoder::~TFrameDecoder() = default;
+
+// True while a decoded event is available.
+bool TFrameDecoder::Avail() const {
+ return HaveData();
+}
+
+// Returns the current event; throws when the decoder has no event.
+TConstEventPtr TFrameDecoder::operator*() const {
+ Y_ENSURE(HaveData(), "Decoder depleted");
+
+ return Event_;
+}
+
+// Decodes the next event if there is a current one.
+// Returns true when an event is available afterwards.
+bool TFrameDecoder::Next() {
+ if (HaveData()) {
+ Decode();
+ }
+
+ return HaveData();
+}
+
+// Replaces Event_ with the next event of the frame, or with null when nothing is left.
+void TFrameDecoder::Decode() {
+ Event_ = nullptr;
+ // Formats V3, V4 and V5 are passed to DecodeEvent as "framed".
+ const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5);
+
+ // Offsets of the consumed bytes inside UncompressedData_ (raw-data mode only).
+ size_t evBegin = 0;
+ size_t evEnd = 0;
+ if (WithRawData_)
+ evBegin = UncompressedData_.Size() - Limiter_->Left();
+
+ // Keep decoding while input is left and DecodeEvent returns no event.
+ while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) {
+ }
+
+ if (WithRawData_) {
+ evEnd = UncompressedData_.Size() - Limiter_->Left();
+ RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd);
+ }
+
+ // Nothing decoded: hand out the end-of-frame event if the filter permits it.
+ // EndOfFrame_ is released here, so it can be handed out only once.
+ if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) {
+ Event_ = EndOfFrame_.Release();
+ }
+
+ if (!!Event_) {
+ Event_->FrameId = Frame_.FrameId();
+ }
+}
+
+// Returns the view recorded by the last Decode() call in raw-data mode; the view
+// points into UncompressedData_.
+const TStringBuf TFrameDecoder::GetRawEvent() const {
+ return RawEventData_;
+}
+
+// Streams events of the frames in `fs` whose timestamps lie in [s, e].
+// Throws when s > e. The constructor already loads the first batch of events.
+TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering)
+ : Frames_(fs)
+ , Start_(s)
+ , End_(e)
+ , MaxEndTimestamp_(0)
+ , Frontier_(0)
+ , StrongOrdering_(strongOrdering)
+ , LosslessStrongOrdering_(losslessStrongOrdering)
+ , EventFilter_(filter)
+{
+
+ if (Start_ > End_) {
+ ythrow yexception() << "Wrong main interval";
+ }
+
+ // Qualified call: no virtual dispatch from within the constructor.
+ TEventStreamer::Next();
+}
+
+// Out-of-line definition of the destructor declared in logparser.h.
+TEventStreamer::~TEventStreamer() = default;
+
+// True when the buffer holds an event whose timestamp does not exceed Frontier_.
+bool TEventStreamer::Avail() const {
+ return Events_.Avail() && (*Events_)->Timestamp <= Frontier_;
+}
+
+// Returns the current event; throws when Avail() is false.
+TConstEventPtr TEventStreamer::operator*() const {
+ Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted");
+
+ return *Events_;
+}
+
+// Advances to the next event. Buffered events are used first; when the buffer has
+// nothing within Frontier_, more frames are loaded until an event becomes available
+// or LoadMoreEvents() reports that nothing more can be loaded.
+bool TEventStreamer::Next() {
+ if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) {
+ return true;
+ }
+
+ for (;;) {
+ if (!LoadMoreEvents()) {
+ return false;
+ }
+
+ if (TEventStreamer::Avail()) {
+ return true;
+ }
+ }
+}
+
+/*
+Two parameters are used in the function:
+Frontier - the moment of time up to which inclusively all the log events made their way
+ into the buffer (and might have been already extracted out of it).
+Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION.
+In order to get all the log events up to the Frontier inclusively,
+ frames need to be read until "end time" of the current frame exceeds the Horizon.
+*/
+// Moves events of further frames into the buffer and advances Frontier_.
+// Returns false when the frame stream is exhausted or Frontier_ has reached End_.
+// Throws when the current frame ends at or before Frontier_ + maxRequestDuration.
+bool TEventStreamer::LoadMoreEvents() {
+ if (!Frames_.Avail()) {
+ return false;
+ }
+
+ const TFrame& fr1 = *Frames_;
+ // The look-ahead window is used only with strong ordering.
+ const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0);
+
+ if (fr1.EndTime() <= Frontier_ + maxRequestDuration) {
+ ythrow yexception() << "Wrong frame stream state";
+ }
+
+ if (Frontier_ >= End_) {
+ return false;
+ }
+
+ const ui64 old_frontier = Frontier_;
+ Frontier_ = fr1.EndTime();
+
+ {
+ // Reorder the buffer whenever this scope is left, including the early return below.
+ Y_DEFER {
+ Events_.Reorder(StrongOrdering_);
+ };
+
+ for (; Frames_.Avail(); Frames_.Next()) {
+ const TFrame& fr2 = *Frames_;
+
+ // Frames need to start later than the Frontier.
+ if (StrongOrdering_ && fr2.StartTime() <= old_frontier) {
+ Cdbg << "Invalid frame encountered" << Endl;
+ continue;
+ }
+
+ if (fr2.EndTime() > MaxEndTimestamp_) {
+ MaxEndTimestamp_ = fr2.EndTime();
+ }
+
+ // Stop at the first frame that ends beyond the horizon; it stays current in
+ // Frames_ for the next call. Lossless mode never stops here.
+ if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) {
+ return true;
+ }
+
+ // Checking for the frame to be within the main time borders.
+ if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) {
+ TransferEvents(fr2);
+ }
+ }
+ }
+
+ // All frames were consumed: everything up to the largest end time seen is buffered.
+ Frontier_ = MaxEndTimestamp_;
+
+ return true;
+}
+
+// Appends the events of `fr` that lie in [Start_, End_] to the buffer.
+// The transfer is all-or-nothing: on any exception the buffer is rolled back to the
+// state before the call. The error kinds listed below are logged to Cdbg and
+// swallowed; every other exception is rethrown after the rollback.
+void TEventStreamer::TransferEvents(const TFrame& fr) {
+ Events_.SetCheckpoint();
+
+ try {
+ for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) {
+ TConstEventPtr ev = *it;
+
+ // An event outside its frame's time range invalidates the whole frame.
+ if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) {
+ ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range";
+ }
+
+ if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) {
+ Events_.Append(ev, StrongOrdering_);
+ }
+ }
+ } catch (const TInvalidEventTimestamps& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl;
+ } catch (const TFrameLoadError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: " << err.what() << Endl;
+ } catch (const TEventDecoderError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl;
+ } catch (const TZLibDecompressorError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl;
+ } catch (...) {
+ Events_.Rollback();
+ throw;
+ }
+}
+
+// Remembers the current buffer size so that Rollback() can restore it.
+void TEventStreamer::TEventBuffer::SetCheckpoint() {
+ BufLen_ = Buffer_.size();
+}
+
+// Resizes the buffer back to the size recorded by the last SetCheckpoint().
+void TEventStreamer::TEventBuffer::Rollback() {
+ Buffer_.resize(BufLen_);
+}
+
+// Prepares the buffer for consumption. Events are taken from the back of the vector
+// (see operator* and Next), so the buffer is reversed; with `strongOrdering` it is
+// additionally stable-sorted by descending timestamp. Among equal timestamps an
+// event with a zero Class is placed before one with a non-zero Class.
+void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) {
+ SetCheckpoint();
+
+ std::reverse(Buffer_.begin(), Buffer_.end());
+
+ if (strongOrdering) {
+ StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) {
+ return (a->Timestamp > b->Timestamp) ||
+ ((a->Timestamp == b->Timestamp) && !a->Class && b->Class);
+ });
+ }
+}
+
+// Adds `ev` to the buffer. With `strongOrdering` the call throws if the event is
+// older than LastTimestamp_, the timestamp of the last event removed by Next().
+void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) {
+ // Events in buffer output must be in an ascending order.
+ Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event");
+
+ Buffer_.push_back(std::move(ev));
+}
+
+// True while the buffer is not empty.
+bool TEventStreamer::TEventBuffer::Avail() const {
+ return !Buffer_.empty();
+}
+
+// Returns the event at the back of the buffer; throws when the buffer is empty.
+TConstEventPtr TEventStreamer::TEventBuffer::operator*() const {
+ Y_ENSURE(!Buffer_.empty(), "Event buffer is empty");
+
+ return Buffer_.back();
+}
+
+// Removes the event at the back of the buffer and records its timestamp in
+// LastTimestamp_. Returns true when another event is left afterwards; returns false
+// without any change when the buffer was already empty.
+bool TEventStreamer::TEventBuffer::Next() {
+    if (Buffer_.empty()) {
+        return false;
+    }
+
+    LastTimestamp_ = Buffer_.back()->Timestamp;
+    Buffer_.pop_back();
+    return !Buffer_.empty();
+}
diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h
new file mode 100644
index 0000000000..f819e72589
--- /dev/null
+++ b/library/cpp/eventlog/logparser.h
@@ -0,0 +1,343 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+#include <util/generic/vector.h>
+#include <util/generic/set.h>
+#include <util/generic/maybe.h>
+#include <util/memory/blob.h>
+#include <util/stream/length.h>
+#include <util/stream/mem.h>
+
+#include "eventlog_int.h"
+#include "eventlog.h"
+#include "common.h"
+
+class IInputStream;
+
+// Time window constants, expressed in the same units as the frame and event timestamps.
+// NOTE(review): the value 60'000'000 suggests microseconds (one minute) — confirm.
+// The default start and end times leave room for one MAX_REQUEST_DURATION below and
+// above them inside the ui64 range.
+static const ui64 MAX_REQUEST_DURATION = 60'000'000;
+static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION;
+static const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION;
+
+// Set of event classes that acts either as an allow list or as a deny list.
+// With Enable_ == true only the listed classes are allowed; with Enable_ == false
+// every class except the listed ones is allowed.
+class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> {
+public:
+    TEventFilter(bool enableEvents)
+        : Enable_(enableEvents)
+    {
+    }
+
+    // Adds `cls` to the list.
+    void AddEventClass(TEventClass cls) {
+        insert(cls);
+    }
+
+    // Tells whether events of class `cls` pass the filter.
+    bool EventAllowed(TEventClass cls) const {
+        const bool listed = count(cls) != 0;
+        return listed == Enable_;
+    }
+
+private:
+    bool Enable_;
+};
+
+using TEventStream = TPacketInputStream<TConstEventPtr>;
+
+// Parsed header of a compressed log frame: the base header followed by the frame header.
+struct TFrameHeader {
+ // Reads header from the stream. The caller must make sure that the
+ // sync data is present just before the stream position.
+ explicit TFrameHeader(IInputStream& in);
+
+ ui64 StartTime() const {
+ return Framehdr.StartTimestamp;
+ }
+
+ ui64 EndTime() const {
+ return Framehdr.EndTimestamp;
+ }
+
+ ui32 FrameId() const {
+ return Basehdr.FrameId;
+ }
+
+ ui64 Duration() const {
+ return EndTime() - StartTime();
+ }
+
+ // Lower 24 bits of Basehdr.Format.
+ TEventLogFormat ContentFormat() const {
+ return Basehdr.Format & 0xffffff;
+ }
+
+ // Bits above the lower 24 of Basehdr.Format; compared with the COMPRESSED_LOG_FORMAT_V* constants.
+ TEventLogFormat LogFormat() const {
+ return Basehdr.Format >> 24;
+ }
+
+ // Length of the data that follows the header.
+ // NOTE(review): the subtraction wraps around when Basehdr.Length is smaller than
+ // sizeof(TCompressedFrameHeader2) — confirm callers never rely on the result then.
+ ui64 FrameLength() const {
+ return Basehdr.Length - sizeof(TCompressedFrameHeader2);
+ }
+
+ // Length including the header
+ ui64 FullLength() const {
+ return sizeof(*this) + FrameLength();
+ }
+
+ TCompressedFrameBaseHeader Basehdr;
+ TCompressedFrameHeader2 Framehdr;
+};
+
+// Thrown when a frame header cannot be loaded or validated.
+// SkipAfter is the number of bytes the catching code skips before it resumes the
+// search for the next frame.
+struct TFrameLoadError: public yexception {
+ explicit TFrameLoadError(size_t skipAfter)
+ : SkipAfter(skipAfter)
+ {}
+
+ size_t SkipAfter;
+};
+
+// A log frame: its header plus lazy access to the payload through a
+// length-limited view of the input stream.
+class TFrame : public TFrameHeader {
+public:
+ // Reads the frame after the header has been read.
+ TFrame(IInputStream& in, TFrameHeader header, IEventFactory*);
+
+ // Payload after zlib decompression.
+ TString GetRawFrame() const;
+ // Payload exactly as stored; verified against the payload checksum.
+ TString GetCompressedFrame() const;
+
+ // Value computed in the constructor from the input stream counter, 0 when the
+ // stream is not a TCountingInput.
+ ui64 Address() const { return Address_; }
+
+private:
+ // Unchecked access to the event cache.
+ const TConstEventPtr& GetEvent(size_t index) const {
+ return EventsCache_[index];
+ }
+
+ void ClearEventsCache() const;
+
+ THolder<TLengthLimitedInput> Limiter_;
+ // Filled on the first GetIterator() call.
+ mutable TVector<TConstEventPtr> EventsCache_;
+
+ IEventFactory* Fac_;
+ ui64 Address_;
+
+ friend class TFrameDecoder;
+ friend class TFrameStreamer;
+
+private:
+ // Iterates over EventsCache_ of a frame, skipping events rejected by the filter.
+ class TIterator: TEventStream {
+ public:
+ TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter);
+ ~TIterator() override = default;
+
+ bool Avail() const override {
+ return Index_ < Size_;
+ }
+
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ private:
+ void SkipToValidEvent();
+
+ const TFrame& Frame_;
+ // Cache size at construction time.
+ size_t Size_;
+ TIntrusiveConstPtr<TEventFilter> Filter_;
+ size_t Index_;
+ };
+
+public:
+ // Returns an iterator over the events of the frame; a null filter accepts all events.
+ TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const;
+};
+
+// If `in` is derived from TCountingInput, the frame's address will
+// be set according to in->Counter(). Otherwise it will be zero.
+TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*);
+
+using TFrameStream = TPacketInputStream<const TFrame&>;
+
+// Reference-counted interface for predicates that decide whether a frame is processed.
+class IFrameFilter: public TSimpleRefCount<IFrameFilter> {
+public:
+ IFrameFilter() {
+ }
+
+ virtual ~IFrameFilter() = default;
+
+ // Returns true when `frame` passes the filter.
+ virtual bool FrameAllowed(const TFrame& frame) const = 0;
+};
+
+using IFrameFilterRef = TIntrusivePtr<IFrameFilter>;
+
+// Accepts frames whose duration lies in [minFrameDuration, maxFrameDuration].
+class TDurationFrameFilter: public IFrameFilter {
+public:
+    TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>())
+        : MinDuration_(minFrameDuration)
+        , MaxDuration_(maxFrameDuration)
+    {
+    }
+
+    bool FrameAllowed(const TFrame& frame) const override {
+        const ui64 duration = frame.Duration();
+        return MinDuration_ <= duration && duration <= MaxDuration_;
+    }
+
+private:
+    const ui64 MinDuration_;
+    const ui64 MaxDuration_;
+};
+
+// Accepts only frames whose FrameId() equals the id given at construction.
+class TFrameIdFrameFilter: public IFrameFilter {
+public:
+ TFrameIdFrameFilter(ui32 frameId)
+ : FrameId_(frameId)
+ {
+ }
+
+ bool FrameAllowed(const TFrame& frame) const override {
+ return frame.FrameId() == FrameId_;
+ }
+
+private:
+ const ui32 FrameId_;
+};
+
+// Accepts frames that contain, for every configured match group, an event of the
+// given class whose field value matches the given pattern (see logparser.cpp).
+class TContainsEventFrameFilter: public IFrameFilter {
+public:
+ // `args` holds '/'-separated groups of the form EventType:FieldName:Value.
+ TContainsEventFrameFilter(const TString& args, const IEventFactory* fac);
+
+ bool FrameAllowed(const TFrame& frame) const override;
+
+private:
+ struct TMatchGroup {
+ TEventClass EventID;
+ // '.'-separated path to the field inside the event message.
+ TString FieldName;
+ // Used as a regular expression that must match the whole field value.
+ TString ValueToMatch;
+ };
+
+ TVector<TMatchGroup> MatchGroups;
+};
+
+// Splits `stringToSplit` on the characters of `externalCharacterSet`, honouring
+// backslash escapes, and appends the non-empty pieces to `tokens`.
+void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet);
+
+// Removes the backslash in front of every character that belongs to `characterSet`.
+TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet);
+
+// Returns the value of a scalar or enum field of `message` as a string; throws for
+// field types that are not supported.
+TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection);
+
+// Produces the frames of a log one by one, skipping frames that fall outside the
+// requested time range or are rejected by the optional frame filter.
+class TFrameStreamer: public TFrameStream {
+public:
+ // Reads frames from an arbitrary input stream; no time range is applied.
+ TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = nullptr);
+ // Reads frames from a file and starts near the requested time range.
+ TFrameStreamer(
+ const TString& fileName,
+ ui64 startTime,
+ ui64 endTime,
+ ui64 maxRequestDuration,
+ IEventFactory* fac,
+ IFrameFilterRef ff = nullptr);
+ ~TFrameStreamer() override;
+
+ bool Avail() const override;
+ const TFrame& operator*() const override;
+ bool Next() override;
+
+private:
+ // Loads the next frame without applying the filters.
+ bool DoNext();
+ bool AllowedTimeRange(const TFrame& frame) const;
+
+ bool AllowedFrame(const TFrame& frame) const {
+ return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame));
+ }
+
+ // Advances until the current frame is allowed or the stream ends.
+ void SkipToAllowedFrame() {
+ if (Frame_) {
+ while (!AllowedFrame(*Frame_) && DoNext()) {
+ //do nothing
+ }
+ }
+ }
+
+ // File_ and MemoryIn_ are initialised only by the file-based constructor.
+ TBlob File_;
+ TMemoryInput MemoryIn_;
+ TCountingInput In_;
+ // NOTE(review): not referenced by the code visible in logparser.cpp — confirm whether still needed.
+ THolder<IInputStream> Stream_;
+ // A zero StartTime_/EndTime_/CutoffTime_ disables the corresponding check.
+ ui64 StartTime_ = 0;
+ ui64 EndTime_ = 0;
+ ui64 CutoffTime_ = 0;
+ TMaybe<TFrame> Frame_;
+ IFrameFilterRef FrameFilter_;
+ IEventFactory* EventFactory_;
+};
+
+// Decodes the events of a single frame one by one.
+class TFrameDecoder: TEventStream {
+public:
+ // `withRawData` keeps the uncompressed payload so that GetRawEvent() can be used.
+ TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false);
+ ~TFrameDecoder() override;
+
+ bool Avail() const override;
+
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ // View set by the last decode step in raw-data mode.
+ const TStringBuf GetRawEvent() const;
+
+private:
+ // Declared but not defined: the decoder is not copyable.
+ TFrameDecoder(const TFrameDecoder&);
+ void operator=(const TFrameDecoder&);
+
+ inline bool HaveData() const {
+ return Event_ != nullptr;
+ }
+
+ void Decode();
+
+private:
+ const TFrame& Frame_;
+ THolder<IInputStream> Decompressor_;
+ THolder<TLengthLimitedInput> Limiter_;
+ TEventPtr Event_;
+ const TEventFilter* const Flt_;
+ IEventFactory* Fac_;
+ // Handed out once, after the last regular event of the frame.
+ THolder<TEvent> EndOfFrame_;
+ bool Strict_;
+ TBuffer UncompressedData_;
+ TStringBuf RawEventData_;
+ bool WithRawData_;
+};
+
+// Turns a stream of frames into a stream of events restricted to [start, end],
+// optionally sorted by timestamp (strong ordering).
+class TEventStreamer: public TEventStream {
+public:
+ TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false);
+ ~TEventStreamer() override;
+
+ bool Avail() const override;
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+private:
+ // Buffer of pending events; events are consumed from the back of the vector.
+ class TEventBuffer: public TEventStream {
+ public:
+ void SetCheckpoint();
+ void Rollback();
+ void Reorder(bool strongOrdering);
+ void Append(TConstEventPtr event, bool strongOrdering);
+
+ bool Avail() const override;
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ private:
+ TVector<TConstEventPtr> Buffer_;
+ // Buffer size recorded by SetCheckpoint().
+ size_t BufLen_ = 0;
+ // Timestamp of the last event removed by Next().
+ ui64 LastTimestamp_ = 0;
+ };
+
+private:
+ // Thrown internally when an event timestamp lies outside its frame's time range.
+ struct TInvalidEventTimestamps: public yexception {
+ };
+
+ bool LoadMoreEvents();
+ void TransferEvents(const TFrame&);
+
+private:
+ TFrameStream& Frames_;
+ TEventBuffer Events_;
+
+ ui64 Start_, End_;
+ // Largest frame end time seen so far.
+ ui64 MaxEndTimestamp_;
+ // Events with timestamps up to this value may be handed out.
+ ui64 Frontier_;
+ bool StrongOrdering_;
+ bool LosslessStrongOrdering_;
+ TIntrusivePtr<TEventFilter> EventFilter_;
+};
diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto
new file mode 100644
index 0000000000..7db1af3a59
--- /dev/null
+++ b/library/cpp/eventlog/proto/events_extension.proto
@@ -0,0 +1,22 @@
+import "google/protobuf/descriptor.proto";
+
+option go_package = "github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions";
+option java_package = "NEventLogEventsExtension";
+
+extend google.protobuf.MessageOptions { // Custom message-level options.
+ optional uint32 message_id = 50001; // NOTE(review): presumably the numeric id of an event message type - confirm against users of this option.
+ optional string realm_name = 50002;
+}
+
+message Repr { // Has no fields; serves as a namespace for the ReprType enum.
+ enum ReprType { // Representation selector used by the `repr` field option.
+ none = 0;
+ as_bytes = 1; // Only for primitive types
+ as_hex = 2; // Only for primitive types
+ as_base64 = 3; // Only for 'string' and 'bytes' fields
+ };
+}
+
+extend google.protobuf.FieldOptions { // Custom field-level option.
+ optional Repr.ReprType repr = 55003 [default = none]; // Per-field representation; `none` when the option is not set.
+}
diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto
new file mode 100644
index 0000000000..8070a09685
--- /dev/null
+++ b/library/cpp/eventlog/proto/internal.proto
@@ -0,0 +1,9 @@
+option go_package = "github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions";
+
+package NEventLogInternal;
+
+message TUnknownEvent { // Empty message (no fields). NOTE(review): presumably a placeholder for events of unrecognised type - confirm.
+};
+
+message TEndOfFrameEvent { // Empty message (no fields). NOTE(review): presumably a marker emitted at the end of a frame - confirm.
+};
diff --git a/library/cpp/eventlog/proto/ya.make b/library/cpp/eventlog/proto/ya.make
new file mode 100644
index 0000000000..fbf5a6c619
--- /dev/null
+++ b/library/cpp/eventlog/proto/ya.make
@@ -0,0 +1,12 @@
+PROTO_LIBRARY() # Build module for the eventlog .proto files.
+
+IF (NOT PY_PROTOS_FOR) # NOTE(review): presumably skips Go proto tags when building Python protos - confirm.
+ INCLUDE_TAGS(GO_PROTO)
+ENDIF()
+
+SRCS(
+ events_extension.proto
+ internal.proto
+)
+
+END()
diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp
new file mode 100644
index 0000000000..67839063fb
--- /dev/null
+++ b/library/cpp/eventlog/threaded_eventlog.cpp
@@ -0,0 +1 @@
+#include "threaded_eventlog.h"
diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h
new file mode 100644
index 0000000000..52382b856d
--- /dev/null
+++ b/library/cpp/eventlog/threaded_eventlog.h
@@ -0,0 +1,154 @@
+#pragma once
+
+#include "eventlog.h"
+
+#include <util/generic/string.h>
+#include <util/thread/pool.h>
+
+class TThreadedEventLog: public TEventLogWithSlave { // Event log that hands each frame to a thread pool, whose tasks forward it to the slave log.
+public:
+ class TWrapper;
+ using TOverflowCallback = std::function<void(TWrapper& wrapper)>; // Called with the rejected frame when the pool refuses it.
+
+ enum class EDegradationResult {
+ ShouldWrite, // Enqueue the frame as usual.
+ ShouldDrop, // Discard the frame without enqueuing it.
+ };
+ using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>; // fillFactor = current queue size / max queue size (0 when the max is 0).
+
+public:
+ TThreadedEventLog(
+ IEventLog& parentLog,
+ size_t threadCount,
+ size_t queueSize,
+ TOverflowCallback cb,
+ TDegradationCallback degradationCallback = {})
+ : TEventLogWithSlave(parentLog)
+ , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
+ , ThreadCount(threadCount)
+ , QueueSize(queueSize)
+ , OverflowCallback(std::move(cb))
+ , DegradationCallback(std::move(degradationCallback))
+ {
+ Init(); // Starts the pool right away.
+ }
+
+ TThreadedEventLog(
+ const TEventLogPtr& parentLog,
+ size_t threadCount,
+ size_t queueSize,
+ TOverflowCallback cb,
+ TDegradationCallback degradationCallback = {})
+ : TEventLogWithSlave(parentLog)
+ , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
+ , ThreadCount(threadCount)
+ , QueueSize(queueSize)
+ , OverflowCallback(std::move(cb))
+ , DegradationCallback(std::move(degradationCallback))
+ {
+ Init(); // Starts the pool right away.
+ }
+
+ TThreadedEventLog(IEventLog& parentLog)
+ : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) // Defaults: 1 thread, queueSize 0, no overflow callback. NOTE(review): 0 presumably means an unbounded queue - confirm in TThreadPool.
+ {
+ }
+
+ TThreadedEventLog(const TEventLogPtr& parentLog)
+ : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) // Same defaults as the IEventLog& overload.
+ {
+ }
+
+ ~TThreadedEventLog() override {
+ try {
+ LogSaver.Stop();
+ } catch (...) { // Deliberately swallowed: nothing from Stop() may escape the destructor.
+ }
+ }
+
+ void ReopenLog() override {
+ TEventLogWithSlave::ReopenLog(); // Pure delegation to the base class; the pool is not touched.
+ }
+
+ void CloseLog() override {
+ LogSaver.Stop(); // Stop the pool first, then close the underlying log.
+ TEventLogWithSlave::CloseLog();
+ }
+
+ void WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {}) override {
+ float fillFactor = 0.0f;
+ if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) { // Avoids dividing by a zero max queue size; fillFactor stays 0 in that case.
+ fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize();
+ }
+
+ EDegradationResult status = EDegradationResult::ShouldWrite; // Default when no degradation callback is installed.
+ if (DegradationCallback) {
+ status = DegradationCallback(fillFactor);
+ }
+ if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) {
+ return; // Dropped before wrapping, so `buffer` is left untouched on this path.
+ }
+
+ THolder<TWrapper> wrapped;
+ wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags))); // TWrapper's constructor swaps the contents of `buffer` into the wrapper.
+
+ if (LogSaver.Add(wrapped.Get())) {
+ Y_UNUSED(wrapped.Release()); // Accepted: the queued task now owns the wrapper (TWrapper::Process adopts `this`).
+ } else if (OverflowCallback) {
+ OverflowCallback(*wrapped); // Rejected: the callback sees the frame; `wrapped` still owns it and frees it on return.
+ }
+ }
+
+private:
+ void Init() {
+ LogSaver.Start(ThreadCount, QueueSize);
+ }
+
+public:
+ class TWrapper: public IObjectInQueue { // One queued frame: the data plus everything needed to write it to the slave log.
+ public:
+ TWrapper(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ IEventLog& slave,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {})
+ : StartTimestamp(startTimestamp)
+ , EndTimestamp(endTimestamp)
+ , Slave(&slave)
+ , WriteFrameCallback(writeFrameCallback)
+ , MetaFlags(std::move(metaFlags))
+ {
+ Buffer.Swap(buffer); // Takes the caller's data by swapping; the caller's buffer receives this object's freshly constructed Buffer.
+ }
+
+ void Process(void*) override {
+ THolder<TWrapper> holder(this); // Adopt self so the wrapper is released when Process exits.
+
+ WriteFrame();
+ }
+
+ void WriteFrame() { // Public so that an overflow callback can write the frame itself.
+ Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags)); // MetaFlags is moved from, so a second call passes moved-from flags.
+ }
+
+ private:
+ TBuffer Buffer;
+ TEventTimestamp StartTimestamp;
+ TEventTimestamp EndTimestamp;
+ IEventLog* Slave; // Non-owning pointer to the destination log.
+ TWriteFrameCallbackPtr WriteFrameCallback;
+ TLogRecord::TMetaFlags MetaFlags;
+ };
+
+private:
+ TThreadPool LogSaver; // Pool that runs the queued TWrapper tasks; its threads are named "ThreadedEventLog".
+ const size_t ThreadCount;
+ const size_t QueueSize;
+ const TOverflowCallback OverflowCallback;
+ const TDegradationCallback DegradationCallback;
+};
diff --git a/library/cpp/eventlog/ya.make b/library/cpp/eventlog/ya.make
new file mode 100644
index 0000000000..fbbc1eff00
--- /dev/null
+++ b/library/cpp/eventlog/ya.make
@@ -0,0 +1,29 @@
+LIBRARY() # Build module for library/cpp/eventlog.
+
+PEERDIR( # Libraries this module depends on.
+ library/cpp/blockcodecs
+ library/cpp/eventlog/proto
+ library/cpp/json
+ library/cpp/logger
+ library/cpp/protobuf/json
+ library/cpp/streams/growing_file_input
+ library/cpp/string_utils/base64
+ contrib/libs/re2
+)
+
+SRCS( # Sources of the module; common.h is listed alongside the .cpp files.
+ common.h
+ evdecoder.cpp
+ event_field_output.cpp
+ event_field_printer.cpp
+ eventlog.cpp
+ eventlog_int.cpp
+ iterator.cpp
+ logparser.cpp
+ threaded_eventlog.cpp
+)
+
+GENERATE_ENUM_SERIALIZATION(eventlog.h) # NOTE(review): presumably generates string conversions for the enums declared in these headers - confirm.
+GENERATE_ENUM_SERIALIZATION(eventlog_int.h)
+
+END()