aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/eventlog
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
committerqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
commitb1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch)
tree2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/eventlog
parent559174a9144de40d6bb3997ea4073c82289b4974 (diff)
downloadydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/eventlog')
-rw-r--r--library/cpp/eventlog/common.h10
-rw-r--r--library/cpp/eventlog/dumper/common.cpp81
-rw-r--r--library/cpp/eventlog/dumper/common.h8
-rw-r--r--library/cpp/eventlog/dumper/evlogdump.cpp414
-rw-r--r--library/cpp/eventlog/dumper/evlogdump.h11
-rw-r--r--library/cpp/eventlog/dumper/tunable_event_processor.cpp1
-rw-r--r--library/cpp/eventlog/dumper/tunable_event_processor.h19
-rw-r--r--library/cpp/eventlog/evdecoder.cpp112
-rw-r--r--library/cpp/eventlog/evdecoder.h16
-rw-r--r--library/cpp/eventlog/event_field_output.cpp65
-rw-r--r--library/cpp/eventlog/event_field_output.h29
-rw-r--r--library/cpp/eventlog/event_field_printer.cpp27
-rw-r--r--library/cpp/eventlog/event_field_printer.h38
-rw-r--r--library/cpp/eventlog/eventlog.cpp554
-rw-r--r--library/cpp/eventlog/eventlog.h623
-rw-r--r--library/cpp/eventlog/eventlog_int.cpp12
-rw-r--r--library/cpp/eventlog/eventlog_int.h72
-rw-r--r--library/cpp/eventlog/events_extension.h161
-rw-r--r--library/cpp/eventlog/iterator.cpp88
-rw-r--r--library/cpp/eventlog/iterator.h51
-rw-r--r--library/cpp/eventlog/logparser.cpp814
-rw-r--r--library/cpp/eventlog/logparser.h343
-rw-r--r--library/cpp/eventlog/proto/events_extension.proto22
-rw-r--r--library/cpp/eventlog/proto/internal.proto9
-rw-r--r--library/cpp/eventlog/threaded_eventlog.cpp1
-rw-r--r--library/cpp/eventlog/threaded_eventlog.h154
26 files changed, 0 insertions, 3735 deletions
diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h
deleted file mode 100644
index 75c512c13ef..00000000000
--- a/library/cpp/eventlog/common.h
+++ /dev/null
@@ -1,10 +0,0 @@
-#pragma once
-
-template <class T>
-class TPacketInputStream {
-public:
- virtual bool Avail() const = 0;
- virtual T operator*() const = 0;
- virtual bool Next() = 0;
- virtual ~TPacketInputStream() = default;
-};
diff --git a/library/cpp/eventlog/dumper/common.cpp b/library/cpp/eventlog/dumper/common.cpp
deleted file mode 100644
index eebe3b6ee33..00000000000
--- a/library/cpp/eventlog/dumper/common.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-#include "common.h"
-
-#include <contrib/libs/re2/re2/re2.h>
-
-#include <util/datetime/base.h>
-#include <util/datetime/parser.h>
-#include <util/string/cast.h>
-
-static time_t RecentTime(int h, int m, int s) {
- time_t now = time(nullptr);
- tm tmTmp;
- localtime_r(&now, &tmTmp);
- tmTmp.tm_hour = h;
- tmTmp.tm_min = m;
- tmTmp.tm_sec = s;
- time_t today = mktime(&tmTmp);
- tmTmp.tm_mday -= 1;
- time_t yesterday = mktime(&tmTmp);
- return today <= now ? today : yesterday;
-}
-
-static bool ParseRecentTime(const TString& str, time_t& result) {
- RE2 RecentTimePattern("(\\d{1,2}):(\\d{2})(?::(\\d{2}))?");
- re2::StringPiece hStr, mStr, sStr;
- if (!RE2::FullMatch({str.data(), str.size()}, RecentTimePattern, &hStr, &mStr, &sStr)) {
- return false;
- }
- int h = FromString<int>(hStr.data(), hStr.length());
- int m = FromString<int>(mStr.data(), mStr.length());
- int s = FromString<int>(sStr.data(), sStr.length(), 0);
- if (h > 23 || m > 59 || s > 59) {
- return false;
- }
- result = RecentTime(h, m, s);
- return true;
-}
-
-namespace {
- class TDefaultOffset8601Parser: public TIso8601DateTimeParser {
- public:
- TDefaultOffset8601Parser(int offsetHours) {
- DateTimeFields.ZoneOffsetMinutes = offsetHours * 60;
- }
- };
-} // namespace
-
-static bool ParseISO8601DateTimeWithDefaultOffset(TStringBuf str, int offsetHours, time_t& result) {
- TDefaultOffset8601Parser parser{offsetHours};
-
- if (!parser.ParsePart(str.data(), str.size())) {
- return false;
- }
-
- const TInstant instant = parser.GetResult(TInstant::Max());
- if (instant == TInstant::Max()) {
- return false;
- }
-
- result = instant.TimeT();
- return true;
-}
-
-ui64 ParseTime(const TString& str, ui64 defValue, int offset) {
- if (!str) {
- return defValue;
- }
-
- time_t utcTime;
-
- if (ParseISO8601DateTimeWithDefaultOffset(str, offset, utcTime)) {
- return (ui64)utcTime * 1000000;
- }
-
- if (ParseRecentTime(str, utcTime)) {
- return (ui64)utcTime * 1000000;
- }
-
- // if conversion fails, TryFromString leaves defValue unchanged
- TryFromString<ui64>(str, defValue);
- return defValue;
-}
diff --git a/library/cpp/eventlog/dumper/common.h b/library/cpp/eventlog/dumper/common.h
deleted file mode 100644
index 839fc7b221c..00000000000
--- a/library/cpp/eventlog/dumper/common.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#pragma once
-
-#include <util/generic/string.h>
-
-/**
- * Returns microseconds since the epoch.
- */
-ui64 ParseTime(const TString& str, ui64 defValue, int offset /* timezone offset, default MSK */ = 3);
diff --git a/library/cpp/eventlog/dumper/evlogdump.cpp b/library/cpp/eventlog/dumper/evlogdump.cpp
deleted file mode 100644
index 003b412265b..00000000000
--- a/library/cpp/eventlog/dumper/evlogdump.cpp
+++ /dev/null
@@ -1,414 +0,0 @@
-#include "common.h"
-#include "evlogdump.h"
-
-#include <library/cpp/eventlog/evdecoder.h>
-#include <library/cpp/eventlog/iterator.h>
-#include <library/cpp/eventlog/logparser.h>
-
-#include <library/cpp/getopt/last_getopt.h>
-
-#ifndef NO_SVN_DEPEND
-#include <library/cpp/svnversion/svnversion.h>
-#endif
-
-#include <util/datetime/base.h>
-#include <util/generic/ptr.h>
-#include <util/stream/file.h>
-#include <util/string/cast.h>
-#include <util/string/type.h>
-
-using namespace NEventLog;
-namespace nlg = NLastGetopt;
-
-IFrameFilterRef CreateDurationFrameFilter(const TString& params) {
- TStringBuf d(params);
- TStringBuf l, r;
- d.Split(':', l, r);
- TDuration min = l.empty() ? TDuration::Zero() : FromString<TDuration>(l);
- TDuration max = r.empty() ? TDuration::Max() : FromString<TDuration>(r);
-
- if (min > TDuration::Zero() || max < TDuration::Max()) {
- return new TDurationFrameFilter(min.GetValue(), max.GetValue());
- }
-
- return nullptr;
-}
-
-IFrameFilterRef CreateFrameIdFilter(const ui32 frameId) {
- return new TFrameIdFrameFilter(frameId);
-}
-
-IFrameFilterRef CreateContainsEventFrameFilter(const TString& args, const IEventFactory* factory) {
- return new TContainsEventFrameFilter(args, factory);
-}
-
-void ListAllEvents(IEventFactory* factory, IOutputStream* out) {
- Y_ENSURE(out);
-
- for (TEventClass eventClass = factory->EventClassBegin(); eventClass < factory->EventClassEnd(); eventClass++) {
- THolder<TEvent> event(factory->CreateLogEvent(eventClass));
-
- const TStringBuf& name = event->GetName();
- if (name) {
- (*out) << name << "\n";
- }
- }
-}
-
-void ListEventFieldsByEventId(const TEventClass eventClass, IEventFactory* factory, IOutputStream* out) {
- THolder<TEvent> event(factory->CreateLogEvent(eventClass));
- const NProtoBuf::Message* message = event->GetProto();
- const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
-
- (*out) << descriptor->DebugString();
-}
-
-void ListEventFields(const TString& eventName, IEventFactory* factory, IOutputStream* out) {
- Y_ENSURE(out);
-
- TEventClass eventClass;
-
- try {
- eventClass = factory->ClassByName(eventName);
- } catch (yexception& e) {
- if (!TryFromString<TEventClass>(eventName, eventClass)) {
- (*out) << "Cannot dervie event class from event name: " << eventName << Endl;
- return;
- }
- }
-
- ListEventFieldsByEventId(eventClass, factory, out);
-}
-
-int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory) {
- if (helpEvents == "all") {
- ListAllEvents(factory, &Cout);
- } else if (IsNumber(helpEvents)) {
- ListEventFieldsByEventId(IntFromString<ui32, 10>(helpEvents), factory, &Cout);
- } else {
- ListEventFields(helpEvents, factory, &Cout);
- }
-
- return 0;
-}
-
-int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv) {
- class TProxy: public ITunableEventProcessor {
- public:
- TProxy(IEventProcessor* proc)
- : Processor_(proc)
- {
- }
-
- void AddOptions(NLastGetopt::TOpts&) override {
- }
-
- void SetOptions(const TEvent::TOutputOptions& options) override {
- Processor_->SetOptions(options);
- }
-
- void ProcessEvent(const TEvent* ev) override {
- Processor_->ProcessEvent(ev);
- }
-
- bool CheckedProcessEvent(const TEvent* ev) override {
- return Processor_->CheckedProcessEvent(ev);
- }
-
- private:
- IEventProcessor* Processor_ = nullptr;
- };
-
- TProxy proxy(proc);
- return IterateEventLog(fac, &proxy, argc, argv);
-}
-
-int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv) {
- nlg::TOpts opts;
- opts.AddHelpOption('?');
- opts.AddHelpOption('h');
- opts.SetTitle(
- "Search EventLog dumper\n\n"
- "Examples:\n"
- "evlogdump -s 1228484839332219 -e 1228484840332219 event_log\n"
- "evlogdump -s 2008-12-12T12:00:00+03 -e 2008-12-12T12:05:00+03 event_log\n"
- "evlogdump -s 12:40 -e 12:55 event_log\n"
- "evlogdump -i 301,302,303 event_log\n"
- "evlogdump -i SubSourceResponse,SubSourceOk,SubSourceError event_log\n"
- "evlogdump -d 40ms:60ms event_log\n"
- "evlogdump -c ReqId:ReqId:1563185655856304-30407004790538173600035-vla1-0671-p1\n"
- "\n"
- "Fine manual: https://nda.ya.ru/3Tpo3L\n"
- "\n"
- "If in trouble using this (or bugs encountered),\n"
- "please don't hesitate to ask middlesearch/basesearch dev team\n"
- "(vmordovin@, mvel@, etc)\n");
-
- TString start;
- opts.AddLongOption(
- 's', "start-time",
- "Start time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours).\n"
- "Long frames can produce inaccurate cropping and event loss in dump (see SEARCH-5576)")
- .Optional()
- .StoreResult(&start);
-
- TString end;
- opts.AddLongOption(
- 'e', "end-time",
- "End time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours)")
- .Optional()
- .StoreResult(&end);
-
- TOptions o;
-
- opts.AddLongOption(
- 'm', "max-request-duration",
- "Max duration of a request in the log, in us")
- .Optional()
- .StoreResult(&o.MaxRequestDuration);
-
- bool oneFrame = false;
- opts.AddLongOption(
- 'f', "one-frame",
- "Treat input file as single frame dump (e.g. from gdb)")
- .NoArgument()
- .Optional()
- .StoreValue(&oneFrame, true);
-
- opts.AddLongOption(
- 'v', "version",
- "Print program version")
- .NoArgument()
- .Optional()
- .Handler(&nlg::PrintVersionAndExit);
-
- TString includeEventList;
- opts.AddLongOption(
- 'i', "event-list",
- "Comma-separated list of included event class IDs or names "
- "(e.g. 265,287 or MainTaskError,ContextCreated)")
- .Optional()
- .StoreResult(&includeEventList)
- .StoreValue(&o.EnableEvents, true);
-
- TString durationFilterStr;
- opts.AddLongOption(
- 'd', "duration",
- "DurationMin[:DurationMax] (values must contain a unit, valid examples are: 50us, 50ms, 50s)\n"
- "(show frames with duration greater or equal DurationMin [and less or equal than DurationMax])")
- .Optional()
- .StoreResult(&durationFilterStr);
-
- const ui32 INVALID_FRAME_ID = ui32(-1);
- ui32 frameIdFilter = INVALID_FRAME_ID;
- opts.AddLongOption(
- 'n', "frame-id",
- "Filter frame with given id\n")
- .Optional()
- .StoreResult(&frameIdFilter);
-
- TString excludeEventList;
- opts.AddLongOption(
- 'x', "exclude-list",
- "Comma-separated list of excluded event class IDs or names (see also --event-list option)")
- .Optional()
- .StoreResult(&excludeEventList)
- .StoreValue(&o.EnableEvents, false);
-
- opts.AddLongOption(
- 'o', "frame-order",
- "Order events by time only inside a frame (faster)")
- .NoArgument()
- .Optional()
- .StoreValue(&o.ForceWeakOrdering, true);
-
- opts.AddLongOption(
- 'O', "global-order",
- "Globally but lossy (may lose some frames) order events by time only (slower)")
- .NoArgument()
- .Optional()
- .StoreValue(&o.ForceStrongOrdering, true);
-
- opts.AddLongOption(
- "lossless-strong-order",
- "Globally order events by time only (super slow and memory heavy)")
- .NoArgument()
- .Optional()
- .StoreValue(&o.ForceLosslessStrongOrdering, true)
- .StoreValue(&o.ForceStrongOrdering, true);
-
- opts.AddLongOption(
- 'S', "stream",
- "Force stream mode (for log files with invalid ending)")
- .NoArgument()
- .Optional()
- .StoreValue(&o.ForceStreamMode, true);
-
- opts.AddLongOption(
- 't', "tail",
- "Open file like `tail -f` does")
- .NoArgument()
- .Optional()
- .StoreValue(&o.TailFMode, true);
-
- bool unescaped = false;
- opts.AddLongOption(
- 'u', "unescaped",
- "Don't escape \\t, \\n chars in message fields")
- .NoArgument()
- .StoreValue(&unescaped, true);
-
- bool json = false;
- opts.AddLongOption(
- 'j', "json",
- "Print messages as JSON values")
- .NoArgument()
- .StoreValue(&json, true);
-
- TEvent::TOutputOptions outputOptions;
- opts.AddLongOption(
- 'r', "human-readable",
- "Print some fields (e.g. timestamp) in human-readable format, add time offsets")
- .NoArgument()
- .StoreValue(&outputOptions.HumanReadable, true);
-
- //Supports only fields of type string
- TString containsEvent;
- opts.AddLongOption(
- 'c', "contains-event",
- "Only print frames that contain events whose particular field's value matches given value.\n"
- "Match group should be provided in format: EventName:FieldName:ValueToMatch\n"
- "If more than one match group is provided, they should be separated by / delimiter.\n"
- "Nested fields are supported through \'.\' in FieldName.\n"
- "Symbols \'/\' and \':\' in EventName, FieldName or ValueToMatch must be escaped.\n"
- "NOTE: bool values are 1 and 0.")
- .Optional()
- .StoreResult(&containsEvent);
-
- TString helpEvents;
- opts.AddLongOption("help-events")
- .RequiredArgument("EVENT, EVENT_ID or string \"all\"")
- .Optional()
- .StoreResult(&helpEvents);
-
- proc->AddOptions(opts);
-
- opts.SetFreeArgsMin(0);
- opts.SetFreeArgsMax(1);
- opts.SetFreeArgTitle(0, "<eventlog>", "Event log file");
-
- try {
- const nlg::TOptsParseResult optsRes(&opts, argc, argv);
-
- if (helpEvents) {
- return PrintHelpEvents(helpEvents, fac);
- }
-
- TVector<TString> freeArgs = optsRes.GetFreeArgs();
- if (freeArgs) {
- o.FileName = freeArgs[0];
- }
-
- ui32 conditionsHappened = 0;
-
- if (frameIdFilter != INVALID_FRAME_ID) {
- ++conditionsHappened;
- }
- if (containsEvent) {
- ++conditionsHappened;
- }
- if (durationFilterStr) {
- ++conditionsHappened;
- }
-
- if (conditionsHappened > 1) {
- throw nlg::TUsageException() << "You can only use no more than one of frame id, duration or contains event frame filters.";
- }
-
- if (frameIdFilter != INVALID_FRAME_ID) {
- o.FrameFilter = CreateFrameIdFilter(frameIdFilter);
- } else if (durationFilterStr) {
- o.FrameFilter = CreateDurationFrameFilter(durationFilterStr);
- } else if (containsEvent) {
- o.FrameFilter = CreateContainsEventFrameFilter(containsEvent, fac);
- }
-
- // handle some inconsistencies
- if (o.ForceWeakOrdering && o.ForceStrongOrdering) {
- throw nlg::TUsageException() << "You can use either strong (-O) or weak (-o) ordering. ";
- }
- if (includeEventList && excludeEventList) {
- throw nlg::TUsageException() << "You can use either include (-i) or exclude (-x) events list. ";
- }
- if (unescaped && json) {
- throw nlg::TUsageException() << "You can use either unescaped (-u) or json (-j) output format. ";
- }
-
- proc->CheckOptions();
- } catch (...) {
- Cerr << "Usage error: " << CurrentExceptionMessage() << Endl;
- return 1;
- }
-
- o.EvList = o.EnableEvents ? includeEventList : excludeEventList;
- if (json) {
- outputOptions.OutputFormat = TEvent::TOutputFormat::Json;
- } else if (unescaped) {
- outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparatedRaw;
- } else {
- outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparated;
- }
-
- IEventProcessor* eventProcessor = NEvClass::Processor();
- // FIXME(mvel): A little of hell here: `proc` and `eventProcessor` are `IEventProcessor`s
- // So we need to set options for BOTH!
-
- eventProcessor->SetOptions(outputOptions);
- proc->SetOptions(outputOptions);
- proc->SetEventProcessor(eventProcessor);
-
- if (oneFrame) {
- THolder<IInputStream> fileInput;
-
- // this is for coredumps analysis, usage:
- // gdb: dump binary memory framefile Data_ Data_ + Pos_
- // evlogdump -f framefile
- IInputStream* usedInput = nullptr;
-
- if (o.FileName.size()) {
- fileInput.Reset(new TUnbufferedFileInput(o.FileName));
- usedInput = fileInput.Get();
- } else {
- usedInput = &Cin;
- }
-
- try {
- for (;;) {
- proc->ProcessEvent(DecodeEvent(*usedInput, true, 0, nullptr, fac).Get());
- }
- } catch (...) {
- }
-
- return 0;
- }
-
- o.StartTime = ParseTime(start, MIN_START_TIME);
- o.EndTime = ParseTime(end, MAX_END_TIME);
-
- try {
- THolder<IIterator> it = CreateIterator(o, fac);
-
- while (const auto ev = it->Next()) {
- if (!proc->CheckedProcessEvent(ev.Get())) {
- break;
- }
- }
-
- return 0;
- } catch (...) {
- Cout.Flush();
- Cerr << "Error occured: " << CurrentExceptionMessage() << Endl;
- }
-
- return 1;
-}
diff --git a/library/cpp/eventlog/dumper/evlogdump.h b/library/cpp/eventlog/dumper/evlogdump.h
deleted file mode 100644
index eb150573746..00000000000
--- a/library/cpp/eventlog/dumper/evlogdump.h
+++ /dev/null
@@ -1,11 +0,0 @@
-#pragma once
-
-#include "tunable_event_processor.h"
-
-#include <library/cpp/eventlog/eventlog.h>
-
-int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv);
-int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv);
-
-// added for using in infra/libs/logger/log_printer.cpp
-int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory);
diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.cpp b/library/cpp/eventlog/dumper/tunable_event_processor.cpp
deleted file mode 100644
index 8333089cb54..00000000000
--- a/library/cpp/eventlog/dumper/tunable_event_processor.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "tunable_event_processor.h"
diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.h b/library/cpp/eventlog/dumper/tunable_event_processor.h
deleted file mode 100644
index b56eae9222a..00000000000
--- a/library/cpp/eventlog/dumper/tunable_event_processor.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#include <library/cpp/eventlog/eventlog.h>
-
-namespace NLastGetopt {
- class TOpts;
-}
-
-class ITunableEventProcessor: public IEventProcessor {
-public:
- virtual void SetEventProcessor(IEventProcessor* /*processor*/) {
- }
-
- virtual void AddOptions(NLastGetopt::TOpts& opts) = 0;
- virtual void CheckOptions() {
- }
- virtual ~ITunableEventProcessor() {
- }
-};
diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp
deleted file mode 100644
index e4413a1b0e0..00000000000
--- a/library/cpp/eventlog/evdecoder.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-#include <util/memory/tempbuf.h>
-#include <util/string/cast.h>
-#include <util/stream/output.h>
-
-#include "evdecoder.h"
-#include "logparser.h"
-
-static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class";
-
-static inline void LogError(ui64 frameAddr, const char* msg, bool strict) {
- if (!strict) {
- Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl;
- } else {
- ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg;
- }
-}
-
-static inline bool SkipData(IInputStream& s, size_t amount) {
- return (amount == s.Skip(amount));
-}
-
-// There are 2 log fomats: the one, that allows event skip without event decode (it has stored event length)
-// and another, that requires each event decode just to seek over stream. needRead == true means the latter format.
-static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) {
- TEventTimestamp ts;
- TEventClass c;
- THolder<TEvent> e;
-
- ::Load(&s, ts);
- ::Load(&s, c);
-
- bool needReturn = false;
-
- if (!filter || filter->EventAllowed(c)) {
- needReturn = true;
- }
-
- if (needRead || needReturn) {
- e.Reset(fac->CreateLogEvent(c));
-
- if (!!e) {
- e->Timestamp = ts;
- e->Load(s);
- } else if (needReturn) {
- e.Reset(new TUnknownEvent(ts, c));
- }
-
- if (!needReturn) {
- e.Reset(nullptr);
- }
- }
-
- return e;
-}
-
-THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) {
- ui32 len;
- ::Load(&inp, len);
-
- if (len < sizeof(ui32)) {
- ythrow TEventDecoderError() << "invalid event length";
- }
-
- TLengthLimitedInput s(&inp, len - sizeof(ui32));
-
- try {
- THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac);
- if (!!e) {
- if (!s.Left()) {
- return e;
- } else if (e->Class == 0) {
- if (!SkipData(s, s.Left())) {
- ythrow TEventDecoderError() << "cannot skip bad event";
- }
-
- return e;
- }
-
- LogError(frameAddr, "Event is not fully read", strict);
- }
- } catch (const TLoadEOF&) {
- if (s.Left()) {
- throw;
- }
-
- LogError(frameAddr, "Unexpected event end", strict);
- }
-
- if (!SkipData(s, s.Left())) {
- ythrow TEventDecoderError() << "cannot skip bad event";
- }
-
- return nullptr;
-}
-
-THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) {
- try {
- if (framed) {
- return DecodeFramed(s, frameAddr, filter, fac, strict);
- } else {
- THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac);
- // e(0) means event, skipped by filter. Not an error.
- if (!!e && !e->Class) {
- ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS;
- }
-
- return e;
- }
- } catch (const TLoadEOF&) {
- ythrow TEventDecoderError() << "unexpected frame end";
- }
-}
diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h
deleted file mode 100644
index eedfc821743..00000000000
--- a/library/cpp/eventlog/evdecoder.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#pragma once
-
-#include <util/generic/yexception.h>
-#include <util/generic/ptr.h>
-
-#include "eventlog.h"
-
-class TEvent;
-class IInputStream;
-class TEventFilter;
-
-struct TEventDecoderError: public yexception {
-};
-
-THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false);
-bool AcceptableContent(TEventLogFormat);
diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp
deleted file mode 100644
index a1b4c2dafa1..00000000000
--- a/library/cpp/eventlog/event_field_output.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-#include "event_field_output.h"
-
-#include <util/string/split.h>
-
-namespace {
- TString MakeSeparators(EFieldOutputFlags flags) {
- TString res;
- res.reserve(3);
-
- if (flags & EFieldOutputFlag::EscapeTab) {
- res.append('\t');
- }
- if (flags & EFieldOutputFlag::EscapeNewLine) {
- res.append('\n');
- }
- if (flags & EFieldOutputFlag::EscapeBackSlash) {
- res.append('\\');
- }
-
- return res;
- }
-}
-
-TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags)
- : Output(output)
- , Flags(flags)
- , Separators(MakeSeparators(flags))
-{
-}
-
-IOutputStream& TEventFieldOutput::GetOutputStream() {
- return Output;
-}
-
-EFieldOutputFlags TEventFieldOutput::GetFlags() const {
- return Flags;
-}
-
-void TEventFieldOutput::DoWrite(const void* buf, size_t len) {
- if (!Flags) {
- Output.Write(buf, len);
- return;
- }
-
- TStringBuf chunk{static_cast<const char*>(buf), len};
-
- for (const auto part : StringSplitter(chunk).SplitBySet(Separators.data())) {
- TStringBuf token = part.Token();
- TStringBuf delim = part.Delim();
-
- if (!token.empty()) {
- Output.Write(token);
- }
- if ("\n" == delim) {
- Output.Write(TStringBuf("\\n"));
- } else if ("\t" == delim) {
- Output.Write(TStringBuf("\\t"));
- } else if ("\\" == delim) {
- Output.Write(TStringBuf("\\\\"));
- } else {
- Y_ASSERT(delim.empty());
- }
- }
-}
-
diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h
deleted file mode 100644
index ed9db0ae167..00000000000
--- a/library/cpp/eventlog/event_field_output.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#pragma once
-
-#include <util/stream/output.h>
-#include <util/generic/flags.h>
-
-enum class EFieldOutputFlag {
- EscapeTab = 0x1, // escape \t in field value
- EscapeNewLine = 0x2, // escape \n in field value
- EscapeBackSlash = 0x4 // escape \ in field value
-};
-
-Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag);
-Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags);
-
-class TEventFieldOutput: public IOutputStream {
-public:
- TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags);
-
- IOutputStream& GetOutputStream();
- EFieldOutputFlags GetFlags() const;
-
-protected:
- void DoWrite(const void* buf, size_t len) override;
-
-private:
- IOutputStream& Output;
- EFieldOutputFlags Flags;
- TString Separators;
-};
diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp
deleted file mode 100644
index 29c6b4b661e..00000000000
--- a/library/cpp/eventlog/event_field_printer.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-#include "event_field_printer.h"
-
-#include <library/cpp/protobuf/json/proto2json.h>
-
-namespace {
-
- const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
- .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
- .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
-
-} // namespace
-
-TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode)
- : Mode(mode)
-{}
-
-template <>
-void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) {
- switch (Mode) {
- case EProtobufMessageFieldPrintMode::DEFAULT:
- case EProtobufMessageFieldPrintMode::JSON: {
- // Do not use field.PrintJSON() here: IGNIETFERRO-2002
- NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG);
- break;
- }
- }
-}
diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h
deleted file mode 100644
index 835e8f4a850..00000000000
--- a/library/cpp/eventlog/event_field_printer.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include "event_field_output.h"
-
-#include <google/protobuf/message.h>
-
-// NB: For historical reasons print code for all primitive types/repeated fields/etc generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp
-
-enum class EProtobufMessageFieldPrintMode {
- // Use <TEventProtobufMessageFieldType>::Print method for fields that has it
- // Print json for other fields
- DEFAULT = 0,
-
- JSON = 1,
-};
-
-class TEventProtobufMessageFieldPrinter {
-public:
- explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode);
-
- template <typename TEventProtobufMessageFieldType, bool HasPrintFunction>
- void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) {
- if constexpr (HasPrintFunction) {
- if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) {
- field.Print(output.GetOutputStream(), output.GetFlags());
- return;
- }
- }
-
- PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output);
- }
-
- template <>
- void PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output);
-
-private:
- EProtobufMessageFieldPrintMode Mode;
-};
diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp
deleted file mode 100644
index 458a632b4a2..00000000000
--- a/library/cpp/eventlog/eventlog.cpp
+++ /dev/null
@@ -1,554 +0,0 @@
-#include <util/datetime/base.h>
-#include <util/stream/zlib.h>
-#include <util/stream/length.h>
-#include <util/generic/buffer.h>
-#include <util/generic/yexception.h>
-#include <util/digest/murmur.h>
-#include <util/generic/singleton.h>
-#include <util/generic/function.h>
-#include <util/stream/output.h>
-#include <util/stream/format.h>
-#include <util/stream/null.h>
-
-#include <google/protobuf/messagext.h>
-
-#include "eventlog.h"
-#include "events_extension.h"
-#include "evdecoder.h"
-#include "logparser.h"
-#include <library/cpp/eventlog/proto/internal.pb.h>
-
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-
-TAtomic eventlogFrameCounter = 0;
-
-namespace {
-
- const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig()
- .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
- .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
-
- ui32 GenerateFrameId() {
- return ui32(AtomicAdd(eventlogFrameCounter, 1));
- }
-
- inline const NProtoBuf::Message* UnknownEventMessage() {
- return Singleton<NEventLogInternal::TUnknownEvent>();
- }
-
-} // namespace
-
-void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
- if (options.OutputFormat == TOutputFormat::TabSeparatedRaw) {
- PrintHeader(out, options, eventState);
- DoPrint(out, {});
- } else if (options.OutputFormat == TOutputFormat::TabSeparated) {
- PrintHeader(out, options, eventState);
- DoPrint(
- out,
- EFieldOutputFlags{} | EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash);
- } else if (options.OutputFormat == TOutputFormat::Json) {
- NJson::TJsonWriterConfig jsonWriterConfig;
- jsonWriterConfig.FormatOutput = 0;
- NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig);
-
- jsonWriter.OpenMap();
- PrintJsonHeader(jsonWriter);
- DoPrintJson(jsonWriter);
- jsonWriter.CloseMap();
- }
-}
-
-void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const {
- if (options.HumanReadable) {
- out << TInstant::MicroSeconds(Timestamp).ToString() << "\t";
- if (Timestamp >= eventState.FrameStartTime)
- out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime));
- else // a bug somewhere? anyway, let's handle it in a nice fashion
- out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp));
-
- if (Timestamp >= eventState.PrevEventTime)
- out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")";
- // else: these events are async and out-of-order, relative time diff makes no sense, skip it
-
- out << "\tF# " << FrameId << '\t';
- } else {
- out << static_cast<TEventTimestamp>(Timestamp);
- out << '\t' << FrameId << '\t';
- }
-}
-
-void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const {
- jsonWriter.Write("Timestamp", Timestamp);
- jsonWriter.Write("FrameId", FrameId);
-}
-
-class TProtobufEvent: public TEvent {
-public:
- TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg)
- : TEvent(eventId, t)
- , Message_(&msg)
- , EventFactory_(NProtoBuf::TEventFactory::Instance())
- {
- }
-
- TProtobufEvent()
- : TEvent(0, 0)
- , EventFactory_(NProtoBuf::TEventFactory::Instance())
- {
- }
-
- explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance())
- : TEvent(id, 0)
- , EventFactory_(eventFactory)
- {
- InnerMsg_.Reset(EventFactory_->CreateEvent(Class));
- Message_ = InnerMsg_.Get();
- }
-
- ui32 Id() const {
- return Class;
- }
-
- void Load(IInputStream& in) override {
- if (!!InnerMsg_) {
- InnerMsg_->ParseFromArcadiaStream(&in);
- } else {
- TransferData(&in, &Cnull);
- }
- }
-
- void Save(IOutputStream& out) const override {
- Message_->SerializeToArcadiaStream(&out);
- }
-
- void SaveToBuffer(TBufferOutput& buf) const override {
- size_t messageSize = Message_->ByteSize();
- size_t before = buf.Buffer().Size();
- buf.Buffer().Advance(messageSize);
- Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize);
- }
-
- TStringBuf GetName() const override {
- return EventFactory_->NameById(Id());
- }
-
-private:
- void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override {
- EventFactory_->PrintEvent(Id(), Message_, out, flags);
- }
- void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override {
- jsonWriter.OpenMap("EventBody");
- jsonWriter.Write("Type", GetName());
-
- jsonWriter.Write("Fields");
- NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG);
-
- jsonWriter.CloseMap();
- }
-
- const NProtoBuf::Message* GetProto() const override {
- if (Message_) {
- return Message_;
- }
-
- return UnknownEventMessage();
- }
-
-private:
- const NProtoBuf::Message* Message_ = nullptr;
- NProtoBuf::TEventFactory* EventFactory_;
- THolder<NProtoBuf::Message> InnerMsg_;
-
- friend class TEventLogFrame;
-};
-
-void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) {
- TProtobufEvent event(Now().MicroSeconds(), eventId, ev);
-
- LogEventImpl(event);
-}
-
-void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) {
- TProtobufEvent event(timestamp, eventId, ev);
-
- LogEventImpl(event);
-}
-
-template <>
-void TEventLogFrame::DebugDump(const TProtobufEvent& ev) {
- static TMutex lock;
-
- with_lock (lock) {
- Cerr << ev.Timestamp << "\t" << ev.GetName() << "\t";
- ev.GetProto()->PrintJSON(Cerr);
- Cerr << Endl;
- }
-}
-
-#pragma pack(push, 1)
-struct TFrameHeaderData {
- char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()];
- TCompressedFrameBaseHeader Header;
- TCompressedFrameHeader2 HeaderEx;
-};
-#pragma pack(pop)
-
-TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog)
- , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
- , ForceDump_(false)
- , WriteFrameCallback_(std::move(writeFrameCallback))
-{
- DoInit();
-}
-
-TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : EvLog_(parentLog)
- , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
- , ForceDump_(false)
- , WriteFrameCallback_(std::move(writeFrameCallback))
-{
- if (EvLog_ && EvLog_->HasNullBackend()) {
- EvLog_ = nullptr;
- }
-
- DoInit();
-}
-
-TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : EvLog_(nullptr)
- , NeedAlwaysSafeAdd_(needAlwaysSafeAdd)
- , ForceDump_(false)
- , WriteFrameCallback_(std::move(writeFrameCallback))
-{
- DoInit();
-}
-
-void TEventLogFrame::Flush() {
- if (EvLog_ == nullptr)
- return;
-
- TBuffer& buf = Buf_.Buffer();
-
- if (buf.Empty()) {
- return;
- }
-
- EvLog_->WriteFrame(buf, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_));
-
- DoInit();
-
- return;
-}
-
-void TEventLogFrame::SafeFlush() {
- TGuard<TMutex> g(Mtx_);
- Flush();
-}
-
-void TEventLogFrame::AddEvent(TEventTimestamp timestamp) {
- if (timestamp < StartTimestamp_) {
- StartTimestamp_ = timestamp;
- }
-
- if (timestamp > EndTimestamp_) {
- EndTimestamp_ = timestamp;
- }
-}
-
-void TEventLogFrame::DoInit() {
- Buf_.Buffer().Clear();
-
- StartTimestamp_ = (TEventTimestamp)-1;
- EndTimestamp_ = 0;
-}
-
-void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory) {
- const auto doVisit = [this, &visitor, eventFactory]() {
- TBuffer& buf = Buf_.Buffer();
-
- TBufferInput bufferInput(buf);
- TLengthLimitedInput limitedInput(&bufferInput, buf.size());
-
- TEventFilter EventFilter(false);
-
- while (limitedInput.Left()) {
- THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory);
-
- visitor.Visit(*event);
- }
- };
- if (NeedAlwaysSafeAdd_) {
- TGuard<TMutex> g(Mtx_);
- doVisit();
- } else {
- doVisit();
- }
-}
-
-TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
-{
-}
-
-TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback))
-{
-}
-
-TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback)
- : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback))
-{
-}
-
-TSelfFlushLogFrame::~TSelfFlushLogFrame() {
- try {
- Flush();
- } catch (...) {
- }
-}
-
-IEventLog::~IEventLog() {
-}
-
-static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) {
- try {
- THolder<TLogBackend> backend;
- if (backendOpts.UseSyncPageCacheBackend) {
- backend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize);
- } else {
- backend = MakeHolder<TFileLogBackend>(fileName);
- }
- return MakeHolder<TReopenLogBackend>(std::move(backend));
- } catch (...) {
- Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." << Endl;
- }
-
- return MakeHolder<TNullLogBackend>();
-}
-
-TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat)
- : Log_(ConstructBackend(fileName, backendOpts))
- , ContentFormat_(contentFormat)
- , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4)
- , HasNullBackend_(Log_.IsNullLog())
- , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
- , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
-{
- Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5);
-
- if (contentFormat & 0xff000000) {
- ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
- }
-}
-
-TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts)
- : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4)
-{
-}
-
-TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat)
- : Log_(log)
- , ContentFormat_(contentFormat)
- , LogFormat_(logFormat)
- , HasNullBackend_(Log_.IsNullLog())
- , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
- , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
-{
- if (contentFormat & 0xff000000) {
- ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
- }
-}
-
-TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat)
- : Log_(MakeHolder<TNullLogBackend>())
- , ContentFormat_(contentFormat)
- , LogFormat_(logFormat)
- , HasNullBackend_(true)
- , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc"))
- , ZstdCodec_(NBlockCodecs::Codec("zstd_1"))
-{
- if (contentFormat & 0xff000000) {
- ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")";
- }
-}
-
-TEventLog::~TEventLog() {
-}
-
-void TEventLog::ReopenLog() {
- Log_.ReopenLog();
-}
-
-void TEventLog::CloseLog() {
- Log_.CloseLog();
-}
-
-void TEventLog::Flush() {
-}
-
-namespace {
- class TOnExceptionAction {
- public:
- TOnExceptionAction(std::function<void()>&& f)
- : F_(std::move(f))
- {
- }
-
- ~TOnExceptionAction() {
- if (F_ && UncaughtException()) {
- try {
- F_();
- } catch (...) {
- }
- }
- }
-
- private:
- std::function<void()> F_;
- };
-}
-
-void TEventLog::WriteFrame(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- TWriteFrameCallbackPtr writeFrameCallback,
- TLogRecord::TMetaFlags metaFlags) {
- Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5);
-
- TBuffer& b1 = buffer;
-
- size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1);
-
- // Reserve enough memory to minimize reallocs
- TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength);
- TBuffer& b2 = outbuf.Buffer();
- b2.Proceed(sizeof(TFrameHeaderData));
-
- {
- TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
-
- memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size());
- hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff);
- hdr.Header.FrameId = GenerateFrameId();
- hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size();
- hdr.HeaderEx.StartTimestamp = startTimestamp;
- hdr.HeaderEx.EndTimestamp = endTimestamp;
- hdr.HeaderEx.PayloadChecksum = 0;
- hdr.HeaderEx.CompressorVersion = 0;
- }
-
- if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) {
- TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256);
- Lz4hcCodec_->Encode(b1, encoded);
-
- TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048);
- compr.Write(encoded.data(), encoded.size());
- compr.Finish();
- } else {
- b2.Advance(ZstdCodec_->Compress(b1, b2.Pos()));
- }
-
- {
- const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size();
- TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data());
- hdr.Header.Length = static_cast<ui32>(b2.size() - k);
- hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData));
-
- const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum));
- hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n);
- }
-
- const TBuffer& frameData = outbuf.Buffer();
-
- TOnExceptionAction actionCallback([this] {
- if (ErrorCallback_) {
- ErrorCallback_->OnWriteError();
- }
- });
-
- if (writeFrameCallback) {
- writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp);
- }
-
- Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags));
- if (SuccessCallback_) {
- SuccessCallback_->OnWriteSuccess(frameData);
- }
-}
-
-TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) {
- return new TProtobufEvent(c, EventFactory_);
-}
-
-TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const {
- return EventFactory_->IdByName(name);
-}
-
-TEventClass TProtobufEventFactory::EventClassBegin() const {
- const auto& items = EventFactory_->FactoryItems();
-
- if (items.empty()) {
- return static_cast<TEventClass>(0);
- }
-
- return static_cast<TEventClass>(items.begin()->first);
-}
-
-TEventClass TProtobufEventFactory::EventClassEnd() const {
- const auto& items = EventFactory_->FactoryItems();
-
- if (items.empty()) {
- return static_cast<TEventClass>(0);
- }
-
- return static_cast<TEventClass>(items.rbegin()->first + 1);
-}
-
-namespace NEvClass {
- IEventFactory* Factory() {
- return Singleton<TProtobufEventFactory>();
- }
-
- IEventProcessor* Processor() {
- return Singleton<TProtobufEventProcessor>();
- }
-}
-
-const NProtoBuf::Message* TUnknownEvent::GetProto() const {
- return UnknownEventMessage();
-}
-
-TStringBuf TUnknownEvent::GetName() const {
- return TStringBuf("UnknownEvent");
-}
-
-void TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const {
- jsonWriter.OpenMap("EventBody");
- jsonWriter.Write("Type", GetName());
- jsonWriter.Write("EventId", (size_t)Class);
- jsonWriter.CloseMap();
-}
-
-TStringBuf TEndOfFrameEvent::GetName() const {
- return TStringBuf("EndOfFrame");
-}
-
-const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const {
- return Singleton<NEventLogInternal::TEndOfFrameEvent>();
-}
-
-void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const {
- jsonWriter.OpenMap("EventBody");
- jsonWriter.Write("Type", GetName());
- jsonWriter.OpenMap("Fields");
- jsonWriter.CloseMap();
- jsonWriter.CloseMap();
-}
-
-THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) {
- return MakeHolder<TProtobufEvent>(ts, eventId, ev);
-}
diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h
deleted file mode 100644
index 45c2dfb17fd..00000000000
--- a/library/cpp/eventlog/eventlog.h
+++ /dev/null
@@ -1,623 +0,0 @@
-#pragma once
-
-#include "eventlog_int.h"
-#include "event_field_output.h"
-#include "events_extension.h"
-
-#include <library/cpp/blockcodecs/codecs.h>
-#include <library/cpp/logger/all.h>
-
-#include <google/protobuf/message.h>
-
-#include <util/datetime/base.h>
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/stream/output.h>
-#include <util/stream/buffer.h>
-#include <util/stream/str.h>
-#include <util/system/mutex.h>
-#include <util/stream/output.h>
-#include <util/system/env.h>
-#include <util/system/unaligned_mem.h>
-#include <util/ysaveload.h>
-
-#include <cstdlib>
-
-namespace NJson {
- class TJsonWriter;
-}
-
-class IEventLog;
-
-class TEvent : public TThrRefBase {
-public:
- enum class TOutputFormat {
- TabSeparated,
- TabSeparatedRaw, // disables escaping
- Json
- };
-
- struct TOutputOptions {
- TOutputFormat OutputFormat = TOutputFormat::TabSeparated;
- // Dump some fields (e.g. timestamp) in more human-readable format
- bool HumanReadable = false;
-
- TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated)
- : OutputFormat(outputFormat)
- {
- }
-
- TOutputOptions(TOutputFormat outputFormat, bool humanReadable)
- : OutputFormat(outputFormat)
- , HumanReadable(humanReadable)
- {
- }
- };
-
- struct TEventState {
- TEventTimestamp FrameStartTime = 0;
- TEventTimestamp PrevEventTime = 0;
- TEventState() {
- }
- };
-
- TEvent(TEventClass c, TEventTimestamp t)
- : Class(c)
- , Timestamp(t)
- {
- }
-
- virtual ~TEvent() = default;
-
- // Note, that descendants MUST have Save() & Load() methods to alter
- // only its new variables, not the base class!
- virtual void Save(IOutputStream& out) const = 0;
- virtual void SaveToBuffer(TBufferOutput& out) const {
- Save(out);
- }
-
- // Note, that descendants MUST have Save() & Load() methods to alter
- // only its new variables, not the base class!
- virtual void Load(IInputStream& i) = 0;
-
- virtual TStringBuf GetName() const = 0;
- virtual const NProtoBuf::Message* GetProto() const = 0;
-
- void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const;
- void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const;
-
- TString ToString() const {
- TStringStream buff;
- Print(buff);
- return buff.Str();
- }
-
- void FullSaveToBuffer(TBufferOutput& buf) const {
- SaveMessageHeader(buf);
- this->SaveToBuffer(buf);
- }
-
- void FullSave(IOutputStream& o) const {
- SaveMessageHeader(o);
- this->Save(o);
- }
-
- void FullLoad(IInputStream& i) {
- ::Load(&i, Timestamp);
- ::Load(&i, Class);
- this->Load(i);
- }
-
- template <class T>
- const T* Get() const {
- return static_cast<const T*>(this->GetProto());
- }
-
- TEventClass Class;
- TEventTimestamp Timestamp;
- ui32 FrameId = 0;
-
-private:
- void SaveMessageHeader(IOutputStream& out) const {
- ::Save(&out, Timestamp);
- ::Save(&out, Class);
- }
-
- virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0;
- virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0;
-
- void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const;
-};
-
-using TEventPtr = TIntrusivePtr<TEvent>;
-using TConstEventPtr = TIntrusiveConstPtr<TEvent>;
-
-class IEventProcessor {
-public:
- virtual void SetOptions(const TEvent::TOutputOptions& options) {
- Options_ = options;
- }
- virtual void ProcessEvent(const TEvent* ev) = 0;
- virtual bool CheckedProcessEvent(const TEvent* ev) {
- ProcessEvent(ev);
- return true;
- }
- virtual ~IEventProcessor() = default;
-
-protected:
- TEvent::TOutputOptions Options_;
-};
-
-class IEventFactory {
-public:
- virtual TEvent* CreateLogEvent(TEventClass c) = 0;
- virtual TEventLogFormat CurrentFormat() = 0;
- virtual TEventClass ClassByName(TStringBuf name) const = 0;
- virtual TEventClass EventClassBegin() const = 0;
- virtual TEventClass EventClassEnd() const = 0;
- virtual ~IEventFactory() = default;
-};
-
-class TUnknownEvent: public TEvent {
-public:
- TUnknownEvent(TEventTimestamp ts, TEventClass cls)
- : TEvent(cls, ts)
- {
- }
-
- ~TUnknownEvent() override = default;
-
- void Save(IOutputStream& /* o */) const override {
- ythrow yexception() << "TUnknownEvent cannot be saved";
- }
-
- void Load(IInputStream& /* i */) override {
- ythrow yexception() << "TUnknownEvent cannot be loaded";
- }
-
- TStringBuf GetName() const override;
-
-private:
- void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
- out << GetName() << "\t" << (size_t)Class;
- }
-
- void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
-
- const NProtoBuf::Message* GetProto() const override;
-};
-
-class TEndOfFrameEvent: public TEvent {
-public:
- enum {
- EventClass = 0
- };
-
- TEndOfFrameEvent(TEventTimestamp ts)
- : TEvent(TEndOfFrameEvent::EventClass, ts)
- {
- }
-
- ~TEndOfFrameEvent() override = default;
-
- void Save(IOutputStream& o) const override {
- (void)o;
- ythrow yexception() << "TEndOfFrameEvent cannot be saved";
- }
-
- void Load(IInputStream& i) override {
- (void)i;
- ythrow yexception() << "TEndOfFrameEvent cannot be loaded";
- }
-
- TStringBuf GetName() const override;
-
-private:
- void DoPrint(IOutputStream& out, EFieldOutputFlags) const override {
- out << GetName();
- }
- void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override;
-
- const NProtoBuf::Message* GetProto() const override;
-};
-
-class ILogFrameEventVisitor {
-public:
- virtual ~ILogFrameEventVisitor() = default;
-
- virtual void Visit(const TEvent& event) = 0;
-};
-
-class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> {
-public:
- virtual ~IWriteFrameCallback() = default;
-
- virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0;
-};
-
-using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>;
-
-class TEventLogFrame {
-public:
- TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
- TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
- TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
-
- virtual ~TEventLogFrame() = default;
-
- void Flush();
- void SafeFlush();
-
- void ForceDump() {
- ForceDump_ = true;
- }
-
- template <class T>
- inline void LogEvent(const T& ev) {
- if (NeedAlwaysSafeAdd_) {
- SafeLogEvent(ev);
- } else {
- UnSafeLogEvent(ev);
- }
- }
-
- template <class T>
- inline void LogEvent(TEventTimestamp timestamp, const T& ev) {
- if (NeedAlwaysSafeAdd_) {
- SafeLogEvent(timestamp, ev);
- } else {
- UnSafeLogEvent(timestamp, ev);
- }
- }
-
- template <class T>
- inline void UnSafeLogEvent(const T& ev) {
- if (!IsEventIgnored(ev.ID))
- LogProtobufEvent(ev.ID, ev);
- }
-
- template <class T>
- inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) {
- if (!IsEventIgnored(ev.ID))
- LogProtobufEvent(timestamp, ev.ID, ev);
- }
-
- template <class T>
- inline void SafeLogEvent(const T& ev) {
- if (!IsEventIgnored(ev.ID)) {
- TGuard<TMutex> g(Mtx_);
- LogProtobufEvent(ev.ID, ev);
- }
- }
-
- template <class T>
- inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) {
- if (!IsEventIgnored(ev.ID)) {
- TGuard<TMutex> g(Mtx_);
- LogProtobufEvent(timestamp, ev.ID, ev);
- }
- }
-
- void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory);
-
- inline bool IsEventIgnored(size_t eventId) const {
- Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages
- return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_;
- }
-
- void Enable(IEventLog& evLog) {
- EvLog_ = &evLog;
- }
-
- void Disable() {
- EvLog_ = nullptr;
- }
-
- void SetNeedAlwaysSafeAdd(bool val) {
- NeedAlwaysSafeAdd_ = val;
- }
-
- void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) {
- WriteFrameCallback_ = writeFrameCallback;
- }
-
- void AddMetaFlag(const TString& key, const TString& value) {
- if (NeedAlwaysSafeAdd_) {
- TGuard<TMutex> g(Mtx_);
- MetaFlags_.emplace_back(key, value);
- } else {
- MetaFlags_.emplace_back(key, value);
- }
- }
-
-protected:
- void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev);
- void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev);
-
-private:
- static bool IsDebugModeEnabled() {
- static struct TSelector {
- bool Flag;
-
- TSelector()
- : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1"))
- {
- }
- } selector;
-
- return selector.Flag;
- }
-
- template <class T>
- void DebugDump(const T& ev);
-
- // T must be a descendant of NEvClass::TEvent
- template <class T>
- inline void LogEventImpl(const T& ev) {
- if (EvLog_ != nullptr || ForceDump_) {
- TBuffer& b = Buf_.Buffer();
- size_t lastSize = b.size();
- ::Save(&Buf_, ui32(0));
- ev.FullSaveToBuffer(Buf_);
- WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize));
- AddEvent(ev.Timestamp);
- }
-
- if (IsDebugModeEnabled()) {
- DebugDump(ev);
- }
- }
-
- void AddEvent(TEventTimestamp timestamp);
- void DoInit();
-
-private:
- TBufferOutput Buf_;
- TEventTimestamp StartTimestamp_, EndTimestamp_;
- IEventLog* EvLog_;
- TMutex Mtx_;
- bool NeedAlwaysSafeAdd_;
- bool ForceDump_;
- TWriteFrameCallbackPtr WriteFrameCallback_;
- TLogRecord::TMetaFlags MetaFlags_;
- friend class TEventRecord;
-};
-
-class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> {
-public:
- TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
- TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
- TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr);
-
- virtual ~TSelfFlushLogFrame();
-};
-
-using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>;
-
-class IEventLog: public TAtomicRefCount<IEventLog> {
-public:
- class IErrorCallback {
- public:
- virtual ~IErrorCallback() {
- }
-
- virtual void OnWriteError() = 0;
- };
-
- class ISuccessCallback {
- public:
- virtual ~ISuccessCallback() {
- }
-
- virtual void OnWriteSuccess(const TBuffer& frameData) = 0;
- };
-
- virtual ~IEventLog();
-
- virtual void ReopenLog() = 0;
- virtual void CloseLog() = 0;
- virtual void Flush() = 0;
- virtual void SetErrorCallback(IErrorCallback*) {
- }
- virtual void SetSuccessCallback(ISuccessCallback*) {
- }
-
- template <class T>
- void LogEvent(const T& ev) {
- TEventLogFrame frame(*this);
- frame.LogEvent(ev);
- frame.Flush();
- }
-
- virtual bool HasNullBackend() const = 0;
-
- virtual void WriteFrame(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- TWriteFrameCallbackPtr writeFrameCallback = nullptr,
- TLogRecord::TMetaFlags metaFlags = {}) = 0;
-};
-
-struct TEventLogBackendOptions {
- bool UseSyncPageCacheBackend = false;
- size_t SyncPageCacheBackendBufferSize = 0;
- size_t SyncPageCacheBackendMaxPendingSize = 0;
-};
-
-class TEventLog: public IEventLog {
-public:
- /*
- * Параметр contentformat указывает формат контента лога, например какие могут в логе
- * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра
- * должен быть нулевым.
- */
- TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat);
- TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {});
- TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
- TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4);
-
- ~TEventLog() override;
-
- void ReopenLog() override;
- void CloseLog() override;
- void Flush() override;
- void SetErrorCallback(IErrorCallback* errorCallback) override {
- ErrorCallback_ = errorCallback;
- }
- void SetSuccessCallback(ISuccessCallback* successCallback) override {
- SuccessCallback_ = successCallback;
- }
-
- template <class T>
- void LogEvent(const T& ev) {
- TEventLogFrame frame(*this);
- frame.LogEvent(ev);
- frame.Flush();
- }
-
- bool HasNullBackend() const override {
- return HasNullBackend_;
- }
-
- void WriteFrame(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- TWriteFrameCallbackPtr writeFrameCallback = nullptr,
- TLogRecord::TMetaFlags metaFlags = {}) override;
-
-private:
- mutable TLog Log_;
- TEventLogFormat ContentFormat_;
- const TEventLogFormat LogFormat_;
- bool HasNullBackend_;
- const NBlockCodecs::ICodec* const Lz4hcCodec_;
- const NBlockCodecs::ICodec* const ZstdCodec_;
- IErrorCallback* ErrorCallback_ = nullptr;
- ISuccessCallback* SuccessCallback_ = nullptr;
-};
-
-using TEventLogPtr = TIntrusivePtr<IEventLog>;
-
-class TEventLogWithSlave: public IEventLog {
-public:
- TEventLogWithSlave(IEventLog& parentLog)
- : Slave_(&parentLog)
- {
- }
-
- TEventLogWithSlave(const TEventLogPtr& parentLog)
- : SlavePtr_(parentLog)
- , Slave_(SlavePtr_.Get())
- {
- }
-
- ~TEventLogWithSlave() override {
- try {
- Slave().Flush();
- } catch (...) {
- }
- }
-
- void Flush() override {
- Slave().Flush();
- }
-
- void ReopenLog() override {
- return Slave().ReopenLog();
- }
- void CloseLog() override {
- return Slave().CloseLog();
- }
-
- bool HasNullBackend() const override {
- return Slave().HasNullBackend();
- }
-
- void WriteFrame(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- TWriteFrameCallbackPtr writeFrameCallback = nullptr,
- TLogRecord::TMetaFlags metaFlags = {}) override {
- Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags));
- }
-
- void SetErrorCallback(IErrorCallback* errorCallback) override {
- Slave().SetErrorCallback(errorCallback);
- }
-
- void SetSuccessCallback(ISuccessCallback* successCallback) override {
- Slave().SetSuccessCallback(successCallback);
- }
-
-protected:
- inline IEventLog& Slave() const {
- return *Slave_;
- }
-
-private:
- TEventLogPtr SlavePtr_;
- IEventLog* Slave_ = nullptr;
-};
-
-extern TAtomic eventlogFrameCounter;
-
-class TProtobufEventProcessor: public IEventProcessor {
-public:
- void ProcessEvent(const TEvent* ev) override final {
- ProcessEvent(ev, &Cout);
- }
-
- void ProcessEvent(const TEvent* ev, IOutputStream *out) {
- UpdateEventState(ev);
- DoProcessEvent(ev, out);
- EventState_.PrevEventTime = ev->Timestamp;
- }
-protected:
- virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) {
- ev->Print(*out, Options_, EventState_);
- (*out) << Endl;
- }
- ui32 CurrentFrameId_ = Max<ui32>();
- TEvent::TEventState EventState_;
-
-private:
- void UpdateEventState(const TEvent *ev) {
- if (ev->FrameId != CurrentFrameId_) {
- EventState_.FrameStartTime = ev->Timestamp;
- EventState_.PrevEventTime = ev->Timestamp;
- CurrentFrameId_ = ev->FrameId;
- }
- }
-};
-
-class TProtobufEventFactory: public IEventFactory {
-public:
- TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance())
- : EventFactory_(factory)
- {
- }
-
- TEvent* CreateLogEvent(TEventClass c) override;
-
- TEventLogFormat CurrentFormat() override {
- return 0;
- }
-
- TEventClass ClassByName(TStringBuf name) const override;
-
- TEventClass EventClassBegin() const override;
-
- TEventClass EventClassEnd() const override;
-
- ~TProtobufEventFactory() override = default;
-
-private:
- NProtoBuf::TEventFactory* EventFactory_;
-};
-
-THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev);
-
-namespace NEvClass {
- IEventFactory* Factory();
- IEventProcessor* Processor();
-}
diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp
deleted file mode 100644
index faa8c42cbeb..00000000000
--- a/library/cpp/eventlog/eventlog_int.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-#include "eventlog_int.h"
-
-#include <util/string/cast.h>
-
-TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) {
- EEventLogFormat format;
- if (TryFromString(str, format)) {
- return static_cast<TEventLogFormat>(format);
- } else {
- return {};
- }
-}
diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h
deleted file mode 100644
index eb00fecfab6..00000000000
--- a/library/cpp/eventlog/eventlog_int.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#pragma once
-
-#include <util/stream/output.h>
-#include <util/generic/maybe.h>
-#include <util/generic/utility.h>
-#include <util/generic/yexception.h>
-#include <util/ysaveload.h>
-
-using TEventClass = ui32;
-using TEventLogFormat = ui32;
-using TEventTimestamp = ui64;
-
-constexpr TStringBuf COMPRESSED_LOG_FRAME_SYNC_DATA =
- "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
- "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
- "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00"
- "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff"
- "\xff\x00\x00\xff\xff\x00\x00\xff"sv;
-
-static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64);
-
-/*
- * Коды форматов логов. Форматом лога считается формат служебных
- * структур лога. К примеру формат заголовка, наличие компрессии, и т.д.
- * Имеет значение только 1 младший байт.
- */
-
-enum EEventLogFormat : TEventLogFormat {
- // Формат версии 1. Используется компрессор LZQ.
- COMPRESSED_LOG_FORMAT_V1 = 1,
-
- // Формат версии 2. Используется компрессор ZLIB. Добавлены CRC заголовка и данных,
- // поле типа компрессора.
- COMPRESSED_LOG_FORMAT_V2 = 2,
-
- // Формат версии 3. Используется компрессор ZLIB. В начинке фреймов перед каждым событием добавлен его размер.
- COMPRESSED_LOG_FORMAT_V3 = 3,
-
- // Lz4hc codec + zlib
- COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */,
-
- // zstd
- COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */,
-};
-
-TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str);
-
-#pragma pack(push, 1)
-
-struct TCompressedFrameBaseHeader {
- TEventLogFormat Format;
- ui32 Length; // Длина остатка фрейма в байтах, после этого заголовка
- ui32 FrameId;
-};
-
-struct TCompressedFrameHeader {
- TEventTimestamp StartTimestamp;
- TEventTimestamp EndTimestamp;
- ui32 UncompressedDatalen; // Длина данных, которые были закомпрессированы
- ui32 PayloadChecksum; // В логе версии 1 поле не используется
-};
-
-struct TCompressedFrameHeader2: public TCompressedFrameHeader {
- ui8 CompressorVersion; // Сейчас не используется
- ui32 HeaderChecksum;
-};
-
-#pragma pack(pop)
-
-Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader);
-Y_DECLARE_PODTYPE(TCompressedFrameHeader);
-Y_DECLARE_PODTYPE(TCompressedFrameHeader2);
diff --git a/library/cpp/eventlog/events_extension.h b/library/cpp/eventlog/events_extension.h
deleted file mode 100644
index 0cf062f9590..00000000000
--- a/library/cpp/eventlog/events_extension.h
+++ /dev/null
@@ -1,161 +0,0 @@
-#pragma once
-
-#include "event_field_output.h"
-
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/message.h>
-
-#include <library/cpp/threading/atomic/bool.h>
-#include <library/cpp/string_utils/base64/base64.h>
-
-#include <util/generic/map.h>
-#include <util/generic/deque.h>
-#include <util/generic/singleton.h>
-#include <util/string/hex.h>
-#include <util/system/guard.h>
-#include <util/system/mutex.h>
-
-namespace NProtoBuf {
- class TEventFactory {
- public:
- typedef ::google::protobuf::Message Message;
- typedef void (*TEventSerializer)(const Message* event, IOutputStream& output, EFieldOutputFlags flags);
- typedef void (*TRegistrationFunc)();
-
- private:
- class TFactoryItem {
- public:
- TFactoryItem(const Message* prototype, const TEventSerializer serializer)
- : Prototype_(prototype)
- , Serializer_(serializer)
- {
- }
-
- TStringBuf GetName() const {
- return Prototype_->GetDescriptor()->name();
- }
-
- Message* Create() const {
- return Prototype_->New();
- }
-
- void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const {
- (*Serializer_)(event, out, flags);
- }
-
- private:
- const Message* Prototype_;
- const TEventSerializer Serializer_;
- };
-
- typedef TMap<size_t, TFactoryItem> TFactoryMap;
-
- public:
- TEventFactory()
- : FactoryItems_()
- {
- }
-
- void ScheduleRegistration(TRegistrationFunc func) {
- EventRegistrators_.push_back(func);
- }
-
- void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) {
- FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer)));
- }
-
- size_t IdByName(TStringBuf eventname) {
- DelayedRegistration();
- for (TFactoryMap::const_iterator it = FactoryItems_.begin(); it != FactoryItems_.end(); ++it) {
- if (it->second.GetName() == eventname)
- return it->first;
- }
-
- ythrow yexception() << "do not know event '" << eventname << "'";
- }
-
- TStringBuf NameById(size_t id) {
- DelayedRegistration();
- TFactoryMap::const_iterator it = FactoryItems_.find(id);
- return it != FactoryItems_.end() ? it->second.GetName() : TStringBuf();
- }
-
- Message* CreateEvent(size_t eventId) {
- DelayedRegistration();
- TFactoryMap::const_iterator it = FactoryItems_.find(eventId);
-
- if (it != FactoryItems_.end()) {
- return it->second.Create();
- }
-
- return nullptr;
- }
-
- const TMap<size_t, TFactoryItem>& FactoryItems() {
- DelayedRegistration();
- return FactoryItems_;
- }
-
- void PrintEvent(
- size_t eventId,
- const Message* event,
- IOutputStream& output,
- EFieldOutputFlags flags = {}) {
- DelayedRegistration();
- TFactoryMap::const_iterator it = FactoryItems_.find(eventId);
-
- if (it != FactoryItems_.end()) {
- it->second.PrintEvent(event, output, flags);
- }
- }
-
- static TEventFactory* Instance() {
- return Singleton<TEventFactory>();
- }
-
- private:
- void DelayedRegistration() {
- if (!DelayedRegistrationDone_) {
- TGuard<TMutex> guard(MutexEventRegistrators_);
- Y_UNUSED(guard);
- while (!EventRegistrators_.empty()) {
- EventRegistrators_.front()();
- EventRegistrators_.pop_front();
- }
- DelayedRegistrationDone_ = true;
- }
- }
-
- private:
- TMap<size_t, TFactoryItem> FactoryItems_;
- TDeque<TRegistrationFunc> EventRegistrators_;
- NAtomic::TBool DelayedRegistrationDone_ = false;
- TMutex MutexEventRegistrators_;
- };
-
- template <typename T>
- void PrintAsBytes(const T& obj, IOutputStream& output) {
- const ui8* b = reinterpret_cast<const ui8*>(&obj);
- const ui8* e = b + sizeof(T);
- const char* delim = "";
-
- while (b != e) {
- output << delim;
- output << (int)*b++;
- delim = ".";
- }
- }
-
- template <typename T>
- void PrintAsHex(const T& obj, IOutputStream& output) {
- output << "0x";
- output << HexEncode(&obj, sizeof(T));
- }
-
- inline void PrintAsBase64(TStringBuf data, IOutputStream& output) {
- if (!data.empty()) {
- output << Base64Encode(data);
- }
- }
-
-}
diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp
deleted file mode 100644
index 71f955bca82..00000000000
--- a/library/cpp/eventlog/iterator.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-#include "iterator.h"
-
-#include <library/cpp/streams/growing_file_input/growing_file_input.h>
-
-#include <util/string/cast.h>
-#include <util/string/split.h>
-#include <util/string/type.h>
-#include <util/stream/file.h>
-
-using namespace NEventLog;
-
-namespace {
- inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) {
- if (evList.empty()) {
- return nullptr;
- }
-
- TVector<TString> events;
-
- StringSplitter(evList).Split(',').SkipEmpty().Collect(&events);
- if (events.empty()) {
- return nullptr;
- }
-
- TIntrusivePtr<TEventFilter> filter(new TEventFilter(enableEvents));
-
- for (const auto& event : events) {
- if (IsNumber(event))
- filter->AddEventClass(FromString<size_t>(event));
- else
- filter->AddEventClass(fac->ClassByName(event));
- }
-
- return filter;
- }
-
- struct TIterator: public IIterator {
- inline TIterator(const TOptions& o, IEventFactory* fac)
- : First(true)
- {
- if (o.FileName.size()) {
- if (o.ForceStreamMode || o.TailFMode) {
- FileInput.Reset(o.TailFMode ? (IInputStream*)new TGrowingFileInput(o.FileName) : (IInputStream*)new TUnbufferedFileInput(o.FileName));
- FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter));
- } else {
- FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter));
- }
- } else {
- FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter));
- }
-
- EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac);
- EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering));
- }
-
- TConstEventPtr Next() override {
- if (First) {
- First = false;
-
- if (!EventStream->Avail()) {
- return nullptr;
- }
- } else {
- if (!EventStream->Next()) {
- return nullptr;
- }
- }
-
- return **EventStream;
- }
-
- THolder<IInputStream> FileInput;
- THolder<TFrameStreamer> FrameStream;
- TIntrusivePtr<TEventFilter> EvFilter;
- THolder<TEventStreamer> EventStream;
- bool First;
- };
-}
-
-IIterator::~IIterator() = default;
-
-THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) {
- return MakeHolder<TIterator>(o, fac);
-}
-
-THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) {
- return MakeHolder<TIterator>(o, NEvClass::Factory());
-}
diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h
deleted file mode 100644
index 71a61ed5494..00000000000
--- a/library/cpp/eventlog/iterator.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#pragma once
-
-#include <util/stream/input.h>
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/generic/iterator.h>
-
-#include "eventlog.h"
-#include "logparser.h"
-
-namespace NEventLog {
- struct TOptions {
- inline TOptions& SetFileName(const TString& fileName) {
- FileName = fileName;
-
- return *this;
- }
-
- inline TOptions& SetForceStrongOrdering(bool v) {
- if(!ForceLosslessStrongOrdering) {
- ForceStrongOrdering = v;
- }
-
- return *this;
- }
-
- ui64 StartTime = MIN_START_TIME;
- ui64 EndTime = MAX_END_TIME;
- ui64 MaxRequestDuration = MAX_REQUEST_DURATION;
- TString FileName;
- bool ForceStrongOrdering = false;
- bool ForceWeakOrdering = false;
- bool EnableEvents = true;
- TString EvList;
- bool ForceStreamMode = false;
- bool ForceLosslessStrongOrdering = false;
- bool TailFMode = false;
- IInputStream* Input = &Cin;
- IFrameFilterRef FrameFilter;
- };
-
- class IIterator: public TInputRangeAdaptor<IIterator> {
- public:
- virtual ~IIterator();
-
- virtual TConstEventPtr Next() = 0;
- };
-
- THolder<IIterator> CreateIterator(const TOptions& o);
- THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac);
-}
diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp
deleted file mode 100644
index 6f8959f7888..00000000000
--- a/library/cpp/eventlog/logparser.cpp
+++ /dev/null
@@ -1,814 +0,0 @@
-#include "logparser.h"
-#include "evdecoder.h"
-
-#include <util/stream/output.h>
-#include <util/stream/zlib.h>
-#include <util/digest/murmur.h>
-#include <util/generic/algorithm.h>
-#include <util/generic/scope.h>
-#include <util/generic/hash_set.h>
-#include <util/string/split.h>
-#include <util/string/cast.h>
-#include <util/string/escape.h>
-#include <util/string/builder.h>
-
-#include <contrib/libs/re2/re2/re2.h>
-
-#include <algorithm>
-#include <array>
-
-namespace {
- bool FastforwardUntilSyncHeader(IInputStream* in) {
- // Usually this function finds the correct header at the first hit
- std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer;
- if (in->Load(buffer.data(), buffer.size()) != buffer.size()) {
- return false;
- }
-
- auto begin = buffer.begin();
-
- for (;;) {
- if (std::mismatch(
- begin, buffer.end(),
- COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() &&
- std::mismatch(
- buffer.begin(), begin,
- COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) {
- return true;
- }
- if (!in->ReadChar(*begin)) {
- return false;
- }
- ++begin;
- if (begin == buffer.end()) {
- begin = buffer.begin();
- }
- }
- }
-
- bool HasCorrectChecksum(const TFrameHeader& header) {
- // Calculating hash over all the fields of the read header except for the field with the hash of the header itself.
- const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32);
- const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize);
- return checksum == header.Framehdr.HeaderChecksum;
- }
-
- TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) {
- for (;;) {
- if (FastforwardUntilSyncHeader(in)) {
- try {
- return TFrameHeader(*in);
- } catch (const TFrameLoadError& err) {
- Cdbg << err.what() << Endl;
- in->Skip(err.SkipAfter);
- }
- } else {
- return Nothing();
- }
- }
- }
-
- std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) {
- for (;;) {
- auto iter = std::search(
- span.begin(), span.end(),
- COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end());
- const size_t offset = iter - span.begin();
-
- if (offset != span.size()) {
- span = span.substr(offset);
- try {
- TMemoryInput in(
- span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(),
- span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size());
- return {TFrameHeader(in), span};
- } catch (const TFrameLoadError& err) {
- Cdbg << err.what() << Endl;
- span = span.substr(err.SkipAfter);
- }
- } else {
- return {Nothing(), {}};
- }
- }
- }
-
- size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) {
- Y_ENSURE(start <= end);
-
- const auto leftTimeBound = start - Min(start, maxRequestDuration);
- const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end);
-
- TStringBuf subspan = span;
- TMaybe<TFrameHeader> maybeLeftFrame;
- std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan);
-
- if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) {
- return span.size();
- }
-
- if (maybeLeftFrame->StartTime() > leftTimeBound) {
- return 0;
- }
-
- while (subspan.size() > maybeLeftFrame->FullLength()) {
- const auto mid = subspan.data() + subspan.size() / 2;
- auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()});
- if (!midFrame) {
- // If mid is in the middle of the last frame, here we will lose it meaning that
- // we will find previous frame as the result.
- // This is fine because we will iterate frames starting from that.
- subspan = subspan.substr(0, subspan.size() / 2);
- continue;
- }
- if (midFrame->StartTime() <= leftTimeBound) {
- maybeLeftFrame = midFrame;
- subspan = rightHalfSpan;
- } else {
- subspan = subspan.substr(0, subspan.size() / 2);
- }
- }
-
- return subspan.data() - span.data();
- }
-}
-
-TFrameHeader::TFrameHeader(IInputStream& in) {
- try {
- ::Load(&in, Basehdr);
-
- Y_ENSURE(Basehdr.Length, "Empty frame additional data");
-
- ::Load(&in, Framehdr);
- switch (LogFormat()) {
- case COMPRESSED_LOG_FORMAT_V1:
- break;
-
- case COMPRESSED_LOG_FORMAT_V2:
- case COMPRESSED_LOG_FORMAT_V3:
- case COMPRESSED_LOG_FORMAT_V4:
- case COMPRESSED_LOG_FORMAT_V5:
- Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor");
-
- Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum");
- break;
-
- default:
- ythrow yexception() << "Unsupported log structure format";
- };
-
- Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps");
-
- // Each frame must contain at least one event.
- Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload");
- } catch (...) {
- TString location = "";
- if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) {
- location = "@ " + ToString(cnt->Counter());
- }
- ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage();
- }
-}
-
-TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac)
- : TFrameHeader(header)
- , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength()))
- , Fac_(fac)
-{
- if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) {
- Address_ = cnt->Counter() - sizeof(TFrameHeader);
- } else {
- Address_ = 0;
- }
-}
-
-TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const {
- if (EventsCache_.empty()) {
- for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) {
- EventsCache_.emplace_back(*decoder);
- }
- }
-
- return TIterator(*this, eventFilter);
-}
-
-void TFrame::ClearEventsCache() const {
- EventsCache_.clear();
-}
-
-TString TFrame::GetCompressedFrame() const {
- const auto left = Limiter_->Left();
- TString payload = Limiter_->ReadAll();
- Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream");
- const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size());
- Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum");
-
- return payload;
-}
-
-TString TFrame::GetRawFrame() const {
- TString frameBuf = GetCompressedFrame();
- TStringInput sin(frameBuf);
- return TZLibDecompress{&sin}.ReadAll();
-}
-
-TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter)
- : Frame_(frame)
- , Size_(frame.EventsCache_.size())
- , Filter_(filter)
- , Index_(0)
-{
- SkipToValidEvent();
-}
-
-TConstEventPtr TFrame::TIterator::operator*() const {
- return Frame_.GetEvent(Index_);
-}
-
-bool TFrame::TIterator::Next() {
- Index_++;
- SkipToValidEvent();
- return Index_ < Size_;
-}
-
-void TFrame::TIterator::SkipToValidEvent() {
- if (!Filter_) {
- return;
- }
-
- for (; Index_ < Size_; ++Index_) {
- if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) {
- break;
- }
- }
-}
-
-TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) {
- if (auto header = FindNextFrameHeader(in)) {
- return TFrame{*in, *header, eventFactory};
- } else {
- return Nothing();
- }
-}
-
-TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) {
- TVector<TStringBuf> tokens;
-
- SplitWithEscaping(tokens, unparsedMatchGroups, "/");
-
- // Amount of match groups
- size_t size = tokens.size();
- MatchGroups.resize(size);
-
- for (size_t i = 0; i < size; i++) {
- TMatchGroup& group = MatchGroups[i];
- TVector<TStringBuf> groupTokens;
- SplitWithEscaping(groupTokens, tokens[i], ":");
-
- Y_ENSURE(groupTokens.size() == 3);
-
- try {
- group.EventID = eventFactory->ClassByName(groupTokens[0]);
- } catch (yexception& e) {
- if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) {
- e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0];
- throw e;
- }
- }
-
- group.FieldName = groupTokens[1];
- group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:");
- }
-}
-
-bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const {
- THashSet<size_t> toMatchSet;
- for (size_t i = 0; i < MatchGroups.size(); i++) {
- toMatchSet.insert(i);
- }
-
- for (auto it = frame.GetIterator(); it.Avail(); it.Next()) {
- TConstEventPtr event(*it);
- TVector<size_t> indicesToErase;
-
- if (!toMatchSet.empty()) {
- const NProtoBuf::Message* message = event->GetProto();
- const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
- const google::protobuf::Reflection* reflection = message->GetReflection();
-
- Y_ENSURE(descriptor);
- Y_ENSURE(reflection);
-
- for (size_t groupIndex : toMatchSet) {
- const TMatchGroup& group = MatchGroups[groupIndex];
-
- if (event->Class == group.EventID) {
- TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>();
- TString lastPart = std::move(parts.back());
- parts.pop_back();
-
- for (auto part : parts) {
- auto fieldDescriptor = descriptor->FindFieldByName(part);
- Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\".");
-
- message = &reflection->GetMessage(*message, fieldDescriptor);
- descriptor = message->GetDescriptor();
- reflection = message->GetReflection();
-
- Y_ENSURE(descriptor);
- Y_ENSURE(reflection);
- }
-
- const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart);
- Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\".");
-
- TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection);
- if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) {
- indicesToErase.push_back(groupIndex);
- }
- }
- }
-
- for (size_t idx : indicesToErase) {
- toMatchSet.erase(idx);
- }
-
- if (toMatchSet.empty()) {
- return true;
- }
- }
- }
-
- return toMatchSet.empty();
-}
-
-void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) {
- size_t tokenStart = 0;
- const TString characterSet = TString::Join("\\", externalCharacterSet);
-
- for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) {
- if (stringToSplit[position] == '\\') {
- position++;
- } else {
- if (tokenStart != position) {
- tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart));
- }
- tokenStart = position + 1;
- }
- }
-
- if (tokenStart < stringToSplit.size()) {
- tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart));
- }
-}
-
-TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) {
- TStringBuilder stringBuilder;
- size_t tokenStart = 0;
-
- for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) {
- if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) {
- stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart);
- tokenStart = position + 1;
- }
- }
-
- if (tokenStart < stringToUnescape.size()) {
- stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart);
- }
-
- return stringBuilder;
-}
-
-TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) {
- Y_ENSURE(message);
- Y_ENSURE(fieldDescriptor);
- Y_ENSURE(reflection);
-
- TString result;
- switch (fieldDescriptor->type()) {
- case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE:
- result = ToString(reflection->GetDouble(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT:
- result = ToString(reflection->GetFloat(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_BOOL:
- result = ToString(reflection->GetBool(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_INT32:
- result = ToString(reflection->GetInt32(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_UINT32:
- result = ToString(reflection->GetUInt32(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_INT64:
- result = ToString(reflection->GetInt64(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_UINT64:
- result = ToString(reflection->GetUInt64(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_STRING:
- result = ToString(reflection->GetString(*message, fieldDescriptor));
- break;
- case google::protobuf::FieldDescriptor::Type::TYPE_ENUM:
- {
- const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor);
- result = ToString(enumValueDescriptor->name());
- }
- break;
- default:
- throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented.";
- }
- return result;
-}
-
-TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff)
- : In_(&s)
- , FrameFilter_(ff)
- , EventFactory_(fac)
-{
- Frame_ = FindNextFrame(&In_, EventFactory_);
-
- SkipToAllowedFrame();
-}
-
-TFrameStreamer::TFrameStreamer(
- const TString& fileName,
- ui64 startTime,
- ui64 endTime,
- ui64 maxRequestDuration,
- IEventFactory* fac,
- IFrameFilterRef ff)
- : File_(TBlob::FromFile(fileName))
- , MemoryIn_(File_.Data(), File_.Size())
- , In_(&MemoryIn_)
- , StartTime_(startTime)
- , EndTime_(endTime)
- , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime))
- , FrameFilter_(ff)
- , EventFactory_(fac)
-{
- In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration));
- Frame_ = FindNextFrame(&In_, fac);
- SkipToAllowedFrame();
-}
-
-TFrameStreamer::~TFrameStreamer() = default;
-
-bool TFrameStreamer::Avail() const {
- return Frame_.Defined();
-}
-
-const TFrame& TFrameStreamer::operator*() const {
- Y_ENSURE(Frame_, "Frame streamer depleted");
-
- return *Frame_;
-}
-
-bool TFrameStreamer::Next() {
- DoNext();
- SkipToAllowedFrame();
-
- return Frame_.Defined();
-}
-
-bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const {
- const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_));
- const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_));
- return allowedStartTime || allowedEndTime;
-}
-
-bool TFrameStreamer::DoNext() {
- if (!Frame_) {
- return false;
- }
- In_.Skip(Frame_->Limiter_->Left());
- Frame_ = FindNextFrame(&In_, EventFactory_);
-
- if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) {
- Frame_.Clear();
- }
-
- return Frame_.Defined();
-}
-
-namespace {
- struct TDecodeBuffer {
- TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) {
- TBuffer from(bs);
-
- {
- TBufferOutput b(from);
- TransferData(&src, &b);
- }
-
- NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer);
- }
-
- explicit TDecodeBuffer(IInputStream& src) {
- TBufferOutput b(DecodeBuffer);
- TransferData(&src, &b);
- }
-
- TBuffer DecodeBuffer;
- };
-
- class TBlockCodecStream: private TDecodeBuffer, public TBufferInput {
- public:
- TBlockCodecStream(const TString codec, IInputStream& src, size_t bs)
- : TDecodeBuffer(codec, src, bs)
- , TBufferInput(DecodeBuffer)
- {}
-
- explicit TBlockCodecStream(IInputStream& src)
- : TDecodeBuffer(src)
- , TBufferInput(DecodeBuffer)
- {}
- };
-}
-
-TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData)
- : Frame_(fr)
- , Event_(nullptr)
- , Flt_(filter)
- , Fac_(fr.Fac_)
- , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime()))
- , Strict_(strict)
- , WithRawData_(withRawData)
-{
- switch (fr.LogFormat()) {
- case COMPRESSED_LOG_FORMAT_V2:
- case COMPRESSED_LOG_FORMAT_V3:
- case COMPRESSED_LOG_FORMAT_V4:
- case COMPRESSED_LOG_FORMAT_V5: {
- const auto payload = fr.GetCompressedFrame();
- TMemoryInput payloadInput{payload};
-
- if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) {
- Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size()));
- } else {
- TZLibDecompress zlib(&payloadInput);
- Decompressor_.Reset(new TBlockCodecStream(zlib));
- if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) {
- Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size()));
- }
- }
-
- break;
- }
-
- default:
- ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl;
- break;
- };
-
- if (WithRawData_) {
- TBufferOutput out(UncompressedData_);
- TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen);
-
- TransferData(&limiter, &out);
- Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size()));
- }
-
- Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen));
-
- Decode();
-}
-
-TFrameDecoder::~TFrameDecoder() = default;
-
-bool TFrameDecoder::Avail() const {
- return HaveData();
-}
-
-TConstEventPtr TFrameDecoder::operator*() const {
- Y_ENSURE(HaveData(), "Decoder depleted");
-
- return Event_;
-}
-
-bool TFrameDecoder::Next() {
- if (HaveData()) {
- Decode();
- }
-
- return HaveData();
-}
-
-void TFrameDecoder::Decode() {
- Event_ = nullptr;
- const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5);
-
- size_t evBegin = 0;
- size_t evEnd = 0;
- if (WithRawData_)
- evBegin = UncompressedData_.Size() - Limiter_->Left();
-
- while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) {
- }
-
- if (WithRawData_) {
- evEnd = UncompressedData_.Size() - Limiter_->Left();
- RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd);
- }
-
- if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) {
- Event_ = EndOfFrame_.Release();
- }
-
- if (!!Event_) {
- Event_->FrameId = Frame_.FrameId();
- }
-}
-
-const TStringBuf TFrameDecoder::GetRawEvent() const {
- return RawEventData_;
-}
-
-TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering)
- : Frames_(fs)
- , Start_(s)
- , End_(e)
- , MaxEndTimestamp_(0)
- , Frontier_(0)
- , StrongOrdering_(strongOrdering)
- , LosslessStrongOrdering_(losslessStrongOrdering)
- , EventFilter_(filter)
-{
-
- if (Start_ > End_) {
- ythrow yexception() << "Wrong main interval";
- }
-
- TEventStreamer::Next();
-}
-
-TEventStreamer::~TEventStreamer() = default;
-
-bool TEventStreamer::Avail() const {
- return Events_.Avail() && (*Events_)->Timestamp <= Frontier_;
-}
-
-TConstEventPtr TEventStreamer::operator*() const {
- Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted");
-
- return *Events_;
-}
-
-bool TEventStreamer::Next() {
- if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) {
- return true;
- }
-
- for (;;) {
- if (!LoadMoreEvents()) {
- return false;
- }
-
- if (TEventStreamer::Avail()) {
- return true;
- }
- }
-}
-
-/*
-Two parameters are used in the function:
-Frontier - the moment of time up to which inclusively all the log events made their way
- into the buffer (and might have been already extracted out of it).
-Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION.
-In order to get all the log events up to the Frontier inclusively,
- frames need to be read until "end time" of the current frame exceeds the Horizon.
-*/
-bool TEventStreamer::LoadMoreEvents() {
- if (!Frames_.Avail()) {
- return false;
- }
-
- const TFrame& fr1 = *Frames_;
- const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0);
-
- if (fr1.EndTime() <= Frontier_ + maxRequestDuration) {
- ythrow yexception() << "Wrong frame stream state";
- }
-
- if (Frontier_ >= End_) {
- return false;
- }
-
- const ui64 old_frontier = Frontier_;
- Frontier_ = fr1.EndTime();
-
- {
- Y_DEFER {
- Events_.Reorder(StrongOrdering_);
- };
-
- for (; Frames_.Avail(); Frames_.Next()) {
- const TFrame& fr2 = *Frames_;
-
- // Frames need to start later than the Frontier.
- if (StrongOrdering_ && fr2.StartTime() <= old_frontier) {
- Cdbg << "Invalid frame encountered" << Endl;
- continue;
- }
-
- if (fr2.EndTime() > MaxEndTimestamp_) {
- MaxEndTimestamp_ = fr2.EndTime();
- }
-
- if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) {
- return true;
- }
-
- // Checking for the frame to be within the main time borders.
- if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) {
- TransferEvents(fr2);
- }
- }
- }
-
- Frontier_ = MaxEndTimestamp_;
-
- return true;
-}
-
-void TEventStreamer::TransferEvents(const TFrame& fr) {
- Events_.SetCheckpoint();
-
- try {
- for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) {
- TConstEventPtr ev = *it;
-
- if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) {
- ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range";
- }
-
- if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) {
- Events_.Append(ev, StrongOrdering_);
- }
- }
- } catch (const TInvalidEventTimestamps& err) {
- Events_.Rollback();
- Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl;
- } catch (const TFrameLoadError& err) {
- Events_.Rollback();
- Cdbg << "EventsTransfer error: " << err.what() << Endl;
- } catch (const TEventDecoderError& err) {
- Events_.Rollback();
- Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl;
- } catch (const TZLibDecompressorError& err) {
- Events_.Rollback();
- Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl;
- } catch (...) {
- Events_.Rollback();
- throw;
- }
-}
-
-void TEventStreamer::TEventBuffer::SetCheckpoint() {
- BufLen_ = Buffer_.size();
-}
-
-void TEventStreamer::TEventBuffer::Rollback() {
- Buffer_.resize(BufLen_);
-}
-
-void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) {
- SetCheckpoint();
-
- std::reverse(Buffer_.begin(), Buffer_.end());
-
- if (strongOrdering) {
- StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) {
- return (a->Timestamp > b->Timestamp) ||
- ((a->Timestamp == b->Timestamp) && !a->Class && b->Class);
- });
- }
-}
-
-void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) {
- // Events in buffer output must be in an ascending order.
- Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event");
-
- Buffer_.push_back(std::move(ev));
-}
-
-bool TEventStreamer::TEventBuffer::Avail() const {
- return !Buffer_.empty();
-}
-
-TConstEventPtr TEventStreamer::TEventBuffer::operator*() const {
- Y_ENSURE(!Buffer_.empty(), "Event buffer is empty");
-
- return Buffer_.back();
-}
-
-bool TEventStreamer::TEventBuffer::Next() {
- if (!Buffer_.empty()) {
- LastTimestamp_ = Buffer_.back()->Timestamp;
- Buffer_.pop_back();
- return !Buffer_.empty();
- } else {
- return false;
- }
-}
diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h
deleted file mode 100644
index f819e725894..00000000000
--- a/library/cpp/eventlog/logparser.h
+++ /dev/null
@@ -1,343 +0,0 @@
-#pragma once
-
-#include <util/generic/ptr.h>
-#include <util/generic/yexception.h>
-#include <util/generic/vector.h>
-#include <util/generic/set.h>
-#include <util/generic/maybe.h>
-#include <util/memory/blob.h>
-#include <util/stream/length.h>
-#include <util/stream/mem.h>
-
-#include "eventlog_int.h"
-#include "eventlog.h"
-#include "common.h"
-
-class IInputStream;
-
-static const ui64 MAX_REQUEST_DURATION = 60'000'000;
-static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION;
-static const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION;
-
-class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> {
-public:
- TEventFilter(bool enableEvents)
- : Enable_(enableEvents)
- {
- }
-
- void AddEventClass(TEventClass cls) {
- insert(cls);
- }
-
- bool EventAllowed(TEventClass cls) const {
- bool found = (find(cls) != end());
-
- return Enable_ == found;
- }
-
-private:
- bool Enable_;
-};
-
-using TEventStream = TPacketInputStream<TConstEventPtr>;
-
-struct TFrameHeader {
- // Reads header from the stream. The caller must make sure that the
- // sync data is present just befor the stream position.
- explicit TFrameHeader(IInputStream& in);
-
- ui64 StartTime() const {
- return Framehdr.StartTimestamp;
- }
-
- ui64 EndTime() const {
- return Framehdr.EndTimestamp;
- }
-
- ui32 FrameId() const {
- return Basehdr.FrameId;
- }
-
- ui64 Duration() const {
- return EndTime() - StartTime();
- }
-
- TEventLogFormat ContentFormat() const {
- return Basehdr.Format & 0xffffff;
- }
-
- TEventLogFormat LogFormat() const {
- return Basehdr.Format >> 24;
- }
-
- ui64 FrameLength() const {
- return Basehdr.Length - sizeof(TCompressedFrameHeader2);
- }
-
- // Length including the header
- ui64 FullLength() const {
- return sizeof(*this) + FrameLength();
- }
-
- TCompressedFrameBaseHeader Basehdr;
- TCompressedFrameHeader2 Framehdr;
-};
-
-struct TFrameLoadError: public yexception {
- explicit TFrameLoadError(size_t skipAfter)
- : SkipAfter(skipAfter)
- {}
-
- size_t SkipAfter;
-};
-
-class TFrame : public TFrameHeader {
-public:
- // Reads the frame after the header has been read.
- TFrame(IInputStream& in, TFrameHeader header, IEventFactory*);
-
- TString GetRawFrame() const;
- TString GetCompressedFrame() const;
-
- ui64 Address() const { return Address_; }
-
-private:
- const TConstEventPtr& GetEvent(size_t index) const {
- return EventsCache_[index];
- }
-
- void ClearEventsCache() const;
-
- THolder<TLengthLimitedInput> Limiter_;
- mutable TVector<TConstEventPtr> EventsCache_;
-
- IEventFactory* Fac_;
- ui64 Address_;
-
- friend class TFrameDecoder;
- friend class TFrameStreamer;
-
-private:
- class TIterator: TEventStream {
- public:
- TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter);
- ~TIterator() override = default;
-
- bool Avail() const override {
- return Index_ < Size_;
- }
-
- TConstEventPtr operator*() const override;
- bool Next() override;
-
- private:
- void SkipToValidEvent();
-
- const TFrame& Frame_;
- size_t Size_;
- TIntrusiveConstPtr<TEventFilter> Filter_;
- size_t Index_;
- };
-
-public:
- TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const;
-};
-
-// If `in` is derived from TCountingInput, Frame's address will
-// be set accorting to the in->Counter(). Otherwise it will be zeroO
-TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*);
-
-using TFrameStream = TPacketInputStream<const TFrame&>;
-
-class IFrameFilter: public TSimpleRefCount<IFrameFilter> {
-public:
- IFrameFilter() {
- }
-
- virtual ~IFrameFilter() = default;
-
- virtual bool FrameAllowed(const TFrame& frame) const = 0;
-};
-
-using IFrameFilterRef = TIntrusivePtr<IFrameFilter>;
-
-class TDurationFrameFilter: public IFrameFilter {
-public:
- TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>())
- : MinDuration_(minFrameDuration)
- , MaxDuration_(maxFrameDuration)
- {
- }
-
- bool FrameAllowed(const TFrame& frame) const override {
- return frame.Duration() >= MinDuration_ && frame.Duration() <= MaxDuration_;
- }
-
-private:
- const ui64 MinDuration_;
- const ui64 MaxDuration_;
-};
-
-class TFrameIdFrameFilter: public IFrameFilter {
-public:
- TFrameIdFrameFilter(ui32 frameId)
- : FrameId_(frameId)
- {
- }
-
- bool FrameAllowed(const TFrame& frame) const override {
- return frame.FrameId() == FrameId_;
- }
-
-private:
- const ui32 FrameId_;
-};
-
-class TContainsEventFrameFilter: public IFrameFilter {
-public:
- TContainsEventFrameFilter(const TString& args, const IEventFactory* fac);
-
- bool FrameAllowed(const TFrame& frame) const override;
-
-private:
- struct TMatchGroup {
- TEventClass EventID;
- TString FieldName;
- TString ValueToMatch;
- };
-
- TVector<TMatchGroup> MatchGroups;
-};
-
-void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet);
-
-TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet);
-
-TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection);
-
-class TFrameStreamer: public TFrameStream {
-public:
- TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = nullptr);
- TFrameStreamer(
- const TString& fileName,
- ui64 startTime,
- ui64 endTime,
- ui64 maxRequestDuration,
- IEventFactory* fac,
- IFrameFilterRef ff = nullptr);
- ~TFrameStreamer() override;
-
- bool Avail() const override;
- const TFrame& operator*() const override;
- bool Next() override;
-
-private:
- bool DoNext();
- bool AllowedTimeRange(const TFrame& frame) const;
-
- bool AllowedFrame(const TFrame& frame) const {
- return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame));
- }
-
- void SkipToAllowedFrame() {
- if (Frame_) {
- while (!AllowedFrame(*Frame_) && DoNext()) {
- //do nothing
- }
- }
- }
-
- TBlob File_;
- TMemoryInput MemoryIn_;
- TCountingInput In_;
- THolder<IInputStream> Stream_;
- ui64 StartTime_ = 0;
- ui64 EndTime_ = 0;
- ui64 CutoffTime_ = 0;
- TMaybe<TFrame> Frame_;
- IFrameFilterRef FrameFilter_;
- IEventFactory* EventFactory_;
-};
-
-class TFrameDecoder: TEventStream {
-public:
- TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false);
- ~TFrameDecoder() override;
-
- bool Avail() const override;
-
- TConstEventPtr operator*() const override;
- bool Next() override;
-
- const TStringBuf GetRawEvent() const;
-
-private:
- TFrameDecoder(const TFrameDecoder&);
- void operator=(const TFrameDecoder&);
-
- inline bool HaveData() const {
- return Event_ != nullptr;
- }
-
- void Decode();
-
-private:
- const TFrame& Frame_;
- THolder<IInputStream> Decompressor_;
- THolder<TLengthLimitedInput> Limiter_;
- TEventPtr Event_;
- const TEventFilter* const Flt_;
- IEventFactory* Fac_;
- THolder<TEvent> EndOfFrame_;
- bool Strict_;
- TBuffer UncompressedData_;
- TStringBuf RawEventData_;
- bool WithRawData_;
-};
-
-class TEventStreamer: public TEventStream {
-public:
- TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false);
- ~TEventStreamer() override;
-
- bool Avail() const override;
- TConstEventPtr operator*() const override;
- bool Next() override;
-
-private:
- class TEventBuffer: public TEventStream {
- public:
- void SetCheckpoint();
- void Rollback();
- void Reorder(bool strongOrdering);
- void Append(TConstEventPtr event, bool strongOrdering);
-
- bool Avail() const override;
- TConstEventPtr operator*() const override;
- bool Next() override;
-
- private:
- TVector<TConstEventPtr> Buffer_;
- size_t BufLen_ = 0;
- ui64 LastTimestamp_ = 0;
- };
-
-private:
- struct TInvalidEventTimestamps: public yexception {
- };
-
- bool LoadMoreEvents();
- void TransferEvents(const TFrame&);
-
-private:
- TFrameStream& Frames_;
- TEventBuffer Events_;
-
- ui64 Start_, End_;
- ui64 MaxEndTimestamp_;
- ui64 Frontier_;
- bool StrongOrdering_;
- bool LosslessStrongOrdering_;
- TIntrusivePtr<TEventFilter> EventFilter_;
-};
diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto
deleted file mode 100644
index 37a7d7e5614..00000000000
--- a/library/cpp/eventlog/proto/events_extension.proto
+++ /dev/null
@@ -1,22 +0,0 @@
-import "google/protobuf/descriptor.proto";
-
-option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions";
-option java_package = "NEventLogEventsExtension";
-
-extend google.protobuf.MessageOptions {
- optional uint32 message_id = 50001;
- optional string realm_name = 50002;
-}
-
-message Repr {
- enum ReprType {
- none = 0;
- as_bytes = 1; // Only for primitive types
- as_hex = 2; // Only for primitive types
- as_base64 = 3; // Only for 'string' and 'bytes' fields
- };
-}
-
-extend google.protobuf.FieldOptions {
- optional Repr.ReprType repr = 55003 [default = none];
-}
diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto
deleted file mode 100644
index 234230e0949..00000000000
--- a/library/cpp/eventlog/proto/internal.proto
+++ /dev/null
@@ -1,9 +0,0 @@
-option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions";
-
-package NEventLogInternal;
-
-message TUnknownEvent {
-};
-
-message TEndOfFrameEvent {
-};
diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp
deleted file mode 100644
index 67839063fbd..00000000000
--- a/library/cpp/eventlog/threaded_eventlog.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "threaded_eventlog.h"
diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h
deleted file mode 100644
index 52382b856d1..00000000000
--- a/library/cpp/eventlog/threaded_eventlog.h
+++ /dev/null
@@ -1,154 +0,0 @@
-#pragma once
-
-#include "eventlog.h"
-
-#include <util/generic/string.h>
-#include <util/thread/pool.h>
-
-class TThreadedEventLog: public TEventLogWithSlave {
-public:
- class TWrapper;
- using TOverflowCallback = std::function<void(TWrapper& wrapper)>;
-
- enum class EDegradationResult {
- ShouldWrite,
- ShouldDrop,
- };
- using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>;
-
-public:
- TThreadedEventLog(
- IEventLog& parentLog,
- size_t threadCount,
- size_t queueSize,
- TOverflowCallback cb,
- TDegradationCallback degradationCallback = {})
- : TEventLogWithSlave(parentLog)
- , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
- , ThreadCount(threadCount)
- , QueueSize(queueSize)
- , OverflowCallback(std::move(cb))
- , DegradationCallback(std::move(degradationCallback))
- {
- Init();
- }
-
- TThreadedEventLog(
- const TEventLogPtr& parentLog,
- size_t threadCount,
- size_t queueSize,
- TOverflowCallback cb,
- TDegradationCallback degradationCallback = {})
- : TEventLogWithSlave(parentLog)
- , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog"))
- , ThreadCount(threadCount)
- , QueueSize(queueSize)
- , OverflowCallback(std::move(cb))
- , DegradationCallback(std::move(degradationCallback))
- {
- Init();
- }
-
- TThreadedEventLog(IEventLog& parentLog)
- : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback())
- {
- }
-
- TThreadedEventLog(const TEventLogPtr& parentLog)
- : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback())
- {
- }
-
- ~TThreadedEventLog() override {
- try {
- LogSaver.Stop();
- } catch (...) {
- }
- }
-
- void ReopenLog() override {
- TEventLogWithSlave::ReopenLog();
- }
-
- void CloseLog() override {
- LogSaver.Stop();
- TEventLogWithSlave::CloseLog();
- }
-
- void WriteFrame(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- TWriteFrameCallbackPtr writeFrameCallback = nullptr,
- TLogRecord::TMetaFlags metaFlags = {}) override {
- float fillFactor = 0.0f;
- if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) {
- fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize();
- }
-
- EDegradationResult status = EDegradationResult::ShouldWrite;
- if (DegradationCallback) {
- status = DegradationCallback(fillFactor);
- }
- if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) {
- return;
- }
-
- THolder<TWrapper> wrapped;
- wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags)));
-
- if (LogSaver.Add(wrapped.Get())) {
- Y_UNUSED(wrapped.Release());
- } else if (OverflowCallback) {
- OverflowCallback(*wrapped);
- }
- }
-
-private:
- void Init() {
- LogSaver.Start(ThreadCount, QueueSize);
- }
-
-public:
- class TWrapper: public IObjectInQueue {
- public:
- TWrapper(TBuffer& buffer,
- TEventTimestamp startTimestamp,
- TEventTimestamp endTimestamp,
- IEventLog& slave,
- TWriteFrameCallbackPtr writeFrameCallback = nullptr,
- TLogRecord::TMetaFlags metaFlags = {})
- : StartTimestamp(startTimestamp)
- , EndTimestamp(endTimestamp)
- , Slave(&slave)
- , WriteFrameCallback(writeFrameCallback)
- , MetaFlags(std::move(metaFlags))
- {
- Buffer.Swap(buffer);
- }
-
- void Process(void*) override {
- THolder<TWrapper> holder(this);
-
- WriteFrame();
- }
-
- void WriteFrame() {
- Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags));
- }
-
- private:
- TBuffer Buffer;
- TEventTimestamp StartTimestamp;
- TEventTimestamp EndTimestamp;
- IEventLog* Slave;
- TWriteFrameCallbackPtr WriteFrameCallback;
- TLogRecord::TMetaFlags MetaFlags;
- };
-
-private:
- TThreadPool LogSaver;
- const size_t ThreadCount;
- const size_t QueueSize;
- const TOverflowCallback OverflowCallback;
- const TDegradationCallback DegradationCallback;
-};