aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/eventlog
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/eventlog
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/eventlog')
-rw-r--r--library/cpp/eventlog/common.h10
-rw-r--r--library/cpp/eventlog/dumper/common.cpp81
-rw-r--r--library/cpp/eventlog/dumper/common.h8
-rw-r--r--library/cpp/eventlog/dumper/evlogdump.cpp414
-rw-r--r--library/cpp/eventlog/dumper/evlogdump.h11
-rw-r--r--library/cpp/eventlog/dumper/tunable_event_processor.cpp1
-rw-r--r--library/cpp/eventlog/dumper/tunable_event_processor.h19
-rw-r--r--library/cpp/eventlog/evdecoder.cpp112
-rw-r--r--library/cpp/eventlog/evdecoder.h16
-rw-r--r--library/cpp/eventlog/event_field_output.cpp65
-rw-r--r--library/cpp/eventlog/event_field_output.h29
-rw-r--r--library/cpp/eventlog/event_field_printer.cpp27
-rw-r--r--library/cpp/eventlog/event_field_printer.h38
-rw-r--r--library/cpp/eventlog/eventlog.cpp554
-rw-r--r--library/cpp/eventlog/eventlog.h623
-rw-r--r--library/cpp/eventlog/eventlog_int.cpp12
-rw-r--r--library/cpp/eventlog/eventlog_int.h72
-rw-r--r--library/cpp/eventlog/events_extension.h161
-rw-r--r--library/cpp/eventlog/iterator.cpp88
-rw-r--r--library/cpp/eventlog/iterator.h51
-rw-r--r--library/cpp/eventlog/logparser.cpp814
-rw-r--r--library/cpp/eventlog/logparser.h343
-rw-r--r--library/cpp/eventlog/proto/events_extension.proto22
-rw-r--r--library/cpp/eventlog/proto/internal.proto9
-rw-r--r--library/cpp/eventlog/threaded_eventlog.cpp1
-rw-r--r--library/cpp/eventlog/threaded_eventlog.h154
26 files changed, 3735 insertions, 0 deletions
diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h
new file mode 100644
index 00000000000..75c512c13ef
--- /dev/null
+++ b/library/cpp/eventlog/common.h
@@ -0,0 +1,10 @@
+#pragma once
+
+template <class T>
+class TPacketInputStream {
+public:
+ virtual bool Avail() const = 0;
+ virtual T operator*() const = 0;
+ virtual bool Next() = 0;
+ virtual ~TPacketInputStream() = default;
+};
diff --git a/library/cpp/eventlog/dumper/common.cpp b/library/cpp/eventlog/dumper/common.cpp
new file mode 100644
index 00000000000..eebe3b6ee33
--- /dev/null
+++ b/library/cpp/eventlog/dumper/common.cpp
@@ -0,0 +1,81 @@
+#include "common.h"
+
+#include <contrib/libs/re2/re2/re2.h>
+
+#include <util/datetime/base.h>
+#include <util/datetime/parser.h>
+#include <util/string/cast.h>
+
+static time_t RecentTime(int h, int m, int s) {
+ time_t now = time(nullptr);
+ tm tmTmp;
+ localtime_r(&now, &tmTmp);
+ tmTmp.tm_hour = h;
+ tmTmp.tm_min = m;
+ tmTmp.tm_sec = s;
+ time_t today = mktime(&tmTmp);
+ tmTmp.tm_mday -= 1;
+ time_t yesterday = mktime(&tmTmp);
+ return today <= now ? today : yesterday;
+}
+
+static bool ParseRecentTime(const TString& str, time_t& result) {
+ RE2 RecentTimePattern("(\\d{1,2}):(\\d{2})(?::(\\d{2}))?");
+ re2::StringPiece hStr, mStr, sStr;
+ if (!RE2::FullMatch({str.data(), str.size()}, RecentTimePattern, &hStr, &mStr, &sStr)) {
+ return false;
+ }
+ int h = FromString<int>(hStr.data(), hStr.length());
+ int m = FromString<int>(mStr.data(), mStr.length());
+ int s = FromString<int>(sStr.data(), sStr.length(), 0);
+ if (h > 23 || m > 59 || s > 59) {
+ return false;
+ }
+ result = RecentTime(h, m, s);
+ return true;
+}
+
+namespace {
+ class TDefaultOffset8601Parser: public TIso8601DateTimeParser {
+ public:
+ TDefaultOffset8601Parser(int offsetHours) {
+ DateTimeFields.ZoneOffsetMinutes = offsetHours * 60;
+ }
+ };
+} // namespace
+
+static bool ParseISO8601DateTimeWithDefaultOffset(TStringBuf str, int offsetHours, time_t& result) {
+ TDefaultOffset8601Parser parser{offsetHours};
+
+ if (!parser.ParsePart(str.data(), str.size())) {
+ return false;
+ }
+
+ const TInstant instant = parser.GetResult(TInstant::Max());
+ if (instant == TInstant::Max()) {
+ return false;
+ }
+
+ result = instant.TimeT();
+ return true;
+}
+
+ui64 ParseTime(const TString& str, ui64 defValue, int offset) {
+ if (!str) {
+ return defValue;
+ }
+
+ time_t utcTime;
+
+ if (ParseISO8601DateTimeWithDefaultOffset(str, offset, utcTime)) {
+ return (ui64)utcTime * 1000000;
+ }
+
+ if (ParseRecentTime(str, utcTime)) {
+ return (ui64)utcTime * 1000000;
+ }
+
+ // if conversion fails, TryFromString leaves defValue unchanged
+ TryFromString<ui64>(str, defValue);
+ return defValue;
+}
diff --git a/library/cpp/eventlog/dumper/common.h b/library/cpp/eventlog/dumper/common.h
new file mode 100644
index 00000000000..839fc7b221c
--- /dev/null
+++ b/library/cpp/eventlog/dumper/common.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+/**
+ * Returns microseconds since the epoch.
+ */
+ui64 ParseTime(const TString& str, ui64 defValue, int offset /* timezone offset, default MSK */ = 3);
diff --git a/library/cpp/eventlog/dumper/evlogdump.cpp b/library/cpp/eventlog/dumper/evlogdump.cpp
new file mode 100644
index 00000000000..003b412265b
--- /dev/null
+++ b/library/cpp/eventlog/dumper/evlogdump.cpp
@@ -0,0 +1,414 @@
+#include "common.h"
+#include "evlogdump.h"
+
+#include <library/cpp/eventlog/evdecoder.h>
+#include <library/cpp/eventlog/iterator.h>
+#include <library/cpp/eventlog/logparser.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+
+#ifndef NO_SVN_DEPEND
+#include <library/cpp/svnversion/svnversion.h>
+#endif
+
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+#include <util/stream/file.h>
+#include <util/string/cast.h>
+#include <util/string/type.h>
+
+using namespace NEventLog;
+namespace nlg = NLastGetopt;
+
+IFrameFilterRef CreateDurationFrameFilter(const TString& params) {
+ TStringBuf d(params);
+ TStringBuf l, r;
+ d.Split(':', l, r);
+ TDuration min = l.empty() ? TDuration::Zero() : FromString<TDuration>(l);
+ TDuration max = r.empty() ? TDuration::Max() : FromString<TDuration>(r);
+
+ if (min > TDuration::Zero() || max < TDuration::Max()) {
+ return new TDurationFrameFilter(min.GetValue(), max.GetValue());
+ }
+
+ return nullptr;
+}
+
+IFrameFilterRef CreateFrameIdFilter(const ui32 frameId) {
+ return new TFrameIdFrameFilter(frameId);
+}
+
+IFrameFilterRef CreateContainsEventFrameFilter(const TString& args, const IEventFactory* factory) {
+ return new TContainsEventFrameFilter(args, factory);
+}
+
+void ListAllEvents(IEventFactory* factory, IOutputStream* out) {
+ Y_ENSURE(out);
+
+ for (TEventClass eventClass = factory->EventClassBegin(); eventClass < factory->EventClassEnd(); eventClass++) {
+ THolder<TEvent> event(factory->CreateLogEvent(eventClass));
+
+ const TStringBuf& name = event->GetName();
+ if (name) {
+ (*out) << name << "\n";
+ }
+ }
+}
+
+void ListEventFieldsByEventId(const TEventClass eventClass, IEventFactory* factory, IOutputStream* out) {
+ THolder<TEvent> event(factory->CreateLogEvent(eventClass));
+ const NProtoBuf::Message* message = event->GetProto();
+ const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
+
+ (*out) << descriptor->DebugString();
+}
+
+void ListEventFields(const TString& eventName, IEventFactory* factory, IOutputStream* out) {
+ Y_ENSURE(out);
+
+ TEventClass eventClass;
+
+ try {
+ eventClass = factory->ClassByName(eventName);
+ } catch (yexception& e) {
+ if (!TryFromString<TEventClass>(eventName, eventClass)) {
+ (*out) << "Cannot dervie event class from event name: " << eventName << Endl;
+ return;
+ }
+ }
+
+ ListEventFieldsByEventId(eventClass, factory, out);
+}
+
+int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory) {
+ if (helpEvents == "all") {
+ ListAllEvents(factory, &Cout);
+ } else if (IsNumber(helpEvents)) {
+ ListEventFieldsByEventId(IntFromString<ui32, 10>(helpEvents), factory, &Cout);
+ } else {
+ ListEventFields(helpEvents, factory, &Cout);
+ }
+
+ return 0;
+}
+
+int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv) {
+ class TProxy: public ITunableEventProcessor {
+ public:
+ TProxy(IEventProcessor* proc)
+ : Processor_(proc)
+ {
+ }
+
+ void AddOptions(NLastGetopt::TOpts&) override {
+ }
+
+ void SetOptions(const TEvent::TOutputOptions& options) override {
+ Processor_->SetOptions(options);
+ }
+
+ void ProcessEvent(const TEvent* ev) override {
+ Processor_->ProcessEvent(ev);
+ }
+
+ bool CheckedProcessEvent(const TEvent* ev) override {
+ return Processor_->CheckedProcessEvent(ev);
+ }
+
+ private:
+ IEventProcessor* Processor_ = nullptr;
+ };
+
+ TProxy proxy(proc);
+ return IterateEventLog(fac, &proxy, argc, argv);
+}
+
+int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv) {
+ nlg::TOpts opts;
+ opts.AddHelpOption('?');
+ opts.AddHelpOption('h');
+ opts.SetTitle(
+ "Search EventLog dumper\n\n"
+ "Examples:\n"
+ "evlogdump -s 1228484839332219 -e 1228484840332219 event_log\n"
+ "evlogdump -s 2008-12-12T12:00:00+03 -e 2008-12-12T12:05:00+03 event_log\n"
+ "evlogdump -s 12:40 -e 12:55 event_log\n"
+ "evlogdump -i 301,302,303 event_log\n"
+ "evlogdump -i SubSourceResponse,SubSourceOk,SubSourceError event_log\n"
+ "evlogdump -d 40ms:60ms event_log\n"
+ "evlogdump -c ReqId:ReqId:1563185655856304-30407004790538173600035-vla1-0671-p1\n"
+ "\n"
+ "Fine manual: https://nda.ya.ru/3Tpo3L\n"
+ "\n"
+ "If in trouble using this (or bugs encountered),\n"
+ "please don't hesitate to ask middlesearch/basesearch dev team\n"
+ "(vmordovin@, mvel@, etc)\n");
+
+ TString start;
+ opts.AddLongOption(
+ 's', "start-time",
+ "Start time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours).\n"
+ "Long frames can produce inaccurate cropping and event loss in dump (see SEARCH-5576)")
+ .Optional()
+ .StoreResult(&start);
+
+ TString end;
+ opts.AddLongOption(
+ 'e', "end-time",
+ "End time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours)")
+ .Optional()
+ .StoreResult(&end);
+
+ TOptions o;
+
+ opts.AddLongOption(
+ 'm', "max-request-duration",
+ "Max duration of a request in the log, in us")
+ .Optional()
+ .StoreResult(&o.MaxRequestDuration);
+
+ bool oneFrame = false;
+ opts.AddLongOption(
+ 'f', "one-frame",
+ "Treat input file as single frame dump (e.g. from gdb)")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&oneFrame, true);
+
+ opts.AddLongOption(
+ 'v', "version",
+ "Print program version")
+ .NoArgument()
+ .Optional()
+ .Handler(&nlg::PrintVersionAndExit);
+
+ TString includeEventList;
+ opts.AddLongOption(
+ 'i', "event-list",
+ "Comma-separated list of included event class IDs or names "
+ "(e.g. 265,287 or MainTaskError,ContextCreated)")
+ .Optional()
+ .StoreResult(&includeEventList)
+ .StoreValue(&o.EnableEvents, true);
+
+ TString durationFilterStr;
+ opts.AddLongOption(
+ 'd', "duration",
+ "DurationMin[:DurationMax] (values must contain a unit, valid examples are: 50us, 50ms, 50s)\n"
+ "(show frames with duration greater or equal DurationMin [and less or equal than DurationMax])")
+ .Optional()
+ .StoreResult(&durationFilterStr);
+
+ const ui32 INVALID_FRAME_ID = ui32(-1);
+ ui32 frameIdFilter = INVALID_FRAME_ID;
+ opts.AddLongOption(
+ 'n', "frame-id",
+ "Filter frame with given id\n")
+ .Optional()
+ .StoreResult(&frameIdFilter);
+
+ TString excludeEventList;
+ opts.AddLongOption(
+ 'x', "exclude-list",
+ "Comma-separated list of excluded event class IDs or names (see also --event-list option)")
+ .Optional()
+ .StoreResult(&excludeEventList)
+ .StoreValue(&o.EnableEvents, false);
+
+ opts.AddLongOption(
+ 'o', "frame-order",
+ "Order events by time only inside a frame (faster)")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&o.ForceWeakOrdering, true);
+
+ opts.AddLongOption(
+ 'O', "global-order",
+ "Globally but lossy (may lose some frames) order events by time only (slower)")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&o.ForceStrongOrdering, true);
+
+ opts.AddLongOption(
+ "lossless-strong-order",
+ "Globally order events by time only (super slow and memory heavy)")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&o.ForceLosslessStrongOrdering, true)
+ .StoreValue(&o.ForceStrongOrdering, true);
+
+ opts.AddLongOption(
+ 'S', "stream",
+ "Force stream mode (for log files with invalid ending)")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&o.ForceStreamMode, true);
+
+ opts.AddLongOption(
+ 't', "tail",
+ "Open file like `tail -f` does")
+ .NoArgument()
+ .Optional()
+ .StoreValue(&o.TailFMode, true);
+
+ bool unescaped = false;
+ opts.AddLongOption(
+ 'u', "unescaped",
+ "Don't escape \\t, \\n chars in message fields")
+ .NoArgument()
+ .StoreValue(&unescaped, true);
+
+ bool json = false;
+ opts.AddLongOption(
+ 'j', "json",
+ "Print messages as JSON values")
+ .NoArgument()
+ .StoreValue(&json, true);
+
+ TEvent::TOutputOptions outputOptions;
+ opts.AddLongOption(
+ 'r', "human-readable",
+ "Print some fields (e.g. timestamp) in human-readable format, add time offsets")
+ .NoArgument()
+ .StoreValue(&outputOptions.HumanReadable, true);
+
+ //Supports only fields of type string
+ TString containsEvent;
+ opts.AddLongOption(
+ 'c', "contains-event",
+ "Only print frames that contain events whose particular field's value matches given value.\n"
+ "Match group should be provided in format: EventName:FieldName:ValueToMatch\n"
+ "If more than one match group is provided, they should be separated by / delimiter.\n"
+ "Nested fields are supported through \'.\' in FieldName.\n"
+ "Symbols \'/\' and \':\' in EventName, FieldName or ValueToMatch must be escaped.\n"
+ "NOTE: bool values are 1 and 0.")
+ .Optional()
+ .StoreResult(&containsEvent);
+
+ TString helpEvents;
+ opts.AddLongOption("help-events")
+ .RequiredArgument("EVENT, EVENT_ID or string \"all\"")
+ .Optional()
+ .StoreResult(&helpEvents);
+
+ proc->AddOptions(opts);
+
+ opts.SetFreeArgsMin(0);
+ opts.SetFreeArgsMax(1);
+ opts.SetFreeArgTitle(0, "<eventlog>", "Event log file");
+
+ try {
+ const nlg::TOptsParseResult optsRes(&opts, argc, argv);
+
+ if (helpEvents) {
+ return PrintHelpEvents(helpEvents, fac);
+ }
+
+ TVector<TString> freeArgs = optsRes.GetFreeArgs();
+ if (freeArgs) {
+ o.FileName = freeArgs[0];
+ }
+
+ ui32 conditionsHappened = 0;
+
+ if (frameIdFilter != INVALID_FRAME_ID) {
+ ++conditionsHappened;
+ }
+ if (containsEvent) {
+ ++conditionsHappened;
+ }
+ if (durationFilterStr) {
+ ++conditionsHappened;
+ }
+
+ if (conditionsHappened > 1) {
+ throw nlg::TUsageException() << "You can only use no more than one of frame id, duration or contains event frame filters.";
+ }
+
+ if (frameIdFilter != INVALID_FRAME_ID) {
+ o.FrameFilter = CreateFrameIdFilter(frameIdFilter);
+ } else if (durationFilterStr) {
+ o.FrameFilter = CreateDurationFrameFilter(durationFilterStr);
+ } else if (containsEvent) {
+ o.FrameFilter = CreateContainsEventFrameFilter(containsEvent, fac);
+ }
+
+ // handle some inconsistencies
+ if (o.ForceWeakOrdering && o.ForceStrongOrdering) {
+ throw nlg::TUsageException() << "You can use either strong (-O) or weak (-o) ordering. ";
+ }
+ if (includeEventList && excludeEventList) {
+ throw nlg::TUsageException() << "You can use either include (-i) or exclude (-x) events list. ";
+ }
+ if (unescaped && json) {
+ throw nlg::TUsageException() << "You can use either unescaped (-u) or json (-j) output format. ";
+ }
+
+ proc->CheckOptions();
+ } catch (...) {
+ Cerr << "Usage error: " << CurrentExceptionMessage() << Endl;
+ return 1;
+ }
+
+ o.EvList = o.EnableEvents ? includeEventList : excludeEventList;
+ if (json) {
+ outputOptions.OutputFormat = TEvent::TOutputFormat::Json;
+ } else if (unescaped) {
+ outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparatedRaw;
+ } else {
+ outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparated;
+ }
+
+ IEventProcessor* eventProcessor = NEvClass::Processor();
+ // FIXME(mvel): A little of hell here: `proc` and `eventProcessor` are `IEventProcessor`s
+ // So we need to set options for BOTH!
+
+ eventProcessor->SetOptions(outputOptions);
+ proc->SetOptions(outputOptions);
+ proc->SetEventProcessor(eventProcessor);
+
+ if (oneFrame) {
+ THolder<IInputStream> fileInput;
+
+ // this is for coredumps analysis, usage:
+ // gdb: dump binary memory framefile Data_ Data_ + Pos_
+ // evlogdump -f framefile
+ IInputStream* usedInput = nullptr;
+
+ if (o.FileName.size()) {
+ fileInput.Reset(new TUnbufferedFileInput(o.FileName));
+ usedInput = fileInput.Get();
+ } else {
+ usedInput = &Cin;
+ }
+
+ try {
+ for (;;) {
+ proc->ProcessEvent(DecodeEvent(*usedInput, true, 0, nullptr, fac).Get());
+ }
+ } catch (...) {
+ }
+
+ return 0;
+ }
+
+ o.StartTime = ParseTime(start, MIN_START_TIME);
+ o.EndTime = ParseTime(end, MAX_END_TIME);
+
+ try {
+ THolder<IIterator> it = CreateIterator(o, fac);
+
+ while (const auto ev = it->Next()) {
+ if (!proc->CheckedProcessEvent(ev.Get())) {
+ break;
+ }
+ }
+
+ return 0;
+ } catch (...) {
+ Cout.Flush();
+ Cerr << "Error occured: " << CurrentExceptionMessage() << Endl;
+ }
+
+ return 1;
+}
diff --git a/library/cpp/eventlog/dumper/evlogdump.h b/library/cpp/eventlog/dumper/evlogdump.h
new file mode 100644
index 00000000000..eb150573746
--- /dev/null
+++ b/library/cpp/eventlog/dumper/evlogdump.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "tunable_event_processor.h"
+
+#include <library/cpp/eventlog/eventlog.h>
+
+int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv);
+int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv);
+
+// added for using in infra/libs/logger/log_printer.cpp
+int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory);
diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.cpp b/library/cpp/eventlog/dumper/tunable_event_processor.cpp
new file mode 100644
index 00000000000..8333089cb54
--- /dev/null
+++ b/library/cpp/eventlog/dumper/tunable_event_processor.cpp
@@ -0,0 +1 @@
+#include "tunable_event_processor.h"
diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.h b/library/cpp/eventlog/dumper/tunable_event_processor.h
new file mode 100644
index 00000000000..b56eae9222a
--- /dev/null
+++ b/library/cpp/eventlog/dumper/tunable_event_processor.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <library/cpp/eventlog/eventlog.h>
+
+namespace NLastGetopt {
+ class TOpts;
+}
+
+class ITunableEventProcessor: public IEventProcessor {
+public:
+ virtual void SetEventProcessor(IEventProcessor* /*processor*/) {
+ }
+
+ virtual void AddOptions(NLastGetopt::TOpts& opts) = 0;
+ virtual void CheckOptions() {
+ }
+ virtual ~ITunableEventProcessor() {
+ }
+};
diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp
new file mode 100644
index 00000000000..e4413a1b0e0
--- /dev/null
+++ b/library/cpp/eventlog/evdecoder.cpp
@@ -0,0 +1,112 @@
+#include <util/memory/tempbuf.h>
+#include <util/string/cast.h>
+#include <util/stream/output.h>
+
+#include "evdecoder.h"
+#include "logparser.h"
+
+static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class";
+
+static inline void LogError(ui64 frameAddr, const char* msg, bool strict) {
+ if (!strict) {
+ Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl;
+ } else {
+ ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg;
+ }
+}
+
+static inline bool SkipData(IInputStream& s, size_t amount) {
+ return (amount == s.Skip(amount));
+}
+
+// There are 2 log fomats: the one, that allows event skip without event decode (it has stored event length)
+// and another, that requires each event decode just to seek over stream. needRead == true means the latter format.
+static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) {
+ TEventTimestamp ts;
+ TEventClass c;
+ THolder<TEvent> e;
+
+ ::Load(&s, ts);
+ ::Load(&s, c);
+
+ bool needReturn = false;
+
+ if (!filter || filter->EventAllowed(c)) {
+ needReturn = true;
+ }
+
+ if (needRead || needReturn) {
+ e.Reset(fac->CreateLogEvent(c));
+
+ if (!!e) {
+ e->Timestamp = ts;
+ e->Load(s);
+ } else if (needReturn) {
+ e.Reset(new TUnknownEvent(ts, c));
+ }
+
+ if (!needReturn) {
+ e.Reset(nullptr);
+ }
+ }
+
+ return e;
+}
+
+THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) {
+ ui32 len;
+ ::Load(&inp, len);
+
+ if (len < sizeof(ui32)) {
+ ythrow TEventDecoderError() << "invalid event length";
+ }
+
+ TLengthLimitedInput s(&inp, len - sizeof(ui32));
+
+ try {
+ THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac);
+ if (!!e) {
+ if (!s.Left()) {
+ return e;
+ } else if (e->Class == 0) {
+ if (!SkipData(s, s.Left())) {
+ ythrow TEventDecoderError() << "cannot skip bad event";
+ }
+
+ return e;
+ }
+
+ LogError(frameAddr, "Event is not fully read", strict);
+ }
+ } catch (const TLoadEOF&) {
+ if (s.Left()) {
+ throw;
+ }
+
+ LogError(frameAddr, "Unexpected event end", strict);
+ }
+
+ if (!SkipData(s, s.Left())) {
+ ythrow TEventDecoderError() << "cannot skip bad event";
+ }
+
+ return nullptr;
+}
+
+THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) {
+ try {
+ if (framed) {
+ return DecodeFramed(s, frameAddr, filter, fac, strict);
+ } else {
+ THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac);
+ // e(0) means event, skipped by filter. Not an error.
+ if (!!e && !e->Class) {
+ ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS;
+ }
+
+ return e;
+ }
+ } catch (const TLoadEOF&) {
+ ythrow TEventDecoderError() << "unexpected frame end";
+ }
+}
diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h
new file mode 100644
index 00000000000..eedfc821743
--- /dev/null
+++ b/library/cpp/eventlog/evdecoder.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+#include <util/generic/ptr.h>
+
+#include "eventlog.h"
+
+class TEvent;
+class IInputStream;
+class TEventFilter;
+
+struct TEventDecoderError: public yexception {
+};
+
+THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false);
+bool AcceptableContent(TEventLogFormat);
diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp
new file mode 100644
index 00000000000..a1b4c2dafa1
--- /dev/null
+++ b/library/cpp/eventlog/event_field_output.cpp
@@ -0,0 +1,65 @@
+#include "event_field_output.h"
+
+#include <util/string/split.h>
+
+namespace {
+ TString MakeSeparators(EFieldOutputFlags flags) {
+ TString res;
+ res.reserve(3);
+
+ if (flags & EFieldOutputFlag::EscapeTab) {
+ res.append('\t');
+ }
+ if (flags & EFieldOutputFlag::EscapeNewLine) {
+ res.append('\n');
+ }
+ if (flags & EFieldOutputFlag::EscapeBackSlash) {
+ res.append('\\');
+ }
+
+ return res;
+ }
+}
+
+TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags)
+ : Output(output)
+ , Flags(flags)
+ , Separators(MakeSeparators(flags))
+{
+}
+
+IOutputStream& TEventFieldOutput::GetOutputStream() {
+ return Output;
+}
+
+EFieldOutputFlags TEventFieldOutput::GetFlags() const {
+ return Flags;
+}
+
+void TEventFieldOutput::DoWrite(const void* buf, size_t len) {
+ if (!Flags) {
+ Output.Write(buf, len);
+ return;
+ }
+
+ TStringBuf chunk{static_cast<const char*>(buf), len};
+
+ for (const auto part : StringSplitter(chunk).SplitBySet(Separators.data())) {
+ TStringBuf token = part.Token();
+ TStringBuf delim = part.Delim();
+
+ if (!token.empty()) {
+ Output.Write(token);
+ }
+ if ("\n" == delim) {
+ Output.Write(TStringBuf("\\n"));
+ } else if ("\t" == delim) {
+ Output.Write(TStringBuf("\\t"));
+ } else if ("\\" == delim) {
+ Output.Write(TStringBuf("\\\\"));
+ } else {
+ Y_ASSERT(delim.empty());
+ }
+ }
+}
+
diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h
new file mode 100644
index 00000000000..ed9db0ae167
--- /dev/null
+++ b/library/cpp/eventlog/event_field_output.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/flags.h>
+
+enum class EFieldOutputFlag {
+ EscapeTab = 0x1, // escape \t in field value
+ EscapeNewLine = 0x2, // escape \n in field value
+ EscapeBackSlash = 0x4 // escape \ in field value
+};
+
+Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag);
+Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags);
+
+class TEventFieldOutput: public IOutputStream {
+public:
+ TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags);
+
+ IOutputStream& GetOutputStream();
+ EFieldOutputFlags GetFlags() const;
+
+protected:
+ void DoWrite(const void* buf, size_t len) override;
+
+private:
+ IOutputStream& Output;
+ EFieldOutputFlags Flags;
+ TString Separators;
+};
diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp
new file mode 100644
index 00000000000..29c6b4b661e
--- /dev/null
+++ b/library/cpp/eventlog/event_field_printer.cpp
@@ -0,0 +1,27 @@
+#include "event_field_printer.h"
+
+#include <library/cpp/protobuf/json/proto2json.h>
+
+namespace {
+
+ const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
+ .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
+
+} // namespace
+
+TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode)
+ : Mode(mode)
+{}
+
+template <>
+void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) {
+ switch (Mode) {
+ case EProtobufMessageFieldPrintMode::DEFAULT:
+ case EProtobufMessageFieldPrintMode::JSON: {
+ // Do not use field.PrintJSON() here: IGNIETFERRO-2002
+ NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG);
+ break;
+ }
+ }
+}
diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h
new file mode 100644
index 00000000000..835e8f4a850
--- /dev/null
+++ b/library/cpp/eventlog/event_field_printer.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "event_field_output.h"
+
+#include <google/protobuf/message.h>
+
+// NB: For historical reasons print code for all primitive types/repeated fields/etc generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp
+
+enum class EProtobufMessageFieldPrintMode {
+ // Use <TEventProtobufMessageFieldType>::Print method for fields that has it
+ // Print json for other fields
+ DEFAULT = 0,
+
+ JSON = 1,
+};
+
+class TEventProtobufMessageFieldPrinter {
+public:
+ explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode);
+
+ template <typename TEventProtobufMessageFieldType, bool HasPrintFunction>
+ void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) {
+ if constexpr (HasPrintFunction) {
+ if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) {
+ field.Print(output.GetOutputStream(), output.GetFlags());
+ return;
+ }
+ }
+
+ PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output);
+ }
+
+ template <>
+ void PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output);
+
+private:
+ EProtobufMessageFieldPrintMode Mode;
+};
diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp
new file mode 100644
index 00000000000..458a632b4a2
--- /dev/null
+++ b/library/cpp/eventlog/eventlog.cpp
@@ -0,0 +1,554 @@
+#include <util/datetime/base.h>
+#include <util/stream/zlib.h>
+#include <util/stream/length.h>
+#include <util/generic/buffer.h>
+#include <util/generic/yexception.h>
+#include <util/digest/murmur.h>
+#include <util/generic/singleton.h>
+#include <util/generic/function.h>
+#include <util/stream/output.h>
+#include <util/stream/format.h>
+#include <util/stream/null.h>
+
+#include <google/protobuf/messagext.h>
+
+#include "eventlog.h"
+#include "events_extension.h"
+#include "evdecoder.h"
+#include "logparser.h"
+#include <library/cpp/eventlog/proto/internal.pb.h>
+
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+
+TAtomic eventlogFrameCounter = 0;
+
+namespace {
+
+ const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
+ .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
+
+ ui32 GenerateFrameId() {
+ return ui32(AtomicAdd(eventlogFrameCounter, 1));
+ }
+
+ inline const NProtoBuf::Message* UnknownEventMessage() {
+ return Singleton<NEventLogInternal::TUnknownEvent>();
+ }
+
+} // namespace
+
+void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
+ if (options.OutputFormat == TOutputFormat::TabSeparatedRaw) {
+ PrintHeader(out, options, eventState);
+ DoPrint(out, {});
+ } else if (options.OutputFormat == TOutputFormat::TabSeparated) {
+ PrintHeader(out, options, eventState);
+ DoPrint(
+ out,
+ EFieldOutputFlags{} | EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash);
+ } else if (options.OutputFormat == TOutputFormat::Json) {
+ NJson::TJsonWriterConfig jsonWriterConfig;
+ jsonWriterConfig.FormatOutput = 0;
+ NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig);
+
+ jsonWriter.OpenMap();
+ PrintJsonHeader(jsonWriter);
+ DoPrintJson(jsonWriter);
+ jsonWriter.CloseMap();
+ }
+}
+
+void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
+ if (options.HumanReadable) {
+ out << TInstant::MicroSeconds(Timestamp).ToString() << "\t";
+ if (Timestamp >= eventState.FrameStartTime)
+ out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime));
+ else // a bug somewhere? anyway, let's handle it in a nice fashion
+ out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp));
+
+ if (Timestamp >= eventState.PrevEventTime)
+ out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")";
+ // else: these events are async and out-of-order, relative time diff makes no sense, skip it
+
+ out << "\tF# " << FrameId << '\t';
+ } else {
+ out << static_cast<TEventTimestamp>(Timestamp);
+ out << '\t' << FrameId << '\t';
+ }
+}
+
+void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const {
+ jsonWriter.Write("Timestamp", Timestamp);
+ jsonWriter.Write("FrameId", FrameId);
+}
+
+class TProtobufEvent: public TEvent {
+public:
+ TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg)
+ : TEvent(eventId, t)
+ , Message_(&msg)
+ , EventFactory_(NProtoBuf::TEventFactory::Instance())
+ {
+ }
+
+ TProtobufEvent()
+ : TEvent(0, 0)
+ , EventFactory_(NProtoBuf::TEventFactory::Instance())
+ {
+ }
+
+ explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance())
+ : TEvent(id, 0)
+ , EventFactory_(eventFactory)
+ {
+ InnerMsg_.Reset(EventFactory_->CreateEvent(Class));
+ Message_ = InnerMsg_.Get();
+ }
+
+ ui32 Id() const {
+ return Class;
+ }
+
+ void Load(IInputStream& in) override {
+ if (!!InnerMsg_) {
+ InnerMsg_->ParseFromArcadiaStream(&in);
+ } else {
+ TransferData(&in, &Cnull);
+ }
+ }
+
+ void Save(IOutputStream& out) const override {
+ Message_->SerializeToArcadiaStream(&out);
+ }
+
+ void SaveToBuffer(TBufferOutput& buf) const override {
+ size_t messageSize = Message_->ByteSize();
+ size_t before = buf.Buffer().Size();
+ buf.Buffer().Advance(messageSize);
+ Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize);
+ }
+
+ TStringBuf GetName() const override {
+ return EventFactory_->NameById(Id());
+ }
+
+private:
+ void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override {
+ EventFactory_->PrintEvent(Id(), Message_, out, flags);
+ }
+ void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override {
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+
+ jsonWriter.Write("Fields");
+ NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG);
+
+ jsonWriter.CloseMap();
+ }
+
+ const NProtoBuf::Message* GetProto() const override {
+ if (Message_) {
+ return Message_;
+ }
+
+ return UnknownEventMessage();
+ }
+
+private:
+ const NProtoBuf::Message* Message_ = nullptr;
+ NProtoBuf::TEventFactory* EventFactory_;
+ THolder<NProtoBuf::Message> InnerMsg_;
+
+ friend class TEventLogFrame;
+};
+
+void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) {
+ TProtobufEvent event(Now().MicroSeconds(), eventId, ev);
+
+ LogEventImpl(event);
+}
+
+void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) {
+ TProtobufEvent event(timestamp, eventId, ev);
+
+ LogEventImpl(event);
+}
+
+template <>
+void TEventLogFrame::DebugDump(const TProtobufEvent& ev) {
+ static TMutex lock;
+
+ with_lock (lock) {
+ Cerr << ev.Timestamp << "\t" << ev.GetName() << "\t";
+ ev.GetProto()->PrintJSON(Cerr);
+ Cerr << Endl;
+ }
+}
+
+#pragma pack(push, 1)
+struct TFrameHeaderData {
+ char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()];
+ TCompressedFrameBaseHeader Header;
+ TCompressedFrameHeader2 HeaderEx;
+};
+#pragma pack(pop)
+
+TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog)
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ DoInit();
+}
+
+TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : EvLog_(parentLog)
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ if (EvLog_ && EvLog_->HasNullBackend()) {
+ EvLog_ = nullptr;
+ }
+
+ DoInit();
+}
+
+TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : EvLog_(nullptr)
+ , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
+ , ForceDump_(false)
+ , WriteFrameCallback_(std::move(writeFrameCallback))
+{
+ DoInit();
+}
+
+void TEventLogFrame::Flush() {
+ if (EvLog_ == nullptr)
+ return;
+
+ TBuffer& buf = Buf_.Buffer();
+
+ if (buf.Empty()) {
+ return;
+ }
+
+ EvLog_->WriteFrame(buf, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_));
+
+ DoInit();
+
+ return;
+}
+
+void TEventLogFrame::SafeFlush() {
+ TGuard<TMutex> g(Mtx_);
+ Flush();
+}
+
+void TEventLogFrame::AddEvent(TEventTimestamp timestamp) {
+ if (timestamp < StartTimestamp_) {
+ StartTimestamp_ = timestamp;
+ }
+
+ if (timestamp > EndTimestamp_) {
+ EndTimestamp_ = timestamp;
+ }
+}
+
+void TEventLogFrame::DoInit() {
+ Buf_.Buffer().Clear();
+
+ StartTimestamp_ = (TEventTimestamp)-1;
+ EndTimestamp_ = 0;
+}
+
+void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory) {
+ const auto doVisit = [this, &visitor, eventFactory]() {
+ TBuffer& buf = Buf_.Buffer();
+
+ TBufferInput bufferInput(buf);
+ TLengthLimitedInput limitedInput(&bufferInput, buf.size());
+
+ TEventFilter EventFilter(false);
+
+ while (limitedInput.Left()) {
+ THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory);
+
+ visitor.Visit(*event);
+ }
+ };
+ if (NeedAlwaysSafeAdd_) {
+ TGuard<TMutex> g(Mtx_);
+ doVisit();
+ } else {
+ doVisit();
+ }
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
+ : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback))
+{
+}
+
+TSelfFlushLogFrame::~TSelfFlushLogFrame() {
+ try {
+ Flush();
+ } catch (...) {
+ }
+}
+
+IEventLog::~IEventLog() {
+}
+
+static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) {
+ try {
+ THolder<TLogBackend> backend;
+ if (backendOpts.UseSyncPageCacheBackend) {
+ backend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize);
+ } else {
+ backend = MakeHolder<TFileLogBackend>(fileName);
+ }
+ return MakeHolder<TReopenLogBackend>(std::move(backend));
+ } catch (...) {
+ Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." << Endl;
+ }
+
+ return MakeHolder<TNullLogBackend>();
+}
+
+TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat)
+ : Log_(ConstructBackend(fileName, backendOpts))
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4)
+ , HasNullBackend_(Log_.IsNullLog())
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5);
+
+ if (contentFormat & 0xff000000) {
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts)
+ : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4)
+{
+}
+
+TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat)
+ : Log_(log)
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat)
+ , HasNullBackend_(Log_.IsNullLog())
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ if (contentFormat & 0xff000000) {
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat)
+ : Log_(MakeHolder<TNullLogBackend>())
+ , ContentFormat_(contentFormat)
+ , LogFormat_(logFormat)
+ , HasNullBackend_(true)
+ , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
+ , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
+{
+ if (contentFormat & 0xff000000) {
+ ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
+ }
+}
+
+TEventLog::~TEventLog() {
+}
+
+void TEventLog::ReopenLog() {
+ Log_.ReopenLog();
+}
+
+void TEventLog::CloseLog() {
+ Log_.CloseLog();
+}
+
+void TEventLog::Flush() {
+}
+
+namespace {
+ class TOnExceptionAction {
+ public:
+ TOnExceptionAction(std::function<void()>&& f)
+ : F_(std::move(f))
+ {
+ }
+
+ ~TOnExceptionAction() {
+ if (F_ && UncaughtException()) {
+ try {
+ F_();
+ } catch (...) {
+ }
+ }
+ }
+
+ private:
+ std::function<void()> F_;
+ };
+}
+
+void TEventLog::WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback,
+ TLogRecord::TMetaFlags metaFlags) {
+ Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5);
+
+ TBuffer& b1 = buffer;
+
+ size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1);
+
+ // Reserve enough memory to minimize reallocs
+ TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength);
+ TBuffer& b2 = outbuf.Buffer();
+ b2.Proceed(sizeof(TFrameHeaderData));
+
+ {
+ TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
+
+ memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size());
+ hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff);
+ hdr.Header.FrameId = GenerateFrameId();
+ hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size();
+ hdr.HeaderEx.StartTimestamp = startTimestamp;
+ hdr.HeaderEx.EndTimestamp = endTimestamp;
+ hdr.HeaderEx.PayloadChecksum = 0;
+ hdr.HeaderEx.CompressorVersion = 0;
+ }
+
+ if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) {
+ TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256);
+ Lz4hcCodec_->Encode(b1, encoded);
+
+ TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048);
+ compr.Write(encoded.data(), encoded.size());
+ compr.Finish();
+ } else {
+ b2.Advance(ZstdCodec_->Compress(b1, b2.Pos()));
+ }
+
+ {
+ const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size();
+ TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
+ hdr.Header.Length = static_cast<ui32>(b2.size() - k);
+ hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData));
+
+ const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum));
+ hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n);
+ }
+
+ const TBuffer& frameData = outbuf.Buffer();
+
+ TOnExceptionAction actionCallback([this] {
+ if (ErrorCallback_) {
+ ErrorCallback_->OnWriteError();
+ }
+ });
+
+ if (writeFrameCallback) {
+ writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp);
+ }
+
+ Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags));
+ if (SuccessCallback_) {
+ SuccessCallback_->OnWriteSuccess(frameData);
+ }
+}
+
+TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) {
+ return new TProtobufEvent(c, EventFactory_);
+}
+
+TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const {
+ return EventFactory_->IdByName(name);
+}
+
+TEventClass TProtobufEventFactory::EventClassBegin() const {
+ const auto& items = EventFactory_->FactoryItems();
+
+ if (items.empty()) {
+ return static_cast<TEventClass>(0);
+ }
+
+ return static_cast<TEventClass>(items.begin()->first);
+}
+
+TEventClass TProtobufEventFactory::EventClassEnd() const {
+ const auto& items = EventFactory_->FactoryItems();
+
+ if (items.empty()) {
+ return static_cast<TEventClass>(0);
+ }
+
+ return static_cast<TEventClass>(items.rbegin()->first + 1);
+}
+
+namespace NEvClass {
+ IEventFactory* Factory() {
+ return Singleton<TProtobufEventFactory>();
+ }
+
+ IEventProcessor* Processor() {
+ return Singleton<TProtobufEventProcessor>();
+ }
+}
+
+const NProtoBuf::Message* TUnknownEvent::GetProto() const {
+ return UnknownEventMessage();
+}
+
+TStringBuf TUnknownEvent::GetName() const {
+ return TStringBuf("UnknownEvent");
+}
+
+void TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const {
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+ jsonWriter.Write("EventId", (size_t)Class);
+ jsonWriter.CloseMap();
+}
+
+TStringBuf TEndOfFrameEvent::GetName() const {
+ return TStringBuf("EndOfFrame");
+}
+
+const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const {
+ return Singleton<NEventLogInternal::TEndOfFrameEvent>();
+}
+
+void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const {
+ jsonWriter.OpenMap("EventBody");
+ jsonWriter.Write("Type", GetName());
+ jsonWriter.OpenMap("Fields");
+ jsonWriter.CloseMap();
+ jsonWriter.CloseMap();
+}
+
+THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) {
+ return MakeHolder<TProtobufEvent>(ts, eventId, ev);
+}
diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h
new file mode 100644
index 00000000000..45c2dfb17fd
--- /dev/null
+++ b/library/cpp/eventlog/eventlog.h
@@ -0,0 +1,623 @@
+#pragma once
+
+#include "eventlog_int.h"
+#include "event_field_output.h"
+#include "events_extension.h"
+
+#include <library/cpp/blockcodecs/codecs.h>
+#include <library/cpp/logger/all.h>
+
+#include <google/protobuf/message.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/stream/output.h>
+#include <util/stream/buffer.h>
+#include <util/stream/str.h>
+#include <util/system/mutex.h>
+#include <util/stream/output.h>
+#include <util/system/env.h>
+#include <util/system/unaligned_mem.h>
+#include <util/ysaveload.h>
+
+#include <cstdlib>
+
+namespace NJson {
+ class TJsonWriter;
+}
+
+class IEventLog;
+
+class TEvent : public TThrRefBase {
+public:
+ enum class TOutputFormat {
+ TabSeparated,
+ TabSeparatedRaw, // disables escaping
+ Json
+ };
+
+ struct TOutputOptions {
+ TOutputFormat OutputFormat = TOutputFormat::TabSeparated;
+ // Dump some fields (e.g. timestamp) in more human-readable format
+ bool HumanReadable = false;
+
+ TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated)
+ : OutputFormat(outputFormat)
+ {
+ }
+
+ TOutputOptions(TOutputFormat outputFormat, bool humanReadable)
+ : OutputFormat(outputFormat)
+ , HumanReadable(humanReadable)
+ {
+ }
+ };
+
+ struct TEventState {
+ TEventTimestamp FrameStartTime = 0;
+ TEventTimestamp PrevEventTime = 0;
+ TEventState() {
+ }
+ };
+
+ TEvent(TEventClass c, TEventTimestamp t)
+ : Class(c)
+ , Timestamp(t)
+ {
+ }
+
+ virtual ~TEvent() = default;
+
+ // Note, that descendants MUST have Save() & Load() methods to alter
+ // only its new variables, not the base class!
+ virtual void Save(IOutputStream& out) const = 0;
+ virtual void SaveToBuffer(TBufferOutput& out) const {
+ Save(out);
+ }
+
+ // Note, that descendants MUST have Save() & Load() methods to alter
+ // only its new variables, not the base class!
+ virtual void Load(IInputStream& i) = 0;
+
+ virtual TStringBuf GetName() const = 0;
+ virtual const NProtoBuf::Message* GetProto() const = 0;
+
+ void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const;
+ void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const;
+
+ TString ToString() const {
+ TStringStream buff;
+ Print(buff);
+ return buff.Str();
+ }
+
+ void FullSaveToBuffer(TBufferOutput& buf) const {
+ SaveMessageHeader(buf);
+ this->SaveToBuffer(buf);
+ }
+
+ void FullSave(IOutputStream& o) const {
+ SaveMessageHeader(o);
+ this->Save(o);
+ }
+
+ void FullLoad(IInputStream& i) {
+ ::Load(&i, Timestamp);
+ ::Load(&i, Class);
+ this->Load(i);
+ }
+
+ template <class T>
+ const T* Get() const {
+ return static_cast<const T*>(this->GetProto());
+ }
+
+ TEventClass Class;
+ TEventTimestamp Timestamp;
+ ui32 FrameId = 0;
+
+private:
+ void SaveMessageHeader(IOutputStream& out) const {
+ ::Save(&out, Timestamp);
+ ::Save(&out, Class);
+ }
+
+ virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0;
+ virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0;
+
+ void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const;
+};
+
+using TEventPtr = TIntrusivePtr<TEvent>;
+using TConstEventPtr = TIntrusiveConstPtr<TEvent>;
+
+class IEventProcessor {
+public:
+ virtual void SetOptions(const TEvent::TOutputOptions& options) {
+ Options_ = options;
+ }
+ virtual void ProcessEvent(const TEvent* ev) = 0;
+ virtual bool CheckedProcessEvent(const TEvent* ev) {
+ ProcessEvent(ev);
+ return true;
+ }
+ virtual ~IEventProcessor() = default;
+
+protected:
+ TEvent::TOutputOptions Options_;
+};
+
+class IEventFactory {
+public:
+ virtual TEvent* CreateLogEvent(TEventClass c) = 0;
+ virtual TEventLogFormat CurrentFormat() = 0;
+ virtual TEventClass ClassByName(TStringBuf name) const = 0;
+ virtual TEventClass EventClassBegin() const = 0;
+ virtual TEventClass EventClassEnd() const = 0;
+ virtual ~IEventFactory() = default;
+};
+
+class TUnknownEvent: public TEvent {
+public:
+ TUnknownEvent(TEventTimestamp ts, TEventClass cls)
+ : TEvent(cls, ts)
+ {
+ }
+
+ ~TUnknownEvent() override = default;
+
+ void Save(IOutputStream& /* o */) const override {
+ ythrow yexception() << "TUnknownEvent cannot be saved";
+ }
+
+ void Load(IInputStream& /* i */) override {
+ ythrow yexception() << "TUnknownEvent cannot be loaded";
+ }
+
+ TStringBuf GetName() const override;
+
+private:
+ void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
+ out << GetName() << "\t" << (size_t)Class;
+ }
+
+ void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
+
+ const NProtoBuf::Message* GetProto() const override;
+};
+
+class TEndOfFrameEvent: public TEvent {
+public:
+ enum {
+ EventClass = 0
+ };
+
+ TEndOfFrameEvent(TEventTimestamp ts)
+ : TEvent(TEndOfFrameEvent::EventClass, ts)
+ {
+ }
+
+ ~TEndOfFrameEvent() override = default;
+
+ void Save(IOutputStream& o) const override {
+ (void)o;
+ ythrow yexception() << "TEndOfFrameEvent cannot be saved";
+ }
+
+ void Load(IInputStream& i) override {
+ (void)i;
+ ythrow yexception() << "TEndOfFrameEvent cannot be loaded";
+ }
+
+ TStringBuf GetName() const override;
+
+private:
+ void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
+ out << GetName();
+ }
+ void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
+
+ const NProtoBuf::Message* GetProto() const override;
+};
+
+class ILogFrameEventVisitor {
+public:
+ virtual ~ILogFrameEventVisitor() = default;
+
+ virtual void Visit(const TEvent& event) = 0;
+};
+
+class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> {
+public:
+ virtual ~IWriteFrameCallback() = default;
+
+ virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0;
+};
+
+using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>;
+
+class TEventLogFrame {
+public:
+ TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+ TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+ TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+
+ virtual ~TEventLogFrame() = default;
+
+ void Flush();
+ void SafeFlush();
+
+ void ForceDump() {
+ ForceDump_ = true;
+ }
+
+ template <class T>
+ inline void LogEvent(const T& ev) {
+ if (NeedAlwaysSafeAdd_) {
+ SafeLogEvent(ev);
+ } else {
+ UnSafeLogEvent(ev);
+ }
+ }
+
+ template <class T>
+ inline void LogEvent(TEventTimestamp timestamp, const T& ev) {
+ if (NeedAlwaysSafeAdd_) {
+ SafeLogEvent(timestamp, ev);
+ } else {
+ UnSafeLogEvent(timestamp, ev);
+ }
+ }
+
+ template <class T>
+ inline void UnSafeLogEvent(const T& ev) {
+ if (!IsEventIgnored(ev.ID))
+ LogProtobufEvent(ev.ID, ev);
+ }
+
+ template <class T>
+ inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) {
+ if (!IsEventIgnored(ev.ID))
+ LogProtobufEvent(timestamp, ev.ID, ev);
+ }
+
+ template <class T>
+ inline void SafeLogEvent(const T& ev) {
+ if (!IsEventIgnored(ev.ID)) {
+ TGuard<TMutex> g(Mtx_);
+ LogProtobufEvent(ev.ID, ev);
+ }
+ }
+
+ template <class T>
+ inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) {
+ if (!IsEventIgnored(ev.ID)) {
+ TGuard<TMutex> g(Mtx_);
+ LogProtobufEvent(timestamp, ev.ID, ev);
+ }
+ }
+
+ void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory);
+
+ inline bool IsEventIgnored(size_t eventId) const {
+ Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages
+ return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_;
+ }
+
+ void Enable(IEventLog& evLog) {
+ EvLog_ = &evLog;
+ }
+
+ void Disable() {
+ EvLog_ = nullptr;
+ }
+
+ void SetNeedAlwaysSafeAdd(bool val) {
+ NeedAlwaysSafeAdd_ = val;
+ }
+
+ void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) {
+ WriteFrameCallback_ = writeFrameCallback;
+ }
+
+ void AddMetaFlag(const TString& key, const TString& value) {
+ if (NeedAlwaysSafeAdd_) {
+ TGuard<TMutex> g(Mtx_);
+ MetaFlags_.emplace_back(key, value);
+ } else {
+ MetaFlags_.emplace_back(key, value);
+ }
+ }
+
+protected:
+ void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev);
+ void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev);
+
+private:
+ static bool IsDebugModeEnabled() {
+ static struct TSelector {
+ bool Flag;
+
+ TSelector()
+ : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1"))
+ {
+ }
+ } selector;
+
+ return selector.Flag;
+ }
+
+ template <class T>
+ void DebugDump(const T& ev);
+
+ // T must be a descendant of NEvClass::TEvent
+ template <class T>
+ inline void LogEventImpl(const T& ev) {
+ if (EvLog_ != nullptr || ForceDump_) {
+ TBuffer& b = Buf_.Buffer();
+ size_t lastSize = b.size();
+ ::Save(&Buf_, ui32(0));
+ ev.FullSaveToBuffer(Buf_);
+ WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize));
+ AddEvent(ev.Timestamp);
+ }
+
+ if (IsDebugModeEnabled()) {
+ DebugDump(ev);
+ }
+ }
+
+ void AddEvent(TEventTimestamp timestamp);
+ void DoInit();
+
+private:
+ TBufferOutput Buf_;
+ TEventTimestamp StartTimestamp_, EndTimestamp_;
+ IEventLog* EvLog_;
+ TMutex Mtx_;
+ bool NeedAlwaysSafeAdd_;
+ bool ForceDump_;
+ TWriteFrameCallbackPtr WriteFrameCallback_;
+ TLogRecord::TMetaFlags MetaFlags_;
+ friend class TEventRecord;
+};
+
+class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> {
+public:
+ TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+ TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+ TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
+
+ virtual ~TSelfFlushLogFrame();
+};
+
+using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>;
+
+class IEventLog: public TAtomicRefCount<IEventLog> {
+public:
+ class IErrorCallback {
+ public:
+ virtual ~IErrorCallback() {
+ }
+
+ virtual void OnWriteError() = 0;
+ };
+
+ class ISuccessCallback {
+ public:
+ virtual ~ISuccessCallback() {
+ }
+
+ virtual void OnWriteSuccess(const TBuffer& frameData) = 0;
+ };
+
+ virtual ~IEventLog();
+
+ virtual void ReopenLog() = 0;
+ virtual void CloseLog() = 0;
+ virtual void Flush() = 0;
+ virtual void SetErrorCallback(IErrorCallback*) {
+ }
+ virtual void SetSuccessCallback(ISuccessCallback*) {
+ }
+
+ template <class T>
+ void LogEvent(const T& ev) {
+ TEventLogFrame frame(*this);
+ frame.LogEvent(ev);
+ frame.Flush();
+ }
+
+ virtual bool HasNullBackend() const = 0;
+
+ virtual void WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {}) = 0;
+};
+
+struct TEventLogBackendOptions {
+ bool UseSyncPageCacheBackend = false;
+ size_t SyncPageCacheBackendBufferSize = 0;
+ size_t SyncPageCacheBackendMaxPendingSize = 0;
+};
+
+class TEventLog: public IEventLog {
+public:
+ /*
+ * Параметр contentformat указывает формат контента лога, например какие могут в логе
+ * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра
+ * должен быть нулевым.
+ */
+ TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat);
+ TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {});
+ TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
+ TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
+
+ ~TEventLog() override;
+
+ void ReopenLog() override;
+ void CloseLog() override;
+ void Flush() override;
+ void SetErrorCallback(IErrorCallback* errorCallback) override {
+ ErrorCallback_ = errorCallback;
+ }
+ void SetSuccessCallback(ISuccessCallback* successCallback) override {
+ SuccessCallback_ = successCallback;
+ }
+
+ template <class T>
+ void LogEvent(const T& ev) {
+ TEventLogFrame frame(*this);
+ frame.LogEvent(ev);
+ frame.Flush();
+ }
+
+ bool HasNullBackend() const override {
+ return HasNullBackend_;
+ }
+
+ void WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {}) override;
+
+private:
+ mutable TLog Log_;
+ TEventLogFormat ContentFormat_;
+ const TEventLogFormat LogFormat_;
+ bool HasNullBackend_;
+ const NBlockCodecs::ICodec* const Lz4hcCodec_;
+ const NBlockCodecs::ICodec* const ZstdCodec_;
+ IErrorCallback* ErrorCallback_ = nullptr;
+ ISuccessCallback* SuccessCallback_ = nullptr;
+};
+
+using TEventLogPtr = TIntrusivePtr<IEventLog>;
+
+class TEventLogWithSlave: public IEventLog {
+public:
+ TEventLogWithSlave(IEventLog& parentLog)
+ : Slave_(&parentLog)
+ {
+ }
+
+ TEventLogWithSlave(const TEventLogPtr& parentLog)
+ : SlavePtr_(parentLog)
+ , Slave_(SlavePtr_.Get())
+ {
+ }
+
+ ~TEventLogWithSlave() override {
+ try {
+ Slave().Flush();
+ } catch (...) {
+ }
+ }
+
+ void Flush() override {
+ Slave().Flush();
+ }
+
+ void ReopenLog() override {
+ return Slave().ReopenLog();
+ }
+ void CloseLog() override {
+ return Slave().CloseLog();
+ }
+
+ bool HasNullBackend() const override {
+ return Slave().HasNullBackend();
+ }
+
+ void WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {}) override {
+ Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags));
+ }
+
+ void SetErrorCallback(IErrorCallback* errorCallback) override {
+ Slave().SetErrorCallback(errorCallback);
+ }
+
+ void SetSuccessCallback(ISuccessCallback* successCallback) override {
+ Slave().SetSuccessCallback(successCallback);
+ }
+
+protected:
+ inline IEventLog& Slave() const {
+ return *Slave_;
+ }
+
+private:
+ TEventLogPtr SlavePtr_;
+ IEventLog* Slave_ = nullptr;
+};
+
+extern TAtomic eventlogFrameCounter;
+
+class TProtobufEventProcessor: public IEventProcessor {
+public:
+ void ProcessEvent(const TEvent* ev) override final {
+ ProcessEvent(ev, &Cout);
+ }
+
+ void ProcessEvent(const TEvent* ev, IOutputStream *out) {
+ UpdateEventState(ev);
+ DoProcessEvent(ev, out);
+ EventState_.PrevEventTime = ev->Timestamp;
+ }
+protected:
+ virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) {
+ ev->Print(*out, Options_, EventState_);
+ (*out) << Endl;
+ }
+ ui32 CurrentFrameId_ = Max<ui32>();
+ TEvent::TEventState EventState_;
+
+private:
+ void UpdateEventState(const TEvent *ev) {
+ if (ev->FrameId != CurrentFrameId_) {
+ EventState_.FrameStartTime = ev->Timestamp;
+ EventState_.PrevEventTime = ev->Timestamp;
+ CurrentFrameId_ = ev->FrameId;
+ }
+ }
+};
+
+class TProtobufEventFactory: public IEventFactory {
+public:
+ TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance())
+ : EventFactory_(factory)
+ {
+ }
+
+ TEvent* CreateLogEvent(TEventClass c) override;
+
+ TEventLogFormat CurrentFormat() override {
+ return 0;
+ }
+
+ TEventClass ClassByName(TStringBuf name) const override;
+
+ TEventClass EventClassBegin() const override;
+
+ TEventClass EventClassEnd() const override;
+
+ ~TProtobufEventFactory() override = default;
+
+private:
+ NProtoBuf::TEventFactory* EventFactory_;
+};
+
+THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev);
+
+namespace NEvClass {
+ IEventFactory* Factory();
+ IEventProcessor* Processor();
+}
diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp
new file mode 100644
index 00000000000..faa8c42cbeb
--- /dev/null
+++ b/library/cpp/eventlog/eventlog_int.cpp
@@ -0,0 +1,12 @@
+#include "eventlog_int.h"
+
+#include <util/string/cast.h>
+
+TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) {
+ EEventLogFormat format;
+ if (TryFromString(str, format)) {
+ return static_cast<TEventLogFormat>(format);
+ } else {
+ return {};
+ }
+}
diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h
new file mode 100644
index 00000000000..eb00fecfab6
--- /dev/null
+++ b/library/cpp/eventlog/eventlog_int.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/generic/maybe.h>
+#include <util/generic/utility.h>
+#include <util/generic/yexception.h>
+#include <util/ysaveload.h>
+
+using TEventClass = ui32;
+using TEventLogFormat = ui32;
+using TEventTimestamp = ui64;
+
+constexpr TStringBuf COMPRESSED_LOG_FRAME_SYNC_DATA =
+ "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00"
+ "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff"
+ "\xff\x00\x00\xff\xff\x00\x00\xff"sv;
+
+static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64);
+
+/*
+ * Коды форматов логов. Форматом лога считается формат служебных
+ * структур лога. К примеру формат заголовка, наличие компрессии, и т.д.
+ * Имеет значение только 1 младший байт.
+ */
+
+enum EEventLogFormat : TEventLogFormat {
+ // Формат версии 1. Используется компрессор LZQ.
+ COMPRESSED_LOG_FORMAT_V1 = 1,
+
+ // Формат версии 2. Используется компрессор ZLIB. Добавлены CRC заголовка и данных,
+ // поле типа компрессора.
+ COMPRESSED_LOG_FORMAT_V2 = 2,
+
+ // Формат версии 3. Используется компрессор ZLIB. В начинке фреймов перед каждым событием добавлен его размер.
+ COMPRESSED_LOG_FORMAT_V3 = 3,
+
+ // Lz4hc codec + zlib
+ COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */,
+
+ // zstd
+ COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */,
+};
+
+TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str);
+
+#pragma pack(push, 1)
+
+struct TCompressedFrameBaseHeader {
+ TEventLogFormat Format;
+ ui32 Length; // Длина остатка фрейма в байтах, после этого заголовка
+ ui32 FrameId;
+};
+
+struct TCompressedFrameHeader {
+ TEventTimestamp StartTimestamp;
+ TEventTimestamp EndTimestamp;
+ ui32 UncompressedDatalen; // Длина данных, которые были закомпрессированы
+ ui32 PayloadChecksum; // В логе версии 1 поле не используется
+};
+
+struct TCompressedFrameHeader2: public TCompressedFrameHeader {
+ ui8 CompressorVersion; // Сейчас не используется
+ ui32 HeaderChecksum;
+};
+
+#pragma pack(pop)
+
+Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader);
+Y_DECLARE_PODTYPE(TCompressedFrameHeader);
+Y_DECLARE_PODTYPE(TCompressedFrameHeader2);
diff --git a/library/cpp/eventlog/events_extension.h b/library/cpp/eventlog/events_extension.h
new file mode 100644
index 00000000000..0cf062f9590
--- /dev/null
+++ b/library/cpp/eventlog/events_extension.h
@@ -0,0 +1,161 @@
+#pragma once
+
+#include "event_field_output.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
+
+#include <library/cpp/threading/atomic/bool.h>
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <util/generic/map.h>
+#include <util/generic/deque.h>
+#include <util/generic/singleton.h>
+#include <util/string/hex.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+namespace NProtoBuf {
+ class TEventFactory {
+ public:
+ typedef ::google::protobuf::Message Message;
+ typedef void (*TEventSerializer)(const Message* event, IOutputStream& output, EFieldOutputFlags flags);
+ typedef void (*TRegistrationFunc)();
+
+ private:
+ class TFactoryItem {
+ public:
+ TFactoryItem(const Message* prototype, const TEventSerializer serializer)
+ : Prototype_(prototype)
+ , Serializer_(serializer)
+ {
+ }
+
+ TStringBuf GetName() const {
+ return Prototype_->GetDescriptor()->name();
+ }
+
+ Message* Create() const {
+ return Prototype_->New();
+ }
+
+ void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const {
+ (*Serializer_)(event, out, flags);
+ }
+
+ private:
+ const Message* Prototype_;
+ const TEventSerializer Serializer_;
+ };
+
+ typedef TMap<size_t, TFactoryItem> TFactoryMap;
+
+ public:
+ TEventFactory()
+ : FactoryItems_()
+ {
+ }
+
+ void ScheduleRegistration(TRegistrationFunc func) {
+ EventRegistrators_.push_back(func);
+ }
+
+ void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) {
+ FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer)));
+ }
+
+ size_t IdByName(TStringBuf eventname) {
+ DelayedRegistration();
+ for (TFactoryMap::const_iterator it = FactoryItems_.begin(); it != FactoryItems_.end(); ++it) {
+ if (it->second.GetName() == eventname)
+ return it->first;
+ }
+
+ ythrow yexception() << "do not know event '" << eventname << "'";
+ }
+
+ TStringBuf NameById(size_t id) {
+ DelayedRegistration();
+ TFactoryMap::const_iterator it = FactoryItems_.find(id);
+ return it != FactoryItems_.end() ? it->second.GetName() : TStringBuf();
+ }
+
+ Message* CreateEvent(size_t eventId) {
+ DelayedRegistration();
+ TFactoryMap::const_iterator it = FactoryItems_.find(eventId);
+
+ if (it != FactoryItems_.end()) {
+ return it->second.Create();
+ }
+
+ return nullptr;
+ }
+
+ const TMap<size_t, TFactoryItem>& FactoryItems() {
+ DelayedRegistration();
+ return FactoryItems_;
+ }
+
+ void PrintEvent(
+ size_t eventId,
+ const Message* event,
+ IOutputStream& output,
+ EFieldOutputFlags flags = {}) {
+ DelayedRegistration();
+ TFactoryMap::const_iterator it = FactoryItems_.find(eventId);
+
+ if (it != FactoryItems_.end()) {
+ it->second.PrintEvent(event, output, flags);
+ }
+ }
+
+ static TEventFactory* Instance() {
+ return Singleton<TEventFactory>();
+ }
+
+ private:
+ void DelayedRegistration() {
+ if (!DelayedRegistrationDone_) {
+ TGuard<TMutex> guard(MutexEventRegistrators_);
+ Y_UNUSED(guard);
+ while (!EventRegistrators_.empty()) {
+ EventRegistrators_.front()();
+ EventRegistrators_.pop_front();
+ }
+ DelayedRegistrationDone_ = true;
+ }
+ }
+
+ private:
+ TMap<size_t, TFactoryItem> FactoryItems_;
+ TDeque<TRegistrationFunc> EventRegistrators_;
+ NAtomic::TBool DelayedRegistrationDone_ = false;
+ TMutex MutexEventRegistrators_;
+ };
+
+ template <typename T>
+ void PrintAsBytes(const T& obj, IOutputStream& output) {
+ const ui8* b = reinterpret_cast<const ui8*>(&obj);
+ const ui8* e = b + sizeof(T);
+ const char* delim = "";
+
+ while (b != e) {
+ output << delim;
+ output << (int)*b++;
+ delim = ".";
+ }
+ }
+
+ template <typename T>
+ void PrintAsHex(const T& obj, IOutputStream& output) {
+ output << "0x";
+ output << HexEncode(&obj, sizeof(T));
+ }
+
+ inline void PrintAsBase64(TStringBuf data, IOutputStream& output) {
+ if (!data.empty()) {
+ output << Base64Encode(data);
+ }
+ }
+
+}
diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp
new file mode 100644
index 00000000000..71f955bca82
--- /dev/null
+++ b/library/cpp/eventlog/iterator.cpp
@@ -0,0 +1,88 @@
+#include "iterator.h"
+
+#include <library/cpp/streams/growing_file_input/growing_file_input.h>
+
+#include <util/string/cast.h>
+#include <util/string/split.h>
+#include <util/string/type.h>
+#include <util/stream/file.h>
+
+using namespace NEventLog;
+
+namespace {
+ inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) {
+ if (evList.empty()) {
+ return nullptr;
+ }
+
+ TVector<TString> events;
+
+ StringSplitter(evList).Split(',').SkipEmpty().Collect(&events);
+ if (events.empty()) {
+ return nullptr;
+ }
+
+ TIntrusivePtr<TEventFilter> filter(new TEventFilter(enableEvents));
+
+ for (const auto& event : events) {
+ if (IsNumber(event))
+ filter->AddEventClass(FromString<size_t>(event));
+ else
+ filter->AddEventClass(fac->ClassByName(event));
+ }
+
+ return filter;
+ }
+
+ struct TIterator: public IIterator {
+ inline TIterator(const TOptions& o, IEventFactory* fac)
+ : First(true)
+ {
+ if (o.FileName.size()) {
+ if (o.ForceStreamMode || o.TailFMode) {
+ FileInput.Reset(o.TailFMode ? (IInputStream*)new TGrowingFileInput(o.FileName) : (IInputStream*)new TUnbufferedFileInput(o.FileName));
+ FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter));
+ } else {
+ FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter));
+ }
+ } else {
+ FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter));
+ }
+
+ EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac);
+ EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering));
+ }
+
+ TConstEventPtr Next() override {
+ if (First) {
+ First = false;
+
+ if (!EventStream->Avail()) {
+ return nullptr;
+ }
+ } else {
+ if (!EventStream->Next()) {
+ return nullptr;
+ }
+ }
+
+ return **EventStream;
+ }
+
+ THolder<IInputStream> FileInput;
+ THolder<TFrameStreamer> FrameStream;
+ TIntrusivePtr<TEventFilter> EvFilter;
+ THolder<TEventStreamer> EventStream;
+ bool First;
+ };
+}
+
+IIterator::~IIterator() = default;
+
+THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) {
+ return MakeHolder<TIterator>(o, fac);
+}
+
+THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) {
+ return MakeHolder<TIterator>(o, NEvClass::Factory());
+}
diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h
new file mode 100644
index 00000000000..71a61ed5494
--- /dev/null
+++ b/library/cpp/eventlog/iterator.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <util/stream/input.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/iterator.h>
+
+#include "eventlog.h"
+#include "logparser.h"
+
+namespace NEventLog {
+ struct TOptions {
+ inline TOptions& SetFileName(const TString& fileName) {
+ FileName = fileName;
+
+ return *this;
+ }
+
+ inline TOptions& SetForceStrongOrdering(bool v) {
+ if(!ForceLosslessStrongOrdering) {
+ ForceStrongOrdering = v;
+ }
+
+ return *this;
+ }
+
+ ui64 StartTime = MIN_START_TIME;
+ ui64 EndTime = MAX_END_TIME;
+ ui64 MaxRequestDuration = MAX_REQUEST_DURATION;
+ TString FileName;
+ bool ForceStrongOrdering = false;
+ bool ForceWeakOrdering = false;
+ bool EnableEvents = true;
+ TString EvList;
+ bool ForceStreamMode = false;
+ bool ForceLosslessStrongOrdering = false;
+ bool TailFMode = false;
+ IInputStream* Input = &Cin;
+ IFrameFilterRef FrameFilter;
+ };
+
+ class IIterator: public TInputRangeAdaptor<IIterator> {
+ public:
+ virtual ~IIterator();
+
+ virtual TConstEventPtr Next() = 0;
+ };
+
+ THolder<IIterator> CreateIterator(const TOptions& o);
+ THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac);
+}
diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp
new file mode 100644
index 00000000000..6f8959f7888
--- /dev/null
+++ b/library/cpp/eventlog/logparser.cpp
@@ -0,0 +1,814 @@
+#include "logparser.h"
+#include "evdecoder.h"
+
+#include <util/stream/output.h>
+#include <util/stream/zlib.h>
+#include <util/digest/murmur.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/scope.h>
+#include <util/generic/hash_set.h>
+#include <util/string/split.h>
+#include <util/string/cast.h>
+#include <util/string/escape.h>
+#include <util/string/builder.h>
+
+#include <contrib/libs/re2/re2/re2.h>
+
+#include <algorithm>
+#include <array>
+
+namespace {
+ bool FastforwardUntilSyncHeader(IInputStream* in) {
+ // Usually this function finds the correct header at the first hit
+ std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer;
+ if (in->Load(buffer.data(), buffer.size()) != buffer.size()) {
+ return false;
+ }
+
+ auto begin = buffer.begin();
+
+ for (;;) {
+ if (std::mismatch(
+ begin, buffer.end(),
+ COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() &&
+ std::mismatch(
+ buffer.begin(), begin,
+ COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) {
+ return true;
+ }
+ if (!in->ReadChar(*begin)) {
+ return false;
+ }
+ ++begin;
+ if (begin == buffer.end()) {
+ begin = buffer.begin();
+ }
+ }
+ }
+
+ bool HasCorrectChecksum(const TFrameHeader& header) {
+ // Calculating hash over all the fields of the read header except for the field with the hash of the header itself.
+ const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32);
+ const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize);
+ return checksum == header.Framehdr.HeaderChecksum;
+ }
+
+ TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) {
+ for (;;) {
+ if (FastforwardUntilSyncHeader(in)) {
+ try {
+ return TFrameHeader(*in);
+ } catch (const TFrameLoadError& err) {
+ Cdbg << err.what() << Endl;
+ in->Skip(err.SkipAfter);
+ }
+ } else {
+ return Nothing();
+ }
+ }
+ }
+
+ std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) {
+ for (;;) {
+ auto iter = std::search(
+ span.begin(), span.end(),
+ COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end());
+ const size_t offset = iter - span.begin();
+
+ if (offset != span.size()) {
+ span = span.substr(offset);
+ try {
+ TMemoryInput in(
+ span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(),
+ span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size());
+ return {TFrameHeader(in), span};
+ } catch (const TFrameLoadError& err) {
+ Cdbg << err.what() << Endl;
+ span = span.substr(err.SkipAfter);
+ }
+ } else {
+ return {Nothing(), {}};
+ }
+ }
+ }
+
+ size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) {
+ Y_ENSURE(start <= end);
+
+ const auto leftTimeBound = start - Min(start, maxRequestDuration);
+ const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end);
+
+ TStringBuf subspan = span;
+ TMaybe<TFrameHeader> maybeLeftFrame;
+ std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan);
+
+ if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) {
+ return span.size();
+ }
+
+ if (maybeLeftFrame->StartTime() > leftTimeBound) {
+ return 0;
+ }
+
+ while (subspan.size() > maybeLeftFrame->FullLength()) {
+ const auto mid = subspan.data() + subspan.size() / 2;
+ auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()});
+ if (!midFrame) {
+ // If mid is in the middle of the last frame, here we will lose it meaning that
+ // we will find previous frame as the result.
+ // This is fine because we will iterate frames starting from that.
+ subspan = subspan.substr(0, subspan.size() / 2);
+ continue;
+ }
+ if (midFrame->StartTime() <= leftTimeBound) {
+ maybeLeftFrame = midFrame;
+ subspan = rightHalfSpan;
+ } else {
+ subspan = subspan.substr(0, subspan.size() / 2);
+ }
+ }
+
+ return subspan.data() - span.data();
+ }
+}
+
+TFrameHeader::TFrameHeader(IInputStream& in) {
+ try {
+ ::Load(&in, Basehdr);
+
+ Y_ENSURE(Basehdr.Length, "Empty frame additional data");
+
+ ::Load(&in, Framehdr);
+ switch (LogFormat()) {
+ case COMPRESSED_LOG_FORMAT_V1:
+ break;
+
+ case COMPRESSED_LOG_FORMAT_V2:
+ case COMPRESSED_LOG_FORMAT_V3:
+ case COMPRESSED_LOG_FORMAT_V4:
+ case COMPRESSED_LOG_FORMAT_V5:
+ Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor");
+
+ Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum");
+ break;
+
+ default:
+ ythrow yexception() << "Unsupported log structure format";
+ };
+
+ Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps");
+
+ // Each frame must contain at least one event.
+ Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload");
+ } catch (...) {
+ TString location = "";
+ if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) {
+ location = "@ " + ToString(cnt->Counter());
+ }
+ ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage();
+ }
+}
+
+TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac)
+ : TFrameHeader(header)
+ , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength()))
+ , Fac_(fac)
+{
+ if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) {
+ Address_ = cnt->Counter() - sizeof(TFrameHeader);
+ } else {
+ Address_ = 0;
+ }
+}
+
+TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const {
+ if (EventsCache_.empty()) {
+ for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) {
+ EventsCache_.emplace_back(*decoder);
+ }
+ }
+
+ return TIterator(*this, eventFilter);
+}
+
+void TFrame::ClearEventsCache() const {
+ EventsCache_.clear();
+}
+
+TString TFrame::GetCompressedFrame() const {
+ const auto left = Limiter_->Left();
+ TString payload = Limiter_->ReadAll();
+ Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream");
+ const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size());
+ Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum");
+
+ return payload;
+}
+
+TString TFrame::GetRawFrame() const {
+ TString frameBuf = GetCompressedFrame();
+ TStringInput sin(frameBuf);
+ return TZLibDecompress{&sin}.ReadAll();
+}
+
+TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter)
+ : Frame_(frame)
+ , Size_(frame.EventsCache_.size())
+ , Filter_(filter)
+ , Index_(0)
+{
+ SkipToValidEvent();
+}
+
+TConstEventPtr TFrame::TIterator::operator*() const {
+ return Frame_.GetEvent(Index_);
+}
+
+bool TFrame::TIterator::Next() {
+ Index_++;
+ SkipToValidEvent();
+ return Index_ < Size_;
+}
+
+void TFrame::TIterator::SkipToValidEvent() {
+ if (!Filter_) {
+ return;
+ }
+
+ for (; Index_ < Size_; ++Index_) {
+ if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) {
+ break;
+ }
+ }
+}
+
+TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) {
+ if (auto header = FindNextFrameHeader(in)) {
+ return TFrame{*in, *header, eventFactory};
+ } else {
+ return Nothing();
+ }
+}
+
+TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) {
+ TVector<TStringBuf> tokens;
+
+ SplitWithEscaping(tokens, unparsedMatchGroups, "/");
+
+ // Amount of match groups
+ size_t size = tokens.size();
+ MatchGroups.resize(size);
+
+ for (size_t i = 0; i < size; i++) {
+ TMatchGroup& group = MatchGroups[i];
+ TVector<TStringBuf> groupTokens;
+ SplitWithEscaping(groupTokens, tokens[i], ":");
+
+ Y_ENSURE(groupTokens.size() == 3);
+
+ try {
+ group.EventID = eventFactory->ClassByName(groupTokens[0]);
+ } catch (yexception& e) {
+ if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) {
+ e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0];
+ throw e;
+ }
+ }
+
+ group.FieldName = groupTokens[1];
+ group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:");
+ }
+}
+
+bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const {
+ THashSet<size_t> toMatchSet;
+ for (size_t i = 0; i < MatchGroups.size(); i++) {
+ toMatchSet.insert(i);
+ }
+
+ for (auto it = frame.GetIterator(); it.Avail(); it.Next()) {
+ TConstEventPtr event(*it);
+ TVector<size_t> indicesToErase;
+
+ if (!toMatchSet.empty()) {
+ const NProtoBuf::Message* message = event->GetProto();
+ const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
+ const google::protobuf::Reflection* reflection = message->GetReflection();
+
+ Y_ENSURE(descriptor);
+ Y_ENSURE(reflection);
+
+ for (size_t groupIndex : toMatchSet) {
+ const TMatchGroup& group = MatchGroups[groupIndex];
+
+ if (event->Class == group.EventID) {
+ TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>();
+ TString lastPart = std::move(parts.back());
+ parts.pop_back();
+
+ for (auto part : parts) {
+ auto fieldDescriptor = descriptor->FindFieldByName(part);
+ Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\".");
+
+ message = &reflection->GetMessage(*message, fieldDescriptor);
+ descriptor = message->GetDescriptor();
+ reflection = message->GetReflection();
+
+ Y_ENSURE(descriptor);
+ Y_ENSURE(reflection);
+ }
+
+ const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart);
+ Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\".");
+
+ TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection);
+ if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) {
+ indicesToErase.push_back(groupIndex);
+ }
+ }
+ }
+
+ for (size_t idx : indicesToErase) {
+ toMatchSet.erase(idx);
+ }
+
+ if (toMatchSet.empty()) {
+ return true;
+ }
+ }
+ }
+
+ return toMatchSet.empty();
+}
+
+void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) {
+ size_t tokenStart = 0;
+ const TString characterSet = TString::Join("\\", externalCharacterSet);
+
+ for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) {
+ if (stringToSplit[position] == '\\') {
+ position++;
+ } else {
+ if (tokenStart != position) {
+ tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart));
+ }
+ tokenStart = position + 1;
+ }
+ }
+
+ if (tokenStart < stringToSplit.size()) {
+ tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart));
+ }
+}
+
+TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) {
+ TStringBuilder stringBuilder;
+ size_t tokenStart = 0;
+
+ for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) {
+ if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) {
+ stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart);
+ tokenStart = position + 1;
+ }
+ }
+
+ if (tokenStart < stringToUnescape.size()) {
+ stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart);
+ }
+
+ return stringBuilder;
+}
+
+TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) {
+ Y_ENSURE(message);
+ Y_ENSURE(fieldDescriptor);
+ Y_ENSURE(reflection);
+
+ TString result;
+ switch (fieldDescriptor->type()) {
+ case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE:
+ result = ToString(reflection->GetDouble(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT:
+ result = ToString(reflection->GetFloat(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_BOOL:
+ result = ToString(reflection->GetBool(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_INT32:
+ result = ToString(reflection->GetInt32(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_UINT32:
+ result = ToString(reflection->GetUInt32(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_INT64:
+ result = ToString(reflection->GetInt64(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_UINT64:
+ result = ToString(reflection->GetUInt64(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_STRING:
+ result = ToString(reflection->GetString(*message, fieldDescriptor));
+ break;
+ case google::protobuf::FieldDescriptor::Type::TYPE_ENUM:
+ {
+ const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor);
+ result = ToString(enumValueDescriptor->name());
+ }
+ break;
+ default:
+ throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented.";
+ }
+ return result;
+}
+
+TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff)
+ : In_(&s)
+ , FrameFilter_(ff)
+ , EventFactory_(fac)
+{
+ Frame_ = FindNextFrame(&In_, EventFactory_);
+
+ SkipToAllowedFrame();
+}
+
+TFrameStreamer::TFrameStreamer(
+ const TString& fileName,
+ ui64 startTime,
+ ui64 endTime,
+ ui64 maxRequestDuration,
+ IEventFactory* fac,
+ IFrameFilterRef ff)
+ : File_(TBlob::FromFile(fileName))
+ , MemoryIn_(File_.Data(), File_.Size())
+ , In_(&MemoryIn_)
+ , StartTime_(startTime)
+ , EndTime_(endTime)
+ , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime))
+ , FrameFilter_(ff)
+ , EventFactory_(fac)
+{
+ In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration));
+ Frame_ = FindNextFrame(&In_, fac);
+ SkipToAllowedFrame();
+}
+
+TFrameStreamer::~TFrameStreamer() = default;
+
+bool TFrameStreamer::Avail() const {
+ return Frame_.Defined();
+}
+
+const TFrame& TFrameStreamer::operator*() const {
+ Y_ENSURE(Frame_, "Frame streamer depleted");
+
+ return *Frame_;
+}
+
+bool TFrameStreamer::Next() {
+ DoNext();
+ SkipToAllowedFrame();
+
+ return Frame_.Defined();
+}
+
+bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const {
+ const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_));
+ const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_));
+ return allowedStartTime || allowedEndTime;
+}
+
+bool TFrameStreamer::DoNext() {
+ if (!Frame_) {
+ return false;
+ }
+ In_.Skip(Frame_->Limiter_->Left());
+ Frame_ = FindNextFrame(&In_, EventFactory_);
+
+ if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) {
+ Frame_.Clear();
+ }
+
+ return Frame_.Defined();
+}
+
+namespace {
+ struct TDecodeBuffer {
+ TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) {
+ TBuffer from(bs);
+
+ {
+ TBufferOutput b(from);
+ TransferData(&src, &b);
+ }
+
+ NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer);
+ }
+
+ explicit TDecodeBuffer(IInputStream& src) {
+ TBufferOutput b(DecodeBuffer);
+ TransferData(&src, &b);
+ }
+
+ TBuffer DecodeBuffer;
+ };
+
+ class TBlockCodecStream: private TDecodeBuffer, public TBufferInput {
+ public:
+ TBlockCodecStream(const TString codec, IInputStream& src, size_t bs)
+ : TDecodeBuffer(codec, src, bs)
+ , TBufferInput(DecodeBuffer)
+ {}
+
+ explicit TBlockCodecStream(IInputStream& src)
+ : TDecodeBuffer(src)
+ , TBufferInput(DecodeBuffer)
+ {}
+ };
+}
+
+TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData)
+ : Frame_(fr)
+ , Event_(nullptr)
+ , Flt_(filter)
+ , Fac_(fr.Fac_)
+ , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime()))
+ , Strict_(strict)
+ , WithRawData_(withRawData)
+{
+ switch (fr.LogFormat()) {
+ case COMPRESSED_LOG_FORMAT_V2:
+ case COMPRESSED_LOG_FORMAT_V3:
+ case COMPRESSED_LOG_FORMAT_V4:
+ case COMPRESSED_LOG_FORMAT_V5: {
+ const auto payload = fr.GetCompressedFrame();
+ TMemoryInput payloadInput{payload};
+
+ if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) {
+ Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size()));
+ } else {
+ TZLibDecompress zlib(&payloadInput);
+ Decompressor_.Reset(new TBlockCodecStream(zlib));
+ if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) {
+ Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size()));
+ }
+ }
+
+ break;
+ }
+
+ default:
+ ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl;
+ break;
+ };
+
+ if (WithRawData_) {
+ TBufferOutput out(UncompressedData_);
+ TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen);
+
+ TransferData(&limiter, &out);
+ Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size()));
+ }
+
+ Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen));
+
+ Decode();
+}
+
+TFrameDecoder::~TFrameDecoder() = default;
+
+bool TFrameDecoder::Avail() const {
+ return HaveData();
+}
+
+TConstEventPtr TFrameDecoder::operator*() const {
+ Y_ENSURE(HaveData(), "Decoder depleted");
+
+ return Event_;
+}
+
+bool TFrameDecoder::Next() {
+ if (HaveData()) {
+ Decode();
+ }
+
+ return HaveData();
+}
+
+void TFrameDecoder::Decode() {
+ Event_ = nullptr;
+ const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5);
+
+ size_t evBegin = 0;
+ size_t evEnd = 0;
+ if (WithRawData_)
+ evBegin = UncompressedData_.Size() - Limiter_->Left();
+
+ while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) {
+ }
+
+ if (WithRawData_) {
+ evEnd = UncompressedData_.Size() - Limiter_->Left();
+ RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd);
+ }
+
+ if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) {
+ Event_ = EndOfFrame_.Release();
+ }
+
+ if (!!Event_) {
+ Event_->FrameId = Frame_.FrameId();
+ }
+}
+
+const TStringBuf TFrameDecoder::GetRawEvent() const {
+ return RawEventData_;
+}
+
+TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering)
+ : Frames_(fs)
+ , Start_(s)
+ , End_(e)
+ , MaxEndTimestamp_(0)
+ , Frontier_(0)
+ , StrongOrdering_(strongOrdering)
+ , LosslessStrongOrdering_(losslessStrongOrdering)
+ , EventFilter_(filter)
+{
+
+ if (Start_ > End_) {
+ ythrow yexception() << "Wrong main interval";
+ }
+
+ TEventStreamer::Next();
+}
+
+TEventStreamer::~TEventStreamer() = default;
+
+bool TEventStreamer::Avail() const {
+ return Events_.Avail() && (*Events_)->Timestamp <= Frontier_;
+}
+
+TConstEventPtr TEventStreamer::operator*() const {
+ Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted");
+
+ return *Events_;
+}
+
+bool TEventStreamer::Next() {
+ if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) {
+ return true;
+ }
+
+ for (;;) {
+ if (!LoadMoreEvents()) {
+ return false;
+ }
+
+ if (TEventStreamer::Avail()) {
+ return true;
+ }
+ }
+}
+
+/*
+Two parameters are used in the function:
+Frontier - the moment of time up to which inclusively all the log events made their way
+ into the buffer (and might have been already extracted out of it).
+Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION.
+In order to get all the log events up to the Frontier inclusively,
+ frames need to be read until "end time" of the current frame exceeds the Horizon.
+*/
+bool TEventStreamer::LoadMoreEvents() {
+ if (!Frames_.Avail()) {
+ return false;
+ }
+
+ const TFrame& fr1 = *Frames_;
+ const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0);
+
+ if (fr1.EndTime() <= Frontier_ + maxRequestDuration) {
+ ythrow yexception() << "Wrong frame stream state";
+ }
+
+ if (Frontier_ >= End_) {
+ return false;
+ }
+
+ const ui64 old_frontier = Frontier_;
+ Frontier_ = fr1.EndTime();
+
+ {
+ Y_DEFER {
+ Events_.Reorder(StrongOrdering_);
+ };
+
+ for (; Frames_.Avail(); Frames_.Next()) {
+ const TFrame& fr2 = *Frames_;
+
+ // Frames need to start later than the Frontier.
+ if (StrongOrdering_ && fr2.StartTime() <= old_frontier) {
+ Cdbg << "Invalid frame encountered" << Endl;
+ continue;
+ }
+
+ if (fr2.EndTime() > MaxEndTimestamp_) {
+ MaxEndTimestamp_ = fr2.EndTime();
+ }
+
+ if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) {
+ return true;
+ }
+
+ // Checking for the frame to be within the main time borders.
+ if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) {
+ TransferEvents(fr2);
+ }
+ }
+ }
+
+ Frontier_ = MaxEndTimestamp_;
+
+ return true;
+}
+
+void TEventStreamer::TransferEvents(const TFrame& fr) {
+ Events_.SetCheckpoint();
+
+ try {
+ for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) {
+ TConstEventPtr ev = *it;
+
+ if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) {
+ ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range";
+ }
+
+ if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) {
+ Events_.Append(ev, StrongOrdering_);
+ }
+ }
+ } catch (const TInvalidEventTimestamps& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl;
+ } catch (const TFrameLoadError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: " << err.what() << Endl;
+ } catch (const TEventDecoderError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl;
+ } catch (const TZLibDecompressorError& err) {
+ Events_.Rollback();
+ Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl;
+ } catch (...) {
+ Events_.Rollback();
+ throw;
+ }
+}
+
+void TEventStreamer::TEventBuffer::SetCheckpoint() {
+ BufLen_ = Buffer_.size();
+}
+
+void TEventStreamer::TEventBuffer::Rollback() {
+ Buffer_.resize(BufLen_);
+}
+
+void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) {
+ SetCheckpoint();
+
+ std::reverse(Buffer_.begin(), Buffer_.end());
+
+ if (strongOrdering) {
+ StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) {
+ return (a->Timestamp > b->Timestamp) ||
+ ((a->Timestamp == b->Timestamp) && !a->Class && b->Class);
+ });
+ }
+}
+
+void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) {
+ // Events in buffer output must be in an ascending order.
+ Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event");
+
+ Buffer_.push_back(std::move(ev));
+}
+
+bool TEventStreamer::TEventBuffer::Avail() const {
+ return !Buffer_.empty();
+}
+
+TConstEventPtr TEventStreamer::TEventBuffer::operator*() const {
+ Y_ENSURE(!Buffer_.empty(), "Event buffer is empty");
+
+ return Buffer_.back();
+}
+
+bool TEventStreamer::TEventBuffer::Next() {
+ if (!Buffer_.empty()) {
+ LastTimestamp_ = Buffer_.back()->Timestamp;
+ Buffer_.pop_back();
+ return !Buffer_.empty();
+ } else {
+ return false;
+ }
+}
diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h
new file mode 100644
index 00000000000..f819e725894
--- /dev/null
+++ b/library/cpp/eventlog/logparser.h
@@ -0,0 +1,343 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+#include <util/generic/vector.h>
+#include <util/generic/set.h>
+#include <util/generic/maybe.h>
+#include <util/memory/blob.h>
+#include <util/stream/length.h>
+#include <util/stream/mem.h>
+
+#include "eventlog_int.h"
+#include "eventlog.h"
+#include "common.h"
+
+class IInputStream;
+
+static const ui64 MAX_REQUEST_DURATION = 60'000'000;
+static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION;
+static const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION;
+
+class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> {
+public:
+ TEventFilter(bool enableEvents)
+ : Enable_(enableEvents)
+ {
+ }
+
+ void AddEventClass(TEventClass cls) {
+ insert(cls);
+ }
+
+ bool EventAllowed(TEventClass cls) const {
+ bool found = (find(cls) != end());
+
+ return Enable_ == found;
+ }
+
+private:
+ bool Enable_;
+};
+
+using TEventStream = TPacketInputStream<TConstEventPtr>;
+
+struct TFrameHeader {
+ // Reads header from the stream. The caller must make sure that the
+ // sync data is present just befor the stream position.
+ explicit TFrameHeader(IInputStream& in);
+
+ ui64 StartTime() const {
+ return Framehdr.StartTimestamp;
+ }
+
+ ui64 EndTime() const {
+ return Framehdr.EndTimestamp;
+ }
+
+ ui32 FrameId() const {
+ return Basehdr.FrameId;
+ }
+
+ ui64 Duration() const {
+ return EndTime() - StartTime();
+ }
+
+ TEventLogFormat ContentFormat() const {
+ return Basehdr.Format & 0xffffff;
+ }
+
+ TEventLogFormat LogFormat() const {
+ return Basehdr.Format >> 24;
+ }
+
+ ui64 FrameLength() const {
+ return Basehdr.Length - sizeof(TCompressedFrameHeader2);
+ }
+
+ // Length including the header
+ ui64 FullLength() const {
+ return sizeof(*this) + FrameLength();
+ }
+
+ TCompressedFrameBaseHeader Basehdr;
+ TCompressedFrameHeader2 Framehdr;
+};
+
+struct TFrameLoadError: public yexception {
+ explicit TFrameLoadError(size_t skipAfter)
+ : SkipAfter(skipAfter)
+ {}
+
+ size_t SkipAfter;
+};
+
+class TFrame : public TFrameHeader {
+public:
+ // Reads the frame after the header has been read.
+ TFrame(IInputStream& in, TFrameHeader header, IEventFactory*);
+
+ TString GetRawFrame() const;
+ TString GetCompressedFrame() const;
+
+ ui64 Address() const { return Address_; }
+
+private:
+ const TConstEventPtr& GetEvent(size_t index) const {
+ return EventsCache_[index];
+ }
+
+ void ClearEventsCache() const;
+
+ THolder<TLengthLimitedInput> Limiter_;
+ mutable TVector<TConstEventPtr> EventsCache_;
+
+ IEventFactory* Fac_;
+ ui64 Address_;
+
+ friend class TFrameDecoder;
+ friend class TFrameStreamer;
+
+private:
+ class TIterator: TEventStream {
+ public:
+ TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter);
+ ~TIterator() override = default;
+
+ bool Avail() const override {
+ return Index_ < Size_;
+ }
+
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ private:
+ void SkipToValidEvent();
+
+ const TFrame& Frame_;
+ size_t Size_;
+ TIntrusiveConstPtr<TEventFilter> Filter_;
+ size_t Index_;
+ };
+
+public:
+ TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const;
+};
+
+// If `in` is derived from TCountingInput, Frame's address will
+// be set accorting to the in->Counter(). Otherwise it will be zeroO
+TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*);
+
+using TFrameStream = TPacketInputStream<const TFrame&>;
+
+class IFrameFilter: public TSimpleRefCount<IFrameFilter> {
+public:
+ IFrameFilter() {
+ }
+
+ virtual ~IFrameFilter() = default;
+
+ virtual bool FrameAllowed(const TFrame& frame) const = 0;
+};
+
+using IFrameFilterRef = TIntrusivePtr<IFrameFilter>;
+
+class TDurationFrameFilter: public IFrameFilter {
+public:
+ TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>())
+ : MinDuration_(minFrameDuration)
+ , MaxDuration_(maxFrameDuration)
+ {
+ }
+
+ bool FrameAllowed(const TFrame& frame) const override {
+ return frame.Duration() >= MinDuration_ && frame.Duration() <= MaxDuration_;
+ }
+
+private:
+ const ui64 MinDuration_;
+ const ui64 MaxDuration_;
+};
+
+class TFrameIdFrameFilter: public IFrameFilter {
+public:
+ TFrameIdFrameFilter(ui32 frameId)
+ : FrameId_(frameId)
+ {
+ }
+
+ bool FrameAllowed(const TFrame& frame) const override {
+ return frame.FrameId() == FrameId_;
+ }
+
+private:
+ const ui32 FrameId_;
+};
+
+class TContainsEventFrameFilter: public IFrameFilter {
+public:
+ TContainsEventFrameFilter(const TString& args, const IEventFactory* fac);
+
+ bool FrameAllowed(const TFrame& frame) const override;
+
+private:
+ struct TMatchGroup {
+ TEventClass EventID;
+ TString FieldName;
+ TString ValueToMatch;
+ };
+
+ TVector<TMatchGroup> MatchGroups;
+};
+
+void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet);
+
+TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet);
+
+TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection);
+
+class TFrameStreamer: public TFrameStream {
+public:
+ TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = nullptr);
+ TFrameStreamer(
+ const TString& fileName,
+ ui64 startTime,
+ ui64 endTime,
+ ui64 maxRequestDuration,
+ IEventFactory* fac,
+ IFrameFilterRef ff = nullptr);
+ ~TFrameStreamer() override;
+
+ bool Avail() const override;
+ const TFrame& operator*() const override;
+ bool Next() override;
+
+private:
+ bool DoNext();
+ bool AllowedTimeRange(const TFrame& frame) const;
+
+ bool AllowedFrame(const TFrame& frame) const {
+ return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame));
+ }
+
+ void SkipToAllowedFrame() {
+ if (Frame_) {
+ while (!AllowedFrame(*Frame_) && DoNext()) {
+ //do nothing
+ }
+ }
+ }
+
+ TBlob File_;
+ TMemoryInput MemoryIn_;
+ TCountingInput In_;
+ THolder<IInputStream> Stream_;
+ ui64 StartTime_ = 0;
+ ui64 EndTime_ = 0;
+ ui64 CutoffTime_ = 0;
+ TMaybe<TFrame> Frame_;
+ IFrameFilterRef FrameFilter_;
+ IEventFactory* EventFactory_;
+};
+
+class TFrameDecoder: TEventStream {
+public:
+ TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false);
+ ~TFrameDecoder() override;
+
+ bool Avail() const override;
+
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ const TStringBuf GetRawEvent() const;
+
+private:
+ TFrameDecoder(const TFrameDecoder&);
+ void operator=(const TFrameDecoder&);
+
+ inline bool HaveData() const {
+ return Event_ != nullptr;
+ }
+
+ void Decode();
+
+private:
+ const TFrame& Frame_;
+ THolder<IInputStream> Decompressor_;
+ THolder<TLengthLimitedInput> Limiter_;
+ TEventPtr Event_;
+ const TEventFilter* const Flt_;
+ IEventFactory* Fac_;
+ THolder<TEvent> EndOfFrame_;
+ bool Strict_;
+ TBuffer UncompressedData_;
+ TStringBuf RawEventData_;
+ bool WithRawData_;
+};
+
+class TEventStreamer: public TEventStream {
+public:
+ TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false);
+ ~TEventStreamer() override;
+
+ bool Avail() const override;
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+private:
+ class TEventBuffer: public TEventStream {
+ public:
+ void SetCheckpoint();
+ void Rollback();
+ void Reorder(bool strongOrdering);
+ void Append(TConstEventPtr event, bool strongOrdering);
+
+ bool Avail() const override;
+ TConstEventPtr operator*() const override;
+ bool Next() override;
+
+ private:
+ TVector<TConstEventPtr> Buffer_;
+ size_t BufLen_ = 0;
+ ui64 LastTimestamp_ = 0;
+ };
+
+private:
+ struct TInvalidEventTimestamps: public yexception {
+ };
+
+ bool LoadMoreEvents();
+ void TransferEvents(const TFrame&);
+
+private:
+ TFrameStream& Frames_;
+ TEventBuffer Events_;
+
+ ui64 Start_, End_;
+ ui64 MaxEndTimestamp_;
+ ui64 Frontier_;
+ bool StrongOrdering_;
+ bool LosslessStrongOrdering_;
+ TIntrusivePtr<TEventFilter> EventFilter_;
+};
diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto
new file mode 100644
index 00000000000..37a7d7e5614
--- /dev/null
+++ b/library/cpp/eventlog/proto/events_extension.proto
@@ -0,0 +1,22 @@
+import "google/protobuf/descriptor.proto";
+
+option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions";
+option java_package = "NEventLogEventsExtension";
+
+extend google.protobuf.MessageOptions {
+ optional uint32 message_id = 50001;
+ optional string realm_name = 50002;
+}
+
+message Repr {
+ enum ReprType {
+ none = 0;
+ as_bytes = 1; // Only for primitive types
+ as_hex = 2; // Only for primitive types
+ as_base64 = 3; // Only for 'string' and 'bytes' fields
+ };
+}
+
+extend google.protobuf.FieldOptions {
+ optional Repr.ReprType repr = 55003 [default = none];
+}
diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto
new file mode 100644
index 00000000000..234230e0949
--- /dev/null
+++ b/library/cpp/eventlog/proto/internal.proto
@@ -0,0 +1,9 @@
+option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions";
+
+package NEventLogInternal;
+
+message TUnknownEvent {
+};
+
+message TEndOfFrameEvent {
+};
diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp
new file mode 100644
index 00000000000..67839063fbd
--- /dev/null
+++ b/library/cpp/eventlog/threaded_eventlog.cpp
@@ -0,0 +1 @@
+#include "threaded_eventlog.h"
diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h
new file mode 100644
index 00000000000..52382b856d1
--- /dev/null
+++ b/library/cpp/eventlog/threaded_eventlog.h
@@ -0,0 +1,154 @@
+#pragma once
+
+#include "eventlog.h"
+
+#include <util/generic/string.h>
+#include <util/thread/pool.h>
+
+class TThreadedEventLog: public TEventLogWithSlave {
+public:
+ class TWrapper;
+ using TOverflowCallback = std::function<void(TWrapper& wrapper)>;
+
+ enum class EDegradationResult {
+ ShouldWrite,
+ ShouldDrop,
+ };
+ using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>;
+
+public:
+ TThreadedEventLog(
+ IEventLog& parentLog,
+ size_t threadCount,
+ size_t queueSize,
+ TOverflowCallback cb,
+ TDegradationCallback degradationCallback = {})
+ : TEventLogWithSlave(parentLog)
+ , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
+ , ThreadCount(threadCount)
+ , QueueSize(queueSize)
+ , OverflowCallback(std::move(cb))
+ , DegradationCallback(std::move(degradationCallback))
+ {
+ Init();
+ }
+
+ TThreadedEventLog(
+ const TEventLogPtr& parentLog,
+ size_t threadCount,
+ size_t queueSize,
+ TOverflowCallback cb,
+ TDegradationCallback degradationCallback = {})
+ : TEventLogWithSlave(parentLog)
+ , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
+ , ThreadCount(threadCount)
+ , QueueSize(queueSize)
+ , OverflowCallback(std::move(cb))
+ , DegradationCallback(std::move(degradationCallback))
+ {
+ Init();
+ }
+
+ TThreadedEventLog(IEventLog& parentLog)
+ : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback())
+ {
+ }
+
+ TThreadedEventLog(const TEventLogPtr& parentLog)
+ : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback())
+ {
+ }
+
+ ~TThreadedEventLog() override {
+ try {
+ LogSaver.Stop();
+ } catch (...) {
+ }
+ }
+
+ void ReopenLog() override {
+ TEventLogWithSlave::ReopenLog();
+ }
+
+ void CloseLog() override {
+ LogSaver.Stop();
+ TEventLogWithSlave::CloseLog();
+ }
+
+ void WriteFrame(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {}) override {
+ float fillFactor = 0.0f;
+ if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) {
+ fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize();
+ }
+
+ EDegradationResult status = EDegradationResult::ShouldWrite;
+ if (DegradationCallback) {
+ status = DegradationCallback(fillFactor);
+ }
+ if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) {
+ return;
+ }
+
+ THolder<TWrapper> wrapped;
+ wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags)));
+
+ if (LogSaver.Add(wrapped.Get())) {
+ Y_UNUSED(wrapped.Release());
+ } else if (OverflowCallback) {
+ OverflowCallback(*wrapped);
+ }
+ }
+
+private:
+ void Init() {
+ LogSaver.Start(ThreadCount, QueueSize);
+ }
+
+public:
+ class TWrapper: public IObjectInQueue {
+ public:
+ TWrapper(TBuffer& buffer,
+ TEventTimestamp startTimestamp,
+ TEventTimestamp endTimestamp,
+ IEventLog& slave,
+ TWriteFrameCallbackPtr writeFrameCallback = nullptr,
+ TLogRecord::TMetaFlags metaFlags = {})
+ : StartTimestamp(startTimestamp)
+ , EndTimestamp(endTimestamp)
+ , Slave(&slave)
+ , WriteFrameCallback(writeFrameCallback)
+ , MetaFlags(std::move(metaFlags))
+ {
+ Buffer.Swap(buffer);
+ }
+
+ void Process(void*) override {
+ THolder<TWrapper> holder(this);
+
+ WriteFrame();
+ }
+
+ void WriteFrame() {
+ Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags));
+ }
+
+ private:
+ TBuffer Buffer;
+ TEventTimestamp StartTimestamp;
+ TEventTimestamp EndTimestamp;
+ IEventLog* Slave;
+ TWriteFrameCallbackPtr WriteFrameCallback;
+ TLogRecord::TMetaFlags MetaFlags;
+ };
+
+private:
+ TThreadPool LogSaver;
+ const size_t ThreadCount;
+ const size_t QueueSize;
+ const TOverflowCallback OverflowCallback;
+ const TDegradationCallback DegradationCallback;
+};