diff options
author | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
commit | b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch) | |
tree | 2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/eventlog | |
parent | 559174a9144de40d6bb3997ea4073c82289b4974 (diff) | |
download | ydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz |
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/eventlog')
26 files changed, 0 insertions, 3735 deletions
diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h deleted file mode 100644 index 75c512c13ef..00000000000 --- a/library/cpp/eventlog/common.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once - -template <class T> -class TPacketInputStream { -public: - virtual bool Avail() const = 0; - virtual T operator*() const = 0; - virtual bool Next() = 0; - virtual ~TPacketInputStream() = default; -}; diff --git a/library/cpp/eventlog/dumper/common.cpp b/library/cpp/eventlog/dumper/common.cpp deleted file mode 100644 index eebe3b6ee33..00000000000 --- a/library/cpp/eventlog/dumper/common.cpp +++ /dev/null @@ -1,81 +0,0 @@ -#include "common.h" - -#include <contrib/libs/re2/re2/re2.h> - -#include <util/datetime/base.h> -#include <util/datetime/parser.h> -#include <util/string/cast.h> - -static time_t RecentTime(int h, int m, int s) { - time_t now = time(nullptr); - tm tmTmp; - localtime_r(&now, &tmTmp); - tmTmp.tm_hour = h; - tmTmp.tm_min = m; - tmTmp.tm_sec = s; - time_t today = mktime(&tmTmp); - tmTmp.tm_mday -= 1; - time_t yesterday = mktime(&tmTmp); - return today <= now ? today : yesterday; -} - -static bool ParseRecentTime(const TString& str, time_t& result) { - RE2 RecentTimePattern("(\\d{1,2}):(\\d{2})(?::(\\d{2}))?"); - re2::StringPiece hStr, mStr, sStr; - if (!RE2::FullMatch({str.data(), str.size()}, RecentTimePattern, &hStr, &mStr, &sStr)) { - return false; - } - int h = FromString<int>(hStr.data(), hStr.length()); - int m = FromString<int>(mStr.data(), mStr.length()); - int s = FromString<int>(sStr.data(), sStr.length(), 0); - if (h > 23 || m > 59 || s > 59) { - return false; - } - result = RecentTime(h, m, s); - return true; -} - -namespace { - class TDefaultOffset8601Parser: public TIso8601DateTimeParser { - public: - TDefaultOffset8601Parser(int offsetHours) { - DateTimeFields.ZoneOffsetMinutes = offsetHours * 60; - } - }; -} // namespace - -static bool ParseISO8601DateTimeWithDefaultOffset(TStringBuf str, int offsetHours, time_t& result) { - TDefaultOffset8601Parser parser{offsetHours}; - - if (!parser.ParsePart(str.data(), str.size())) { - return false; - } - - const TInstant instant = parser.GetResult(TInstant::Max()); - if (instant == TInstant::Max()) { - return false; - } - - result = instant.TimeT(); - return true; -} - -ui64 ParseTime(const TString& str, ui64 defValue, int offset) { - if (!str) { - return defValue; - } - - time_t utcTime; - - if (ParseISO8601DateTimeWithDefaultOffset(str, offset, utcTime)) { - return (ui64)utcTime * 1000000; - } - - if (ParseRecentTime(str, utcTime)) { - return (ui64)utcTime * 1000000; - } - - // if conversion fails, TryFromString leaves defValue unchanged - TryFromString<ui64>(str, defValue); - return defValue; -} diff --git a/library/cpp/eventlog/dumper/common.h b/library/cpp/eventlog/dumper/common.h deleted file mode 100644 index 839fc7b221c..00000000000 --- a/library/cpp/eventlog/dumper/common.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -#include <util/generic/string.h> - -/** - * Returns microseconds since the epoch. - */ -ui64 ParseTime(const TString& str, ui64 defValue, int offset /* timezone offset, default MSK */ = 3); diff --git a/library/cpp/eventlog/dumper/evlogdump.cpp b/library/cpp/eventlog/dumper/evlogdump.cpp deleted file mode 100644 index 003b412265b..00000000000 --- a/library/cpp/eventlog/dumper/evlogdump.cpp +++ /dev/null @@ -1,414 +0,0 @@ -#include "common.h" -#include "evlogdump.h" - -#include <library/cpp/eventlog/evdecoder.h> -#include <library/cpp/eventlog/iterator.h> -#include <library/cpp/eventlog/logparser.h> - -#include <library/cpp/getopt/last_getopt.h> - -#ifndef NO_SVN_DEPEND -#include <library/cpp/svnversion/svnversion.h> -#endif - -#include <util/datetime/base.h> -#include <util/generic/ptr.h> -#include <util/stream/file.h> -#include <util/string/cast.h> -#include <util/string/type.h> - -using namespace NEventLog; -namespace nlg = NLastGetopt; - -IFrameFilterRef CreateDurationFrameFilter(const TString& params) { - TStringBuf d(params); - TStringBuf l, r; - d.Split(':', l, r); - TDuration min = l.empty() ? TDuration::Zero() : FromString<TDuration>(l); - TDuration max = r.empty() ? TDuration::Max() : FromString<TDuration>(r); - - if (min > TDuration::Zero() || max < TDuration::Max()) { - return new TDurationFrameFilter(min.GetValue(), max.GetValue()); - } - - return nullptr; -} - -IFrameFilterRef CreateFrameIdFilter(const ui32 frameId) { - return new TFrameIdFrameFilter(frameId); -} - -IFrameFilterRef CreateContainsEventFrameFilter(const TString& args, const IEventFactory* factory) { - return new TContainsEventFrameFilter(args, factory); -} - -void ListAllEvents(IEventFactory* factory, IOutputStream* out) { - Y_ENSURE(out); - - for (TEventClass eventClass = factory->EventClassBegin(); eventClass < factory->EventClassEnd(); eventClass++) { - THolder<TEvent> event(factory->CreateLogEvent(eventClass)); - - const TStringBuf& name = event->GetName(); - if (name) { - (*out) << name << "\n"; - } - } -} - -void ListEventFieldsByEventId(const TEventClass eventClass, IEventFactory* factory, IOutputStream* out) { - THolder<TEvent> event(factory->CreateLogEvent(eventClass)); - const NProtoBuf::Message* message = event->GetProto(); - const google::protobuf::Descriptor* descriptor = message->GetDescriptor(); - - (*out) << descriptor->DebugString(); -} - -void ListEventFields(const TString& eventName, IEventFactory* factory, IOutputStream* out) { - Y_ENSURE(out); - - TEventClass eventClass; - - try { - eventClass = factory->ClassByName(eventName); - } catch (yexception& e) { - if (!TryFromString<TEventClass>(eventName, eventClass)) { - (*out) << "Cannot dervie event class from event name: " << eventName << Endl; - return; - } - } - - ListEventFieldsByEventId(eventClass, factory, out); -} - -int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory) { - if (helpEvents == "all") { - ListAllEvents(factory, &Cout); - } else if (IsNumber(helpEvents)) { - ListEventFieldsByEventId(IntFromString<ui32, 10>(helpEvents), factory, &Cout); - } else { - ListEventFields(helpEvents, factory, &Cout); - } - - return 0; -} - -int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv) { - class TProxy: public ITunableEventProcessor { - public: - TProxy(IEventProcessor* proc) - : Processor_(proc) - { - } - - void AddOptions(NLastGetopt::TOpts&) override { - } - - void SetOptions(const TEvent::TOutputOptions& options) override { - Processor_->SetOptions(options); - } - - void ProcessEvent(const TEvent* ev) override { - Processor_->ProcessEvent(ev); - } - - bool CheckedProcessEvent(const TEvent* ev) override { - return Processor_->CheckedProcessEvent(ev); - } - - private: - IEventProcessor* Processor_ = nullptr; - }; - - TProxy proxy(proc); - return IterateEventLog(fac, &proxy, argc, argv); -} - -int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv) { - nlg::TOpts opts; - opts.AddHelpOption('?'); - opts.AddHelpOption('h'); - opts.SetTitle( - "Search EventLog dumper\n\n" - "Examples:\n" - "evlogdump -s 1228484839332219 -e 1228484840332219 event_log\n" - "evlogdump -s 2008-12-12T12:00:00+03 -e 2008-12-12T12:05:00+03 event_log\n" - "evlogdump -s 12:40 -e 12:55 event_log\n" - "evlogdump -i 301,302,303 event_log\n" - "evlogdump -i SubSourceResponse,SubSourceOk,SubSourceError event_log\n" - "evlogdump -d 40ms:60ms event_log\n" - "evlogdump -c ReqId:ReqId:1563185655856304-30407004790538173600035-vla1-0671-p1\n" - "\n" - "Fine manual: https://nda.ya.ru/3Tpo3L\n" - "\n" - "If in trouble using this (or bugs encountered),\n" - "please don't hesitate to ask middlesearch/basesearch dev team\n" - "(vmordovin@, mvel@, etc)\n"); - - TString start; - opts.AddLongOption( - 's', "start-time", - "Start time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours).\n" - "Long frames can produce inaccurate cropping and event loss in dump (see SEARCH-5576)") - .Optional() - .StoreResult(&start); - - TString end; - opts.AddLongOption( - 'e', "end-time", - "End time (Unix time in microseconds, ISO8601, or HH:MM[:SS] in the last 24 hours)") - .Optional() - .StoreResult(&end); - - TOptions o; - - opts.AddLongOption( - 'm', "max-request-duration", - "Max duration of a request in the log, in us") - .Optional() - .StoreResult(&o.MaxRequestDuration); - - bool oneFrame = false; - opts.AddLongOption( - 'f', "one-frame", - "Treat input file as single frame dump (e.g. from gdb)") - .NoArgument() - .Optional() - .StoreValue(&oneFrame, true); - - opts.AddLongOption( - 'v', "version", - "Print program version") - .NoArgument() - .Optional() - .Handler(&nlg::PrintVersionAndExit); - - TString includeEventList; - opts.AddLongOption( - 'i', "event-list", - "Comma-separated list of included event class IDs or names " - "(e.g. 265,287 or MainTaskError,ContextCreated)") - .Optional() - .StoreResult(&includeEventList) - .StoreValue(&o.EnableEvents, true); - - TString durationFilterStr; - opts.AddLongOption( - 'd', "duration", - "DurationMin[:DurationMax] (values must contain a unit, valid examples are: 50us, 50ms, 50s)\n" - "(show frames with duration greater or equal DurationMin [and less or equal than DurationMax])") - .Optional() - .StoreResult(&durationFilterStr); - - const ui32 INVALID_FRAME_ID = ui32(-1); - ui32 frameIdFilter = INVALID_FRAME_ID; - opts.AddLongOption( - 'n', "frame-id", - "Filter frame with given id\n") - .Optional() - .StoreResult(&frameIdFilter); - - TString excludeEventList; - opts.AddLongOption( - 'x', "exclude-list", - "Comma-separated list of excluded event class IDs or names (see also --event-list option)") - .Optional() - .StoreResult(&excludeEventList) - .StoreValue(&o.EnableEvents, false); - - opts.AddLongOption( - 'o', "frame-order", - "Order events by time only inside a frame (faster)") - .NoArgument() - .Optional() - .StoreValue(&o.ForceWeakOrdering, true); - - opts.AddLongOption( - 'O', "global-order", - "Globally but lossy (may lose some frames) order events by time only (slower)") - .NoArgument() - .Optional() - .StoreValue(&o.ForceStrongOrdering, true); - - opts.AddLongOption( - "lossless-strong-order", - "Globally order events by time only (super slow and memory heavy)") - .NoArgument() - .Optional() - .StoreValue(&o.ForceLosslessStrongOrdering, true) - .StoreValue(&o.ForceStrongOrdering, true); - - opts.AddLongOption( - 'S', "stream", - "Force stream mode (for log files with invalid ending)") - .NoArgument() - .Optional() - .StoreValue(&o.ForceStreamMode, true); - - opts.AddLongOption( - 't', "tail", - "Open file like `tail -f` does") - .NoArgument() - .Optional() - .StoreValue(&o.TailFMode, true); - - bool unescaped = false; - opts.AddLongOption( - 'u', "unescaped", - "Don't escape \\t, \\n chars in message fields") - .NoArgument() - .StoreValue(&unescaped, true); - - bool json = false; - opts.AddLongOption( - 'j', "json", - "Print messages as JSON values") - .NoArgument() - .StoreValue(&json, true); - - TEvent::TOutputOptions outputOptions; - opts.AddLongOption( - 'r', "human-readable", - "Print some fields (e.g. timestamp) in human-readable format, add time offsets") - .NoArgument() - .StoreValue(&outputOptions.HumanReadable, true); - - //Supports only fields of type string - TString containsEvent; - opts.AddLongOption( - 'c', "contains-event", - "Only print frames that contain events whose particular field's value matches given value.\n" - "Match group should be provided in format: EventName:FieldName:ValueToMatch\n" - "If more than one match group is provided, they should be separated by / delimiter.\n" - "Nested fields are supported through \'.\' in FieldName.\n" - "Symbols \'/\' and \':\' in EventName, FieldName or ValueToMatch must be escaped.\n" - "NOTE: bool values are 1 and 0.") - .Optional() - .StoreResult(&containsEvent); - - TString helpEvents; - opts.AddLongOption("help-events") - .RequiredArgument("EVENT, EVENT_ID or string \"all\"") - .Optional() - .StoreResult(&helpEvents); - - proc->AddOptions(opts); - - opts.SetFreeArgsMin(0); - opts.SetFreeArgsMax(1); - opts.SetFreeArgTitle(0, "<eventlog>", "Event log file"); - - try { - const nlg::TOptsParseResult optsRes(&opts, argc, argv); - - if (helpEvents) { - return PrintHelpEvents(helpEvents, fac); - } - - TVector<TString> freeArgs = optsRes.GetFreeArgs(); - if (freeArgs) { - o.FileName = freeArgs[0]; - } - - ui32 conditionsHappened = 0; - - if (frameIdFilter != INVALID_FRAME_ID) { - ++conditionsHappened; - } - if (containsEvent) { - ++conditionsHappened; - } - if (durationFilterStr) { - ++conditionsHappened; - } - - if (conditionsHappened > 1) { - throw nlg::TUsageException() << "You can only use no more than one of frame id, duration or contains event frame filters."; - } - - if (frameIdFilter != INVALID_FRAME_ID) { - o.FrameFilter = CreateFrameIdFilter(frameIdFilter); - } else if (durationFilterStr) { - o.FrameFilter = CreateDurationFrameFilter(durationFilterStr); - } else if (containsEvent) { - o.FrameFilter = CreateContainsEventFrameFilter(containsEvent, fac); - } - - // handle some inconsistencies - if (o.ForceWeakOrdering && o.ForceStrongOrdering) { - throw nlg::TUsageException() << "You can use either strong (-O) or weak (-o) ordering. "; - } - if (includeEventList && excludeEventList) { - throw nlg::TUsageException() << "You can use either include (-i) or exclude (-x) events list. "; - } - if (unescaped && json) { - throw nlg::TUsageException() << "You can use either unescaped (-u) or json (-j) output format. "; - } - - proc->CheckOptions(); - } catch (...) { - Cerr << "Usage error: " << CurrentExceptionMessage() << Endl; - return 1; - } - - o.EvList = o.EnableEvents ? includeEventList : excludeEventList; - if (json) { - outputOptions.OutputFormat = TEvent::TOutputFormat::Json; - } else if (unescaped) { - outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparatedRaw; - } else { - outputOptions.OutputFormat = TEvent::TOutputFormat::TabSeparated; - } - - IEventProcessor* eventProcessor = NEvClass::Processor(); - // FIXME(mvel): A little of hell here: `proc` and `eventProcessor` are `IEventProcessor`s - // So we need to set options for BOTH! - - eventProcessor->SetOptions(outputOptions); - proc->SetOptions(outputOptions); - proc->SetEventProcessor(eventProcessor); - - if (oneFrame) { - THolder<IInputStream> fileInput; - - // this is for coredumps analysis, usage: - // gdb: dump binary memory framefile Data_ Data_ + Pos_ - // evlogdump -f framefile - IInputStream* usedInput = nullptr; - - if (o.FileName.size()) { - fileInput.Reset(new TUnbufferedFileInput(o.FileName)); - usedInput = fileInput.Get(); - } else { - usedInput = &Cin; - } - - try { - for (;;) { - proc->ProcessEvent(DecodeEvent(*usedInput, true, 0, nullptr, fac).Get()); - } - } catch (...) { - } - - return 0; - } - - o.StartTime = ParseTime(start, MIN_START_TIME); - o.EndTime = ParseTime(end, MAX_END_TIME); - - try { - THolder<IIterator> it = CreateIterator(o, fac); - - while (const auto ev = it->Next()) { - if (!proc->CheckedProcessEvent(ev.Get())) { - break; - } - } - - return 0; - } catch (...) { - Cout.Flush(); - Cerr << "Error occured: " << CurrentExceptionMessage() << Endl; - } - - return 1; -} diff --git a/library/cpp/eventlog/dumper/evlogdump.h b/library/cpp/eventlog/dumper/evlogdump.h deleted file mode 100644 index eb150573746..00000000000 --- a/library/cpp/eventlog/dumper/evlogdump.h +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include "tunable_event_processor.h" - -#include <library/cpp/eventlog/eventlog.h> - -int IterateEventLog(IEventFactory* fac, IEventProcessor* proc, int argc, const char** argv); -int IterateEventLog(IEventFactory* fac, ITunableEventProcessor* proc, int argc, const char** argv); - -// added for using in infra/libs/logger/log_printer.cpp -int PrintHelpEvents(const TString& helpEvents, IEventFactory* factory); diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.cpp b/library/cpp/eventlog/dumper/tunable_event_processor.cpp deleted file mode 100644 index 8333089cb54..00000000000 --- a/library/cpp/eventlog/dumper/tunable_event_processor.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "tunable_event_processor.h" diff --git a/library/cpp/eventlog/dumper/tunable_event_processor.h b/library/cpp/eventlog/dumper/tunable_event_processor.h deleted file mode 100644 index b56eae9222a..00000000000 --- a/library/cpp/eventlog/dumper/tunable_event_processor.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#include <library/cpp/eventlog/eventlog.h> - -namespace NLastGetopt { - class TOpts; -} - -class ITunableEventProcessor: public IEventProcessor { -public: - virtual void SetEventProcessor(IEventProcessor* /*processor*/) { - } - - virtual void AddOptions(NLastGetopt::TOpts& opts) = 0; - virtual void CheckOptions() { - } - virtual ~ITunableEventProcessor() { - } -}; diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp deleted file mode 100644 index e4413a1b0e0..00000000000 --- a/library/cpp/eventlog/evdecoder.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include <util/memory/tempbuf.h> -#include <util/string/cast.h> -#include <util/stream/output.h> - -#include "evdecoder.h" -#include "logparser.h" - -static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class"; - -static inline void LogError(ui64 frameAddr, const char* msg, bool strict) { - if (!strict) { - Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl; - } else { - ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg; - } -} - -static inline bool SkipData(IInputStream& s, size_t amount) { - return (amount == s.Skip(amount)); -} - -// There are 2 log fomats: the one, that allows event skip without event decode (it has stored event length) -// and another, that requires each event decode just to seek over stream. needRead == true means the latter format. -static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) { - TEventTimestamp ts; - TEventClass c; - THolder<TEvent> e; - - ::Load(&s, ts); - ::Load(&s, c); - - bool needReturn = false; - - if (!filter || filter->EventAllowed(c)) { - needReturn = true; - } - - if (needRead || needReturn) { - e.Reset(fac->CreateLogEvent(c)); - - if (!!e) { - e->Timestamp = ts; - e->Load(s); - } else if (needReturn) { - e.Reset(new TUnknownEvent(ts, c)); - } - - if (!needReturn) { - e.Reset(nullptr); - } - } - - return e; -} - -THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { - ui32 len; - ::Load(&inp, len); - - if (len < sizeof(ui32)) { - ythrow TEventDecoderError() << "invalid event length"; - } - - TLengthLimitedInput s(&inp, len - sizeof(ui32)); - - try { - THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac); - if (!!e) { - if (!s.Left()) { - return e; - } else if (e->Class == 0) { - if (!SkipData(s, s.Left())) { - ythrow TEventDecoderError() << "cannot skip bad event"; - } - - return e; - } - - LogError(frameAddr, "Event is not fully read", strict); - } - } catch (const TLoadEOF&) { - if (s.Left()) { - throw; - } - - LogError(frameAddr, "Unexpected event end", strict); - } - - if (!SkipData(s, s.Left())) { - ythrow TEventDecoderError() << "cannot skip bad event"; - } - - return nullptr; -} - -THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { - try { - if (framed) { - return DecodeFramed(s, frameAddr, filter, fac, strict); - } else { - THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac); - // e(0) means event, skipped by filter. Not an error. - if (!!e && !e->Class) { - ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS; - } - - return e; - } - } catch (const TLoadEOF&) { - ythrow TEventDecoderError() << "unexpected frame end"; - } -} diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h deleted file mode 100644 index eedfc821743..00000000000 --- a/library/cpp/eventlog/evdecoder.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include <util/generic/yexception.h> -#include <util/generic/ptr.h> - -#include "eventlog.h" - -class TEvent; -class IInputStream; -class TEventFilter; - -struct TEventDecoderError: public yexception { -}; - -THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false); -bool AcceptableContent(TEventLogFormat); diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp deleted file mode 100644 index a1b4c2dafa1..00000000000 --- a/library/cpp/eventlog/event_field_output.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "event_field_output.h" - -#include <util/string/split.h> - -namespace { - TString MakeSeparators(EFieldOutputFlags flags) { - TString res; - res.reserve(3); - - if (flags & EFieldOutputFlag::EscapeTab) { - res.append('\t'); - } - if (flags & EFieldOutputFlag::EscapeNewLine) { - res.append('\n'); - } - if (flags & EFieldOutputFlag::EscapeBackSlash) { - res.append('\\'); - } - - return res; - } -} - -TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags) - : Output(output) - , Flags(flags) - , Separators(MakeSeparators(flags)) -{ -} - -IOutputStream& TEventFieldOutput::GetOutputStream() { - return Output; -} - -EFieldOutputFlags TEventFieldOutput::GetFlags() const { - return Flags; -} - -void TEventFieldOutput::DoWrite(const void* buf, size_t len) { - if (!Flags) { - Output.Write(buf, len); - return; - } - - TStringBuf chunk{static_cast<const char*>(buf), len}; - - for (const auto part : StringSplitter(chunk).SplitBySet(Separators.data())) { - TStringBuf token = part.Token(); - TStringBuf delim = part.Delim(); - - if (!token.empty()) { - Output.Write(token); - } - if ("\n" == delim) { - Output.Write(TStringBuf("\\n")); - } else if ("\t" == delim) { - Output.Write(TStringBuf("\\t")); - } else if ("\\" == delim) { - Output.Write(TStringBuf("\\\\")); - } else { - Y_ASSERT(delim.empty()); - } - } -} - diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h deleted file mode 100644 index ed9db0ae167..00000000000 --- a/library/cpp/eventlog/event_field_output.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include <util/stream/output.h> -#include <util/generic/flags.h> - -enum class EFieldOutputFlag { - EscapeTab = 0x1, // escape \t in field value - EscapeNewLine = 0x2, // escape \n in field value - EscapeBackSlash = 0x4 // escape \ in field value -}; - -Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag); -Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags); - -class TEventFieldOutput: public IOutputStream { -public: - TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags); - - IOutputStream& GetOutputStream(); - EFieldOutputFlags GetFlags() const; - -protected: - void DoWrite(const void* buf, size_t len) override; - -private: - IOutputStream& Output; - EFieldOutputFlags Flags; - TString Separators; -}; diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp deleted file mode 100644 index 29c6b4b661e..00000000000 --- a/library/cpp/eventlog/event_field_printer.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#include "event_field_printer.h" - -#include <library/cpp/protobuf/json/proto2json.h> - -namespace { - - const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() - .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) - .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); - -} // namespace - -TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode) - : Mode(mode) -{} - -template <> -void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) { - switch (Mode) { - case EProtobufMessageFieldPrintMode::DEFAULT: - case EProtobufMessageFieldPrintMode::JSON: { - // Do not use field.PrintJSON() here: IGNIETFERRO-2002 - NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG); - break; - } - } -} diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h deleted file mode 100644 index 835e8f4a850..00000000000 --- a/library/cpp/eventlog/event_field_printer.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "event_field_output.h" - -#include <google/protobuf/message.h> - -// NB: For historical reasons print code for all primitive types/repeated fields/etc generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp - -enum class EProtobufMessageFieldPrintMode { - // Use <TEventProtobufMessageFieldType>::Print method for fields that has it - // Print json for other fields - DEFAULT = 0, - - JSON = 1, -}; - -class TEventProtobufMessageFieldPrinter { -public: - explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode); - - template <typename TEventProtobufMessageFieldType, bool HasPrintFunction> - void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) { - if constexpr (HasPrintFunction) { - if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) { - field.Print(output.GetOutputStream(), output.GetFlags()); - return; - } - } - - PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output); - } - - template <> - void PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output); - -private: - EProtobufMessageFieldPrintMode Mode; -}; diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp deleted file mode 100644 index 458a632b4a2..00000000000 --- a/library/cpp/eventlog/eventlog.cpp +++ /dev/null @@ -1,554 +0,0 @@ -#include <util/datetime/base.h> -#include <util/stream/zlib.h> -#include <util/stream/length.h> -#include <util/generic/buffer.h> -#include <util/generic/yexception.h> -#include <util/digest/murmur.h> -#include <util/generic/singleton.h> -#include <util/generic/function.h> -#include <util/stream/output.h> -#include <util/stream/format.h> -#include <util/stream/null.h> - -#include <google/protobuf/messagext.h> - -#include "eventlog.h" -#include "events_extension.h" -#include "evdecoder.h" -#include "logparser.h" -#include <library/cpp/eventlog/proto/internal.pb.h> - -#include <library/cpp/json/json_writer.h> -#include <library/cpp/protobuf/json/proto2json.h> - - -TAtomic eventlogFrameCounter = 0; - -namespace { - - const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() - .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) - .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); - - ui32 GenerateFrameId() { - return ui32(AtomicAdd(eventlogFrameCounter, 1)); - } - - inline const NProtoBuf::Message* UnknownEventMessage() { - return Singleton<NEventLogInternal::TUnknownEvent>(); - } - -} // namespace - -void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { - if (options.OutputFormat == TOutputFormat::TabSeparatedRaw) { - PrintHeader(out, options, eventState); - DoPrint(out, {}); - } else if (options.OutputFormat == TOutputFormat::TabSeparated) { - PrintHeader(out, options, eventState); - DoPrint( - out, - EFieldOutputFlags{} | EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash); - } else if (options.OutputFormat == TOutputFormat::Json) { - NJson::TJsonWriterConfig jsonWriterConfig; - jsonWriterConfig.FormatOutput = 0; - NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig); - - jsonWriter.OpenMap(); - PrintJsonHeader(jsonWriter); - DoPrintJson(jsonWriter); - jsonWriter.CloseMap(); - } -} - -void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { - if (options.HumanReadable) { - out << TInstant::MicroSeconds(Timestamp).ToString() << "\t"; - if (Timestamp >= eventState.FrameStartTime) - out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime)); - else // a bug somewhere? anyway, let's handle it in a nice fashion - out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp)); - - if (Timestamp >= eventState.PrevEventTime) - out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")"; - // else: these events are async and out-of-order, relative time diff makes no sense, skip it - - out << "\tF# " << FrameId << '\t'; - } else { - out << static_cast<TEventTimestamp>(Timestamp); - out << '\t' << FrameId << '\t'; - } -} - -void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const { - jsonWriter.Write("Timestamp", Timestamp); - jsonWriter.Write("FrameId", FrameId); -} - -class TProtobufEvent: public TEvent { -public: - TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg) - : TEvent(eventId, t) - , Message_(&msg) - , EventFactory_(NProtoBuf::TEventFactory::Instance()) - { - } - - TProtobufEvent() - : TEvent(0, 0) - , EventFactory_(NProtoBuf::TEventFactory::Instance()) - { - } - - explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance()) - : TEvent(id, 0) - , EventFactory_(eventFactory) - { - InnerMsg_.Reset(EventFactory_->CreateEvent(Class)); - Message_ = InnerMsg_.Get(); - } - - ui32 Id() const { - return Class; - } - - void Load(IInputStream& in) override { - if (!!InnerMsg_) { - InnerMsg_->ParseFromArcadiaStream(&in); - } else { - TransferData(&in, &Cnull); - } - } - - void Save(IOutputStream& out) const override { - Message_->SerializeToArcadiaStream(&out); - } - - void SaveToBuffer(TBufferOutput& buf) const override { - size_t messageSize = Message_->ByteSize(); - size_t before = buf.Buffer().Size(); - buf.Buffer().Advance(messageSize); - Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize); - } - - TStringBuf GetName() const override { - return EventFactory_->NameById(Id()); - } - -private: - void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override { - EventFactory_->PrintEvent(Id(), Message_, out, flags); - } - void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override { - jsonWriter.OpenMap("EventBody"); - jsonWriter.Write("Type", GetName()); - - jsonWriter.Write("Fields"); - NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG); - - jsonWriter.CloseMap(); - } - - const NProtoBuf::Message* GetProto() const override { - if (Message_) { - return Message_; - } - - return UnknownEventMessage(); - } - -private: - const NProtoBuf::Message* Message_ = nullptr; - NProtoBuf::TEventFactory* EventFactory_; - THolder<NProtoBuf::Message> InnerMsg_; - - friend class TEventLogFrame; -}; - -void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) { - TProtobufEvent event(Now().MicroSeconds(), eventId, ev); - - LogEventImpl(event); -} - -void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) { - TProtobufEvent event(timestamp, eventId, ev); - - LogEventImpl(event); -} - -template <> -void TEventLogFrame::DebugDump(const TProtobufEvent& ev) { - static TMutex lock; - - with_lock (lock) { - Cerr << ev.Timestamp << "\t" << ev.GetName() << "\t"; - ev.GetProto()->PrintJSON(Cerr); - Cerr << Endl; - } -} - -#pragma pack(push, 1) -struct TFrameHeaderData { - char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()]; - TCompressedFrameBaseHeader Header; - TCompressedFrameHeader2 HeaderEx; -}; -#pragma pack(pop) - -TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog) - , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) - , ForceDump_(false) - , WriteFrameCallback_(std::move(writeFrameCallback)) -{ - DoInit(); -} - -TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : EvLog_(parentLog) - , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) - , ForceDump_(false) - , WriteFrameCallback_(std::move(writeFrameCallback)) -{ - if (EvLog_ && EvLog_->HasNullBackend()) { - EvLog_ = nullptr; - } - - DoInit(); -} - -TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : EvLog_(nullptr) - , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) - , ForceDump_(false) - , WriteFrameCallback_(std::move(writeFrameCallback)) -{ - DoInit(); -} - -void TEventLogFrame::Flush() { - if (EvLog_ == nullptr) - return; - - TBuffer& buf = Buf_.Buffer(); - - if (buf.Empty()) { - return; - } - - EvLog_->WriteFrame(buf, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_)); - - DoInit(); - - return; -} - -void TEventLogFrame::SafeFlush() { - TGuard<TMutex> g(Mtx_); - Flush(); -} - -void TEventLogFrame::AddEvent(TEventTimestamp timestamp) { - if (timestamp < StartTimestamp_) { - StartTimestamp_ = timestamp; - } - - if (timestamp > EndTimestamp_) { - EndTimestamp_ = timestamp; - } -} - -void TEventLogFrame::DoInit() { - Buf_.Buffer().Clear(); - - StartTimestamp_ = (TEventTimestamp)-1; - EndTimestamp_ = 0; -} - -void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory) { - const auto doVisit = [this, &visitor, eventFactory]() { - TBuffer& buf = Buf_.Buffer(); - - TBufferInput bufferInput(buf); - TLengthLimitedInput limitedInput(&bufferInput, buf.size()); - - TEventFilter EventFilter(false); - - while (limitedInput.Left()) { - THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory); - - visitor.Visit(*event); - } - }; - if (NeedAlwaysSafeAdd_) { - TGuard<TMutex> g(Mtx_); - doVisit(); - } else { - doVisit(); - } -} - -TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) -{ -} - -TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) -{ -} - -TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) - : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback)) -{ -} - -TSelfFlushLogFrame::~TSelfFlushLogFrame() { - try { - Flush(); - } catch (...) { - } -} - -IEventLog::~IEventLog() { -} - -static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) { - try { - THolder<TLogBackend> backend; - if (backendOpts.UseSyncPageCacheBackend) { - backend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize); - } else { - backend = MakeHolder<TFileLogBackend>(fileName); - } - return MakeHolder<TReopenLogBackend>(std::move(backend)); - } catch (...) { - Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." << Endl; - } - - return MakeHolder<TNullLogBackend>(); -} - -TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat) - : Log_(ConstructBackend(fileName, backendOpts)) - , ContentFormat_(contentFormat) - , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4) - , HasNullBackend_(Log_.IsNullLog()) - , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) - , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) -{ - Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); - - if (contentFormat & 0xff000000) { - ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; - } -} - -TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts) - : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4) -{ -} - -TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat) - : Log_(log) - , ContentFormat_(contentFormat) - , LogFormat_(logFormat) - , HasNullBackend_(Log_.IsNullLog()) - , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) - , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) -{ - if (contentFormat & 0xff000000) { - ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; - } -} - -TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat) - : Log_(MakeHolder<TNullLogBackend>()) - , ContentFormat_(contentFormat) - , LogFormat_(logFormat) - , HasNullBackend_(true) - , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) - , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) -{ - if (contentFormat & 0xff000000) { - ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; - } -} - -TEventLog::~TEventLog() { -} - -void TEventLog::ReopenLog() { - Log_.ReopenLog(); -} - -void TEventLog::CloseLog() { - Log_.CloseLog(); -} - -void TEventLog::Flush() { -} - -namespace { - class TOnExceptionAction { - public: - TOnExceptionAction(std::function<void()>&& f) - : F_(std::move(f)) - { - } - - ~TOnExceptionAction() { - if (F_ && UncaughtException()) { - try { - F_(); - } catch (...) { - } - } - } - - private: - std::function<void()> F_; - }; -} - -void TEventLog::WriteFrame(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - TWriteFrameCallbackPtr writeFrameCallback, - TLogRecord::TMetaFlags metaFlags) { - Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); - - TBuffer& b1 = buffer; - - size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1); - - // Reserve enough memory to minimize reallocs - TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength); - TBuffer& b2 = outbuf.Buffer(); - b2.Proceed(sizeof(TFrameHeaderData)); - - { - TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); - - memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size()); - hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff); - hdr.Header.FrameId = GenerateFrameId(); - hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size(); - hdr.HeaderEx.StartTimestamp = startTimestamp; - hdr.HeaderEx.EndTimestamp = endTimestamp; - hdr.HeaderEx.PayloadChecksum = 0; - hdr.HeaderEx.CompressorVersion = 0; - } - - if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) { - TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256); - Lz4hcCodec_->Encode(b1, encoded); - - TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048); - compr.Write(encoded.data(), encoded.size()); - compr.Finish(); - } else { - b2.Advance(ZstdCodec_->Compress(b1, b2.Pos())); - } - - { - const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size(); - TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); - hdr.Header.Length = static_cast<ui32>(b2.size() - k); - hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData)); - - const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum)); - hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n); - } - - const TBuffer& frameData = outbuf.Buffer(); - - TOnExceptionAction actionCallback([this] { - if (ErrorCallback_) { - ErrorCallback_->OnWriteError(); - } - }); - - if (writeFrameCallback) { - writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp); - } - - Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags)); - if (SuccessCallback_) { - SuccessCallback_->OnWriteSuccess(frameData); - } -} - -TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) { - return new TProtobufEvent(c, EventFactory_); -} - -TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const { - return EventFactory_->IdByName(name); -} - -TEventClass TProtobufEventFactory::EventClassBegin() const { - const auto& items = EventFactory_->FactoryItems(); - - if (items.empty()) { - return static_cast<TEventClass>(0); - } - - return static_cast<TEventClass>(items.begin()->first); -} - -TEventClass TProtobufEventFactory::EventClassEnd() const { - const auto& items = EventFactory_->FactoryItems(); - - if (items.empty()) { - return static_cast<TEventClass>(0); - } - - return static_cast<TEventClass>(items.rbegin()->first + 1); -} - -namespace NEvClass { - IEventFactory* Factory() { - return Singleton<TProtobufEventFactory>(); - } - - IEventProcessor* Processor() { - return Singleton<TProtobufEventProcessor>(); - } -} - -const NProtoBuf::Message* TUnknownEvent::GetProto() const { - return UnknownEventMessage(); -} - -TStringBuf TUnknownEvent::GetName() const { - return TStringBuf("UnknownEvent"); -} - -void TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { - jsonWriter.OpenMap("EventBody"); - jsonWriter.Write("Type", GetName()); - jsonWriter.Write("EventId", (size_t)Class); - jsonWriter.CloseMap(); -} - -TStringBuf TEndOfFrameEvent::GetName() const { - return TStringBuf("EndOfFrame"); -} - -const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const { - return Singleton<NEventLogInternal::TEndOfFrameEvent>(); -} - -void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { - jsonWriter.OpenMap("EventBody"); - jsonWriter.Write("Type", GetName()); - jsonWriter.OpenMap("Fields"); - jsonWriter.CloseMap(); - jsonWriter.CloseMap(); -} - -THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) { - return MakeHolder<TProtobufEvent>(ts, eventId, ev); -} diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h deleted file mode 100644 index 45c2dfb17fd..00000000000 --- a/library/cpp/eventlog/eventlog.h +++ /dev/null @@ -1,623 +0,0 @@ -#pragma once - -#include "eventlog_int.h" -#include "event_field_output.h" -#include "events_extension.h" - -#include <library/cpp/blockcodecs/codecs.h> -#include <library/cpp/logger/all.h> - -#include <google/protobuf/message.h> - -#include <util/datetime/base.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/stream/output.h> -#include <util/stream/buffer.h> -#include <util/stream/str.h> -#include <util/system/mutex.h> -#include <util/stream/output.h> -#include <util/system/env.h> -#include <util/system/unaligned_mem.h> -#include <util/ysaveload.h> - -#include <cstdlib> - -namespace NJson { - class TJsonWriter; -} - -class IEventLog; - -class TEvent : public TThrRefBase { -public: - enum class TOutputFormat { - TabSeparated, - TabSeparatedRaw, // disables escaping - Json - }; - - struct TOutputOptions { - TOutputFormat OutputFormat = TOutputFormat::TabSeparated; - // Dump some fields (e.g. timestamp) in more human-readable format - bool HumanReadable = false; - - TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated) - : OutputFormat(outputFormat) - { - } - - TOutputOptions(TOutputFormat outputFormat, bool humanReadable) - : OutputFormat(outputFormat) - , HumanReadable(humanReadable) - { - } - }; - - struct TEventState { - TEventTimestamp FrameStartTime = 0; - TEventTimestamp PrevEventTime = 0; - TEventState() { - } - }; - - TEvent(TEventClass c, TEventTimestamp t) - : Class(c) - , Timestamp(t) - { - } - - virtual ~TEvent() = default; - - // Note, that descendants MUST have Save() & Load() methods to alter - // only its new variables, not the base class! - virtual void Save(IOutputStream& out) const = 0; - virtual void SaveToBuffer(TBufferOutput& out) const { - Save(out); - } - - // Note, that descendants MUST have Save() & Load() methods to alter - // only its new variables, not the base class! - virtual void Load(IInputStream& i) = 0; - - virtual TStringBuf GetName() const = 0; - virtual const NProtoBuf::Message* GetProto() const = 0; - - void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const; - void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const; - - TString ToString() const { - TStringStream buff; - Print(buff); - return buff.Str(); - } - - void FullSaveToBuffer(TBufferOutput& buf) const { - SaveMessageHeader(buf); - this->SaveToBuffer(buf); - } - - void FullSave(IOutputStream& o) const { - SaveMessageHeader(o); - this->Save(o); - } - - void FullLoad(IInputStream& i) { - ::Load(&i, Timestamp); - ::Load(&i, Class); - this->Load(i); - } - - template <class T> - const T* Get() const { - return static_cast<const T*>(this->GetProto()); - } - - TEventClass Class; - TEventTimestamp Timestamp; - ui32 FrameId = 0; - -private: - void SaveMessageHeader(IOutputStream& out) const { - ::Save(&out, Timestamp); - ::Save(&out, Class); - } - - virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0; - virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0; - - void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const; -}; - -using TEventPtr = TIntrusivePtr<TEvent>; -using TConstEventPtr = TIntrusiveConstPtr<TEvent>; - -class IEventProcessor { -public: - virtual void SetOptions(const TEvent::TOutputOptions& options) { - Options_ = options; - } - virtual void ProcessEvent(const TEvent* ev) = 0; - virtual bool CheckedProcessEvent(const TEvent* ev) { - ProcessEvent(ev); - return true; - } - virtual ~IEventProcessor() = default; - -protected: - TEvent::TOutputOptions Options_; -}; - -class IEventFactory { -public: - virtual TEvent* CreateLogEvent(TEventClass c) = 0; - virtual TEventLogFormat CurrentFormat() = 0; - virtual TEventClass ClassByName(TStringBuf name) const = 0; - virtual TEventClass EventClassBegin() const = 0; - virtual TEventClass EventClassEnd() const = 0; - virtual ~IEventFactory() = default; -}; - -class TUnknownEvent: public TEvent { -public: - TUnknownEvent(TEventTimestamp ts, TEventClass cls) - : TEvent(cls, ts) - { - } - - ~TUnknownEvent() override = default; - - void Save(IOutputStream& /* o */) const override { - ythrow yexception() << "TUnknownEvent cannot be saved"; - } - - void Load(IInputStream& /* i */) override { - ythrow yexception() << "TUnknownEvent cannot be loaded"; - } - - TStringBuf GetName() const override; - -private: - void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { - out << GetName() << "\t" << (size_t)Class; - } - - void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; - - const NProtoBuf::Message* GetProto() const override; -}; - -class TEndOfFrameEvent: public TEvent { -public: - enum { - EventClass = 0 - }; - - TEndOfFrameEvent(TEventTimestamp ts) - : TEvent(TEndOfFrameEvent::EventClass, ts) - { - } - - ~TEndOfFrameEvent() override = default; - - void Save(IOutputStream& o) const override { - (void)o; - ythrow yexception() << "TEndOfFrameEvent cannot be saved"; - } - - void Load(IInputStream& i) override { - (void)i; - ythrow yexception() << "TEndOfFrameEvent cannot be loaded"; - } - - TStringBuf GetName() const override; - -private: - void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { - out << GetName(); - } - void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; - - const NProtoBuf::Message* GetProto() const override; -}; - -class ILogFrameEventVisitor { -public: - virtual ~ILogFrameEventVisitor() = default; - - virtual void Visit(const TEvent& event) = 0; -}; - -class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> { -public: - virtual ~IWriteFrameCallback() = default; - - virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0; -}; - -using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>; - -class TEventLogFrame { -public: - TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - - virtual ~TEventLogFrame() = default; - - void Flush(); - void SafeFlush(); - - void ForceDump() { - ForceDump_ = true; - } - - template <class T> - inline void LogEvent(const T& ev) { - if (NeedAlwaysSafeAdd_) { - SafeLogEvent(ev); - } else { - UnSafeLogEvent(ev); - } - } - - template <class T> - inline void LogEvent(TEventTimestamp timestamp, const T& ev) { - if (NeedAlwaysSafeAdd_) { - SafeLogEvent(timestamp, ev); - } else { - UnSafeLogEvent(timestamp, ev); - } - } - - template <class T> - inline void UnSafeLogEvent(const T& ev) { - if (!IsEventIgnored(ev.ID)) - LogProtobufEvent(ev.ID, ev); - } - - template <class T> - inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) { - if (!IsEventIgnored(ev.ID)) - LogProtobufEvent(timestamp, ev.ID, ev); - } - - template <class T> - inline void SafeLogEvent(const T& ev) { - if (!IsEventIgnored(ev.ID)) { - TGuard<TMutex> g(Mtx_); - LogProtobufEvent(ev.ID, ev); - } - } - - template <class T> - inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) { - if (!IsEventIgnored(ev.ID)) { - TGuard<TMutex> g(Mtx_); - LogProtobufEvent(timestamp, ev.ID, ev); - } - } - - void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory); - - inline bool IsEventIgnored(size_t eventId) const { - Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages - return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_; - } - - void Enable(IEventLog& evLog) { - EvLog_ = &evLog; - } - - void Disable() { - EvLog_ = nullptr; - } - - void SetNeedAlwaysSafeAdd(bool val) { - NeedAlwaysSafeAdd_ = val; - } - - void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) { - WriteFrameCallback_ = writeFrameCallback; - } - - void AddMetaFlag(const TString& key, const TString& value) { - if (NeedAlwaysSafeAdd_) { - TGuard<TMutex> g(Mtx_); - MetaFlags_.emplace_back(key, value); - } else { - MetaFlags_.emplace_back(key, value); - } - } - -protected: - void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev); - void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev); - -private: - static bool IsDebugModeEnabled() { - static struct TSelector { - bool Flag; - - TSelector() - : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1")) - { - } - } selector; - - return selector.Flag; - } - - template <class T> - void DebugDump(const T& ev); - - // T must be a descendant of NEvClass::TEvent - template <class T> - inline void LogEventImpl(const T& ev) { - if (EvLog_ != nullptr || ForceDump_) { - TBuffer& b = Buf_.Buffer(); - size_t lastSize = b.size(); - ::Save(&Buf_, ui32(0)); - ev.FullSaveToBuffer(Buf_); - WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize)); - AddEvent(ev.Timestamp); - } - - if (IsDebugModeEnabled()) { - DebugDump(ev); - } - } - - void AddEvent(TEventTimestamp timestamp); - void DoInit(); - -private: - TBufferOutput Buf_; - TEventTimestamp StartTimestamp_, EndTimestamp_; - IEventLog* EvLog_; - TMutex Mtx_; - bool NeedAlwaysSafeAdd_; - bool ForceDump_; - TWriteFrameCallbackPtr WriteFrameCallback_; - TLogRecord::TMetaFlags MetaFlags_; - friend class TEventRecord; -}; - -class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> { -public: - TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); - - virtual ~TSelfFlushLogFrame(); -}; - -using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>; - -class IEventLog: public TAtomicRefCount<IEventLog> { -public: - class IErrorCallback { - public: - virtual ~IErrorCallback() { - } - - virtual void OnWriteError() = 0; - }; - - class ISuccessCallback { - public: - virtual ~ISuccessCallback() { - } - - virtual void OnWriteSuccess(const TBuffer& frameData) = 0; - }; - - virtual ~IEventLog(); - - virtual void ReopenLog() = 0; - virtual void CloseLog() = 0; - virtual void Flush() = 0; - virtual void SetErrorCallback(IErrorCallback*) { - } - virtual void SetSuccessCallback(ISuccessCallback*) { - } - - template <class T> - void LogEvent(const T& ev) { - TEventLogFrame frame(*this); - frame.LogEvent(ev); - frame.Flush(); - } - - virtual bool HasNullBackend() const = 0; - - virtual void WriteFrame(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - TWriteFrameCallbackPtr writeFrameCallback = nullptr, - TLogRecord::TMetaFlags metaFlags = {}) = 0; -}; - -struct TEventLogBackendOptions { - bool UseSyncPageCacheBackend = false; - size_t SyncPageCacheBackendBufferSize = 0; - size_t SyncPageCacheBackendMaxPendingSize = 0; -}; - -class TEventLog: public IEventLog { -public: - /* - * Параметр contentformat указывает формат контента лога, например какие могут в логе - * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра - * должен быть нулевым. - */ - TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat); - TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {}); - TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); - TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); - - ~TEventLog() override; - - void ReopenLog() override; - void CloseLog() override; - void Flush() override; - void SetErrorCallback(IErrorCallback* errorCallback) override { - ErrorCallback_ = errorCallback; - } - void SetSuccessCallback(ISuccessCallback* successCallback) override { - SuccessCallback_ = successCallback; - } - - template <class T> - void LogEvent(const T& ev) { - TEventLogFrame frame(*this); - frame.LogEvent(ev); - frame.Flush(); - } - - bool HasNullBackend() const override { - return HasNullBackend_; - } - - void WriteFrame(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - TWriteFrameCallbackPtr writeFrameCallback = nullptr, - TLogRecord::TMetaFlags metaFlags = {}) override; - -private: - mutable TLog Log_; - TEventLogFormat ContentFormat_; - const TEventLogFormat LogFormat_; - bool HasNullBackend_; - const NBlockCodecs::ICodec* const Lz4hcCodec_; - const NBlockCodecs::ICodec* const ZstdCodec_; - IErrorCallback* ErrorCallback_ = nullptr; - ISuccessCallback* SuccessCallback_ = nullptr; -}; - -using TEventLogPtr = TIntrusivePtr<IEventLog>; - -class TEventLogWithSlave: public IEventLog { -public: - TEventLogWithSlave(IEventLog& parentLog) - : Slave_(&parentLog) - { - } - - TEventLogWithSlave(const TEventLogPtr& parentLog) - : SlavePtr_(parentLog) - , Slave_(SlavePtr_.Get()) - { - } - - ~TEventLogWithSlave() override { - try { - Slave().Flush(); - } catch (...) { - } - } - - void Flush() override { - Slave().Flush(); - } - - void ReopenLog() override { - return Slave().ReopenLog(); - } - void CloseLog() override { - return Slave().CloseLog(); - } - - bool HasNullBackend() const override { - return Slave().HasNullBackend(); - } - - void WriteFrame(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - TWriteFrameCallbackPtr writeFrameCallback = nullptr, - TLogRecord::TMetaFlags metaFlags = {}) override { - Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags)); - } - - void SetErrorCallback(IErrorCallback* errorCallback) override { - Slave().SetErrorCallback(errorCallback); - } - - void SetSuccessCallback(ISuccessCallback* successCallback) override { - Slave().SetSuccessCallback(successCallback); - } - -protected: - inline IEventLog& Slave() const { - return *Slave_; - } - -private: - TEventLogPtr SlavePtr_; - IEventLog* Slave_ = nullptr; -}; - -extern TAtomic eventlogFrameCounter; - -class TProtobufEventProcessor: public IEventProcessor { -public: - void ProcessEvent(const TEvent* ev) override final { - ProcessEvent(ev, &Cout); - } - - void ProcessEvent(const TEvent* ev, IOutputStream *out) { - UpdateEventState(ev); - DoProcessEvent(ev, out); - EventState_.PrevEventTime = ev->Timestamp; - } -protected: - virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) { - ev->Print(*out, Options_, EventState_); - (*out) << Endl; - } - ui32 CurrentFrameId_ = Max<ui32>(); - TEvent::TEventState EventState_; - -private: - void UpdateEventState(const TEvent *ev) { - if (ev->FrameId != CurrentFrameId_) { - EventState_.FrameStartTime = ev->Timestamp; - EventState_.PrevEventTime = ev->Timestamp; - CurrentFrameId_ = ev->FrameId; - } - } -}; - -class TProtobufEventFactory: public IEventFactory { -public: - TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance()) - : EventFactory_(factory) - { - } - - TEvent* CreateLogEvent(TEventClass c) override; - - TEventLogFormat CurrentFormat() override { - return 0; - } - - TEventClass ClassByName(TStringBuf name) const override; - - TEventClass EventClassBegin() const override; - - TEventClass EventClassEnd() const override; - - ~TProtobufEventFactory() override = default; - -private: - NProtoBuf::TEventFactory* EventFactory_; -}; - -THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev); - -namespace NEvClass { - IEventFactory* Factory(); - IEventProcessor* Processor(); -} diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp deleted file mode 100644 index faa8c42cbeb..00000000000 --- a/library/cpp/eventlog/eventlog_int.cpp +++ /dev/null @@ -1,12 +0,0 @@ -#include "eventlog_int.h" - -#include <util/string/cast.h> - -TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) { - EEventLogFormat format; - if (TryFromString(str, format)) { - return static_cast<TEventLogFormat>(format); - } else { - return {}; - } -} diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h deleted file mode 100644 index eb00fecfab6..00000000000 --- a/library/cpp/eventlog/eventlog_int.h +++ /dev/null @@ -1,72 +0,0 @@ -#pragma once - -#include <util/stream/output.h> -#include <util/generic/maybe.h> -#include <util/generic/utility.h> -#include <util/generic/yexception.h> -#include <util/ysaveload.h> - -using TEventClass = ui32; -using TEventLogFormat = ui32; -using TEventTimestamp = ui64; - -constexpr TStringBuf COMPRESSED_LOG_FRAME_SYNC_DATA = - "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00" - "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff" - "\xff\x00\x00\xff\xff\x00\x00\xff"sv; - -static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64); - -/* - * Коды форматов логов. Форматом лога считается формат служебных - * структур лога. К примеру формат заголовка, наличие компрессии, и т.д. - * Имеет значение только 1 младший байт. - */ - -enum EEventLogFormat : TEventLogFormat { - // Формат версии 1. Используется компрессор LZQ. - COMPRESSED_LOG_FORMAT_V1 = 1, - - // Формат версии 2. Используется компрессор ZLIB. Добавлены CRC заголовка и данных, - // поле типа компрессора. - COMPRESSED_LOG_FORMAT_V2 = 2, - - // Формат версии 3. Используется компрессор ZLIB. В начинке фреймов перед каждым событием добавлен его размер. - COMPRESSED_LOG_FORMAT_V3 = 3, - - // Lz4hc codec + zlib - COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */, - - // zstd - COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */, -}; - -TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str); - -#pragma pack(push, 1) - -struct TCompressedFrameBaseHeader { - TEventLogFormat Format; - ui32 Length; // Длина остатка фрейма в байтах, после этого заголовка - ui32 FrameId; -}; - -struct TCompressedFrameHeader { - TEventTimestamp StartTimestamp; - TEventTimestamp EndTimestamp; - ui32 UncompressedDatalen; // Длина данных, которые были закомпрессированы - ui32 PayloadChecksum; // В логе версии 1 поле не используется -}; - -struct TCompressedFrameHeader2: public TCompressedFrameHeader { - ui8 CompressorVersion; // Сейчас не используется - ui32 HeaderChecksum; -}; - -#pragma pack(pop) - -Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader); -Y_DECLARE_PODTYPE(TCompressedFrameHeader); -Y_DECLARE_PODTYPE(TCompressedFrameHeader2); diff --git a/library/cpp/eventlog/events_extension.h b/library/cpp/eventlog/events_extension.h deleted file mode 100644 index 0cf062f9590..00000000000 --- a/library/cpp/eventlog/events_extension.h +++ /dev/null @@ -1,161 +0,0 @@ -#pragma once - -#include "event_field_output.h" - -#include <google/protobuf/descriptor.h> -#include <google/protobuf/message.h> - -#include <library/cpp/threading/atomic/bool.h> -#include <library/cpp/string_utils/base64/base64.h> - -#include <util/generic/map.h> -#include <util/generic/deque.h> -#include <util/generic/singleton.h> -#include <util/string/hex.h> -#include <util/system/guard.h> -#include <util/system/mutex.h> - -namespace NProtoBuf { - class TEventFactory { - public: - typedef ::google::protobuf::Message Message; - typedef void (*TEventSerializer)(const Message* event, IOutputStream& output, EFieldOutputFlags flags); - typedef void (*TRegistrationFunc)(); - - private: - class TFactoryItem { - public: - TFactoryItem(const Message* prototype, const TEventSerializer serializer) - : Prototype_(prototype) - , Serializer_(serializer) - { - } - - TStringBuf GetName() const { - return Prototype_->GetDescriptor()->name(); - } - - Message* Create() const { - return Prototype_->New(); - } - - void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const { - (*Serializer_)(event, out, flags); - } - - private: - const Message* Prototype_; - const TEventSerializer Serializer_; - }; - - typedef TMap<size_t, TFactoryItem> TFactoryMap; - - public: - TEventFactory() - : FactoryItems_() - { - } - - void ScheduleRegistration(TRegistrationFunc func) { - EventRegistrators_.push_back(func); - } - - void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) { - FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer))); - } - - size_t IdByName(TStringBuf eventname) { - DelayedRegistration(); - for (TFactoryMap::const_iterator it = FactoryItems_.begin(); it != FactoryItems_.end(); ++it) { - if (it->second.GetName() == eventname) - return it->first; - } - - ythrow yexception() << "do not know event '" << eventname << "'"; - } - - TStringBuf NameById(size_t id) { - DelayedRegistration(); - TFactoryMap::const_iterator it = FactoryItems_.find(id); - return it != FactoryItems_.end() ? it->second.GetName() : TStringBuf(); - } - - Message* CreateEvent(size_t eventId) { - DelayedRegistration(); - TFactoryMap::const_iterator it = FactoryItems_.find(eventId); - - if (it != FactoryItems_.end()) { - return it->second.Create(); - } - - return nullptr; - } - - const TMap<size_t, TFactoryItem>& FactoryItems() { - DelayedRegistration(); - return FactoryItems_; - } - - void PrintEvent( - size_t eventId, - const Message* event, - IOutputStream& output, - EFieldOutputFlags flags = {}) { - DelayedRegistration(); - TFactoryMap::const_iterator it = FactoryItems_.find(eventId); - - if (it != FactoryItems_.end()) { - it->second.PrintEvent(event, output, flags); - } - } - - static TEventFactory* Instance() { - return Singleton<TEventFactory>(); - } - - private: - void DelayedRegistration() { - if (!DelayedRegistrationDone_) { - TGuard<TMutex> guard(MutexEventRegistrators_); - Y_UNUSED(guard); - while (!EventRegistrators_.empty()) { - EventRegistrators_.front()(); - EventRegistrators_.pop_front(); - } - DelayedRegistrationDone_ = true; - } - } - - private: - TMap<size_t, TFactoryItem> FactoryItems_; - TDeque<TRegistrationFunc> EventRegistrators_; - NAtomic::TBool DelayedRegistrationDone_ = false; - TMutex MutexEventRegistrators_; - }; - - template <typename T> - void PrintAsBytes(const T& obj, IOutputStream& output) { - const ui8* b = reinterpret_cast<const ui8*>(&obj); - const ui8* e = b + sizeof(T); - const char* delim = ""; - - while (b != e) { - output << delim; - output << (int)*b++; - delim = "."; - } - } - - template <typename T> - void PrintAsHex(const T& obj, IOutputStream& output) { - output << "0x"; - output << HexEncode(&obj, sizeof(T)); - } - - inline void PrintAsBase64(TStringBuf data, IOutputStream& output) { - if (!data.empty()) { - output << Base64Encode(data); - } - } - -} diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp deleted file mode 100644 index 71f955bca82..00000000000 --- a/library/cpp/eventlog/iterator.cpp +++ /dev/null @@ -1,88 +0,0 @@ -#include "iterator.h" - -#include <library/cpp/streams/growing_file_input/growing_file_input.h> - -#include <util/string/cast.h> -#include <util/string/split.h> -#include <util/string/type.h> -#include <util/stream/file.h> - -using namespace NEventLog; - -namespace { - inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) { - if (evList.empty()) { - return nullptr; - } - - TVector<TString> events; - - StringSplitter(evList).Split(',').SkipEmpty().Collect(&events); - if (events.empty()) { - return nullptr; - } - - TIntrusivePtr<TEventFilter> filter(new TEventFilter(enableEvents)); - - for (const auto& event : events) { - if (IsNumber(event)) - filter->AddEventClass(FromString<size_t>(event)); - else - filter->AddEventClass(fac->ClassByName(event)); - } - - return filter; - } - - struct TIterator: public IIterator { - inline TIterator(const TOptions& o, IEventFactory* fac) - : First(true) - { - if (o.FileName.size()) { - if (o.ForceStreamMode || o.TailFMode) { - FileInput.Reset(o.TailFMode ? (IInputStream*)new TGrowingFileInput(o.FileName) : (IInputStream*)new TUnbufferedFileInput(o.FileName)); - FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter)); - } else { - FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter)); - } - } else { - FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter)); - } - - EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac); - EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering)); - } - - TConstEventPtr Next() override { - if (First) { - First = false; - - if (!EventStream->Avail()) { - return nullptr; - } - } else { - if (!EventStream->Next()) { - return nullptr; - } - } - - return **EventStream; - } - - THolder<IInputStream> FileInput; - THolder<TFrameStreamer> FrameStream; - TIntrusivePtr<TEventFilter> EvFilter; - THolder<TEventStreamer> EventStream; - bool First; - }; -} - -IIterator::~IIterator() = default; - -THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) { - return MakeHolder<TIterator>(o, fac); -} - -THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) { - return MakeHolder<TIterator>(o, NEvClass::Factory()); -} diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h deleted file mode 100644 index 71a61ed5494..00000000000 --- a/library/cpp/eventlog/iterator.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include <util/stream/input.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/generic/iterator.h> - -#include "eventlog.h" -#include "logparser.h" - -namespace NEventLog { - struct TOptions { - inline TOptions& SetFileName(const TString& fileName) { - FileName = fileName; - - return *this; - } - - inline TOptions& SetForceStrongOrdering(bool v) { - if(!ForceLosslessStrongOrdering) { - ForceStrongOrdering = v; - } - - return *this; - } - - ui64 StartTime = MIN_START_TIME; - ui64 EndTime = MAX_END_TIME; - ui64 MaxRequestDuration = MAX_REQUEST_DURATION; - TString FileName; - bool ForceStrongOrdering = false; - bool ForceWeakOrdering = false; - bool EnableEvents = true; - TString EvList; - bool ForceStreamMode = false; - bool ForceLosslessStrongOrdering = false; - bool TailFMode = false; - IInputStream* Input = &Cin; - IFrameFilterRef FrameFilter; - }; - - class IIterator: public TInputRangeAdaptor<IIterator> { - public: - virtual ~IIterator(); - - virtual TConstEventPtr Next() = 0; - }; - - THolder<IIterator> CreateIterator(const TOptions& o); - THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac); -} diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp deleted file mode 100644 index 6f8959f7888..00000000000 --- a/library/cpp/eventlog/logparser.cpp +++ /dev/null @@ -1,814 +0,0 @@ -#include "logparser.h" -#include "evdecoder.h" - -#include <util/stream/output.h> -#include <util/stream/zlib.h> -#include <util/digest/murmur.h> -#include <util/generic/algorithm.h> -#include <util/generic/scope.h> -#include <util/generic/hash_set.h> -#include <util/string/split.h> -#include <util/string/cast.h> -#include <util/string/escape.h> -#include <util/string/builder.h> - -#include <contrib/libs/re2/re2/re2.h> - -#include <algorithm> -#include <array> - -namespace { - bool FastforwardUntilSyncHeader(IInputStream* in) { - // Usually this function finds the correct header at the first hit - std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer; - if (in->Load(buffer.data(), buffer.size()) != buffer.size()) { - return false; - } - - auto begin = buffer.begin(); - - for (;;) { - if (std::mismatch( - begin, buffer.end(), - COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() && - std::mismatch( - buffer.begin(), begin, - COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) { - return true; - } - if (!in->ReadChar(*begin)) { - return false; - } - ++begin; - if (begin == buffer.end()) { - begin = buffer.begin(); - } - } - } - - bool HasCorrectChecksum(const TFrameHeader& header) { - // Calculating hash over all the fields of the read header except for the field with the hash of the header itself. - const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32); - const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize); - return checksum == header.Framehdr.HeaderChecksum; - } - - TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) { - for (;;) { - if (FastforwardUntilSyncHeader(in)) { - try { - return TFrameHeader(*in); - } catch (const TFrameLoadError& err) { - Cdbg << err.what() << Endl; - in->Skip(err.SkipAfter); - } - } else { - return Nothing(); - } - } - } - - std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) { - for (;;) { - auto iter = std::search( - span.begin(), span.end(), - COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end()); - const size_t offset = iter - span.begin(); - - if (offset != span.size()) { - span = span.substr(offset); - try { - TMemoryInput in( - span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), - span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size()); - return {TFrameHeader(in), span}; - } catch (const TFrameLoadError& err) { - Cdbg << err.what() << Endl; - span = span.substr(err.SkipAfter); - } - } else { - return {Nothing(), {}}; - } - } - } - - size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) { - Y_ENSURE(start <= end); - - const auto leftTimeBound = start - Min(start, maxRequestDuration); - const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end); - - TStringBuf subspan = span; - TMaybe<TFrameHeader> maybeLeftFrame; - std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan); - - if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) { - return span.size(); - } - - if (maybeLeftFrame->StartTime() > leftTimeBound) { - return 0; - } - - while (subspan.size() > maybeLeftFrame->FullLength()) { - const auto mid = subspan.data() + subspan.size() / 2; - auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()}); - if (!midFrame) { - // If mid is in the middle of the last frame, here we will lose it meaning that - // we will find previous frame as the result. - // This is fine because we will iterate frames starting from that. - subspan = subspan.substr(0, subspan.size() / 2); - continue; - } - if (midFrame->StartTime() <= leftTimeBound) { - maybeLeftFrame = midFrame; - subspan = rightHalfSpan; - } else { - subspan = subspan.substr(0, subspan.size() / 2); - } - } - - return subspan.data() - span.data(); - } -} - -TFrameHeader::TFrameHeader(IInputStream& in) { - try { - ::Load(&in, Basehdr); - - Y_ENSURE(Basehdr.Length, "Empty frame additional data"); - - ::Load(&in, Framehdr); - switch (LogFormat()) { - case COMPRESSED_LOG_FORMAT_V1: - break; - - case COMPRESSED_LOG_FORMAT_V2: - case COMPRESSED_LOG_FORMAT_V3: - case COMPRESSED_LOG_FORMAT_V4: - case COMPRESSED_LOG_FORMAT_V5: - Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor"); - - Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum"); - break; - - default: - ythrow yexception() << "Unsupported log structure format"; - }; - - Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps"); - - // Each frame must contain at least one event. - Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload"); - } catch (...) { - TString location = ""; - if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) { - location = "@ " + ToString(cnt->Counter()); - } - ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage(); - } -} - -TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac) - : TFrameHeader(header) - , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength())) - , Fac_(fac) -{ - if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) { - Address_ = cnt->Counter() - sizeof(TFrameHeader); - } else { - Address_ = 0; - } -} - -TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const { - if (EventsCache_.empty()) { - for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) { - EventsCache_.emplace_back(*decoder); - } - } - - return TIterator(*this, eventFilter); -} - -void TFrame::ClearEventsCache() const { - EventsCache_.clear(); -} - -TString TFrame::GetCompressedFrame() const { - const auto left = Limiter_->Left(); - TString payload = Limiter_->ReadAll(); - Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream"); - const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size()); - Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum"); - - return payload; -} - -TString TFrame::GetRawFrame() const { - TString frameBuf = GetCompressedFrame(); - TStringInput sin(frameBuf); - return TZLibDecompress{&sin}.ReadAll(); -} - -TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter) - : Frame_(frame) - , Size_(frame.EventsCache_.size()) - , Filter_(filter) - , Index_(0) -{ - SkipToValidEvent(); -} - -TConstEventPtr TFrame::TIterator::operator*() const { - return Frame_.GetEvent(Index_); -} - -bool TFrame::TIterator::Next() { - Index_++; - SkipToValidEvent(); - return Index_ < Size_; -} - -void TFrame::TIterator::SkipToValidEvent() { - if (!Filter_) { - return; - } - - for (; Index_ < Size_; ++Index_) { - if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) { - break; - } - } -} - -TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) { - if (auto header = FindNextFrameHeader(in)) { - return TFrame{*in, *header, eventFactory}; - } else { - return Nothing(); - } -} - -TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) { - TVector<TStringBuf> tokens; - - SplitWithEscaping(tokens, unparsedMatchGroups, "/"); - - // Amount of match groups - size_t size = tokens.size(); - MatchGroups.resize(size); - - for (size_t i = 0; i < size; i++) { - TMatchGroup& group = MatchGroups[i]; - TVector<TStringBuf> groupTokens; - SplitWithEscaping(groupTokens, tokens[i], ":"); - - Y_ENSURE(groupTokens.size() == 3); - - try { - group.EventID = eventFactory->ClassByName(groupTokens[0]); - } catch (yexception& e) { - if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) { - e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0]; - throw e; - } - } - - group.FieldName = groupTokens[1]; - group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:"); - } -} - -bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const { - THashSet<size_t> toMatchSet; - for (size_t i = 0; i < MatchGroups.size(); i++) { - toMatchSet.insert(i); - } - - for (auto it = frame.GetIterator(); it.Avail(); it.Next()) { - TConstEventPtr event(*it); - TVector<size_t> indicesToErase; - - if (!toMatchSet.empty()) { - const NProtoBuf::Message* message = event->GetProto(); - const google::protobuf::Descriptor* descriptor = message->GetDescriptor(); - const google::protobuf::Reflection* reflection = message->GetReflection(); - - Y_ENSURE(descriptor); - Y_ENSURE(reflection); - - for (size_t groupIndex : toMatchSet) { - const TMatchGroup& group = MatchGroups[groupIndex]; - - if (event->Class == group.EventID) { - TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>(); - TString lastPart = std::move(parts.back()); - parts.pop_back(); - - for (auto part : parts) { - auto fieldDescriptor = descriptor->FindFieldByName(part); - Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\"."); - - message = &reflection->GetMessage(*message, fieldDescriptor); - descriptor = message->GetDescriptor(); - reflection = message->GetReflection(); - - Y_ENSURE(descriptor); - Y_ENSURE(reflection); - } - - const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart); - Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\"."); - - TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection); - if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) { - indicesToErase.push_back(groupIndex); - } - } - } - - for (size_t idx : indicesToErase) { - toMatchSet.erase(idx); - } - - if (toMatchSet.empty()) { - return true; - } - } - } - - return toMatchSet.empty(); -} - -void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) { - size_t tokenStart = 0; - const TString characterSet = TString::Join("\\", externalCharacterSet); - - for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) { - if (stringToSplit[position] == '\\') { - position++; - } else { - if (tokenStart != position) { - tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart)); - } - tokenStart = position + 1; - } - } - - if (tokenStart < stringToSplit.size()) { - tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart)); - } -} - -TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) { - TStringBuilder stringBuilder; - size_t tokenStart = 0; - - for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) { - if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) { - stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart); - tokenStart = position + 1; - } - } - - if (tokenStart < stringToUnescape.size()) { - stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart); - } - - return stringBuilder; -} - -TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) { - Y_ENSURE(message); - Y_ENSURE(fieldDescriptor); - Y_ENSURE(reflection); - - TString result; - switch (fieldDescriptor->type()) { - case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: - result = ToString(reflection->GetDouble(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT: - result = ToString(reflection->GetFloat(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_BOOL: - result = ToString(reflection->GetBool(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_INT32: - result = ToString(reflection->GetInt32(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_UINT32: - result = ToString(reflection->GetUInt32(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_INT64: - result = ToString(reflection->GetInt64(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_UINT64: - result = ToString(reflection->GetUInt64(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_STRING: - result = ToString(reflection->GetString(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_ENUM: - { - const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor); - result = ToString(enumValueDescriptor->name()); - } - break; - default: - throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented."; - } - return result; -} - -TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff) - : In_(&s) - , FrameFilter_(ff) - , EventFactory_(fac) -{ - Frame_ = FindNextFrame(&In_, EventFactory_); - - SkipToAllowedFrame(); -} - -TFrameStreamer::TFrameStreamer( - const TString& fileName, - ui64 startTime, - ui64 endTime, - ui64 maxRequestDuration, - IEventFactory* fac, - IFrameFilterRef ff) - : File_(TBlob::FromFile(fileName)) - , MemoryIn_(File_.Data(), File_.Size()) - , In_(&MemoryIn_) - , StartTime_(startTime) - , EndTime_(endTime) - , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime)) - , FrameFilter_(ff) - , EventFactory_(fac) -{ - In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration)); - Frame_ = FindNextFrame(&In_, fac); - SkipToAllowedFrame(); -} - -TFrameStreamer::~TFrameStreamer() = default; - -bool TFrameStreamer::Avail() const { - return Frame_.Defined(); -} - -const TFrame& TFrameStreamer::operator*() const { - Y_ENSURE(Frame_, "Frame streamer depleted"); - - return *Frame_; -} - -bool TFrameStreamer::Next() { - DoNext(); - SkipToAllowedFrame(); - - return Frame_.Defined(); -} - -bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const { - const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_)); - const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_)); - return allowedStartTime || allowedEndTime; -} - -bool TFrameStreamer::DoNext() { - if (!Frame_) { - return false; - } - In_.Skip(Frame_->Limiter_->Left()); - Frame_ = FindNextFrame(&In_, EventFactory_); - - if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) { - Frame_.Clear(); - } - - return Frame_.Defined(); -} - -namespace { - struct TDecodeBuffer { - TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) { - TBuffer from(bs); - - { - TBufferOutput b(from); - TransferData(&src, &b); - } - - NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer); - } - - explicit TDecodeBuffer(IInputStream& src) { - TBufferOutput b(DecodeBuffer); - TransferData(&src, &b); - } - - TBuffer DecodeBuffer; - }; - - class TBlockCodecStream: private TDecodeBuffer, public TBufferInput { - public: - TBlockCodecStream(const TString codec, IInputStream& src, size_t bs) - : TDecodeBuffer(codec, src, bs) - , TBufferInput(DecodeBuffer) - {} - - explicit TBlockCodecStream(IInputStream& src) - : TDecodeBuffer(src) - , TBufferInput(DecodeBuffer) - {} - }; -} - -TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData) - : Frame_(fr) - , Event_(nullptr) - , Flt_(filter) - , Fac_(fr.Fac_) - , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime())) - , Strict_(strict) - , WithRawData_(withRawData) -{ - switch (fr.LogFormat()) { - case COMPRESSED_LOG_FORMAT_V2: - case COMPRESSED_LOG_FORMAT_V3: - case COMPRESSED_LOG_FORMAT_V4: - case COMPRESSED_LOG_FORMAT_V5: { - const auto payload = fr.GetCompressedFrame(); - TMemoryInput payloadInput{payload}; - - if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) { - Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size())); - } else { - TZLibDecompress zlib(&payloadInput); - Decompressor_.Reset(new TBlockCodecStream(zlib)); - if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) { - Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size())); - } - } - - break; - } - - default: - ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl; - break; - }; - - if (WithRawData_) { - TBufferOutput out(UncompressedData_); - TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen); - - TransferData(&limiter, &out); - Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size())); - } - - Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen)); - - Decode(); -} - -TFrameDecoder::~TFrameDecoder() = default; - -bool TFrameDecoder::Avail() const { - return HaveData(); -} - -TConstEventPtr TFrameDecoder::operator*() const { - Y_ENSURE(HaveData(), "Decoder depleted"); - - return Event_; -} - -bool TFrameDecoder::Next() { - if (HaveData()) { - Decode(); - } - - return HaveData(); -} - -void TFrameDecoder::Decode() { - Event_ = nullptr; - const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5); - - size_t evBegin = 0; - size_t evEnd = 0; - if (WithRawData_) - evBegin = UncompressedData_.Size() - Limiter_->Left(); - - while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) { - } - - if (WithRawData_) { - evEnd = UncompressedData_.Size() - Limiter_->Left(); - RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd); - } - - if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) { - Event_ = EndOfFrame_.Release(); - } - - if (!!Event_) { - Event_->FrameId = Frame_.FrameId(); - } -} - -const TStringBuf TFrameDecoder::GetRawEvent() const { - return RawEventData_; -} - -TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering) - : Frames_(fs) - , Start_(s) - , End_(e) - , MaxEndTimestamp_(0) - , Frontier_(0) - , StrongOrdering_(strongOrdering) - , LosslessStrongOrdering_(losslessStrongOrdering) - , EventFilter_(filter) -{ - - if (Start_ > End_) { - ythrow yexception() << "Wrong main interval"; - } - - TEventStreamer::Next(); -} - -TEventStreamer::~TEventStreamer() = default; - -bool TEventStreamer::Avail() const { - return Events_.Avail() && (*Events_)->Timestamp <= Frontier_; -} - -TConstEventPtr TEventStreamer::operator*() const { - Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted"); - - return *Events_; -} - -bool TEventStreamer::Next() { - if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) { - return true; - } - - for (;;) { - if (!LoadMoreEvents()) { - return false; - } - - if (TEventStreamer::Avail()) { - return true; - } - } -} - -/* -Two parameters are used in the function: -Frontier - the moment of time up to which inclusively all the log events made their way - into the buffer (and might have been already extracted out of it). -Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION. -In order to get all the log events up to the Frontier inclusively, - frames need to be read until "end time" of the current frame exceeds the Horizon. -*/ -bool TEventStreamer::LoadMoreEvents() { - if (!Frames_.Avail()) { - return false; - } - - const TFrame& fr1 = *Frames_; - const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0); - - if (fr1.EndTime() <= Frontier_ + maxRequestDuration) { - ythrow yexception() << "Wrong frame stream state"; - } - - if (Frontier_ >= End_) { - return false; - } - - const ui64 old_frontier = Frontier_; - Frontier_ = fr1.EndTime(); - - { - Y_DEFER { - Events_.Reorder(StrongOrdering_); - }; - - for (; Frames_.Avail(); Frames_.Next()) { - const TFrame& fr2 = *Frames_; - - // Frames need to start later than the Frontier. - if (StrongOrdering_ && fr2.StartTime() <= old_frontier) { - Cdbg << "Invalid frame encountered" << Endl; - continue; - } - - if (fr2.EndTime() > MaxEndTimestamp_) { - MaxEndTimestamp_ = fr2.EndTime(); - } - - if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) { - return true; - } - - // Checking for the frame to be within the main time borders. - if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) { - TransferEvents(fr2); - } - } - } - - Frontier_ = MaxEndTimestamp_; - - return true; -} - -void TEventStreamer::TransferEvents(const TFrame& fr) { - Events_.SetCheckpoint(); - - try { - for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) { - TConstEventPtr ev = *it; - - if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) { - ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range"; - } - - if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) { - Events_.Append(ev, StrongOrdering_); - } - } - } catch (const TInvalidEventTimestamps& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl; - } catch (const TFrameLoadError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: " << err.what() << Endl; - } catch (const TEventDecoderError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl; - } catch (const TZLibDecompressorError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl; - } catch (...) { - Events_.Rollback(); - throw; - } -} - -void TEventStreamer::TEventBuffer::SetCheckpoint() { - BufLen_ = Buffer_.size(); -} - -void TEventStreamer::TEventBuffer::Rollback() { - Buffer_.resize(BufLen_); -} - -void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) { - SetCheckpoint(); - - std::reverse(Buffer_.begin(), Buffer_.end()); - - if (strongOrdering) { - StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) { - return (a->Timestamp > b->Timestamp) || - ((a->Timestamp == b->Timestamp) && !a->Class && b->Class); - }); - } -} - -void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) { - // Events in buffer output must be in an ascending order. - Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event"); - - Buffer_.push_back(std::move(ev)); -} - -bool TEventStreamer::TEventBuffer::Avail() const { - return !Buffer_.empty(); -} - -TConstEventPtr TEventStreamer::TEventBuffer::operator*() const { - Y_ENSURE(!Buffer_.empty(), "Event buffer is empty"); - - return Buffer_.back(); -} - -bool TEventStreamer::TEventBuffer::Next() { - if (!Buffer_.empty()) { - LastTimestamp_ = Buffer_.back()->Timestamp; - Buffer_.pop_back(); - return !Buffer_.empty(); - } else { - return false; - } -} diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h deleted file mode 100644 index f819e725894..00000000000 --- a/library/cpp/eventlog/logparser.h +++ /dev/null @@ -1,343 +0,0 @@ -#pragma once - -#include <util/generic/ptr.h> -#include <util/generic/yexception.h> -#include <util/generic/vector.h> -#include <util/generic/set.h> -#include <util/generic/maybe.h> -#include <util/memory/blob.h> -#include <util/stream/length.h> -#include <util/stream/mem.h> - -#include "eventlog_int.h" -#include "eventlog.h" -#include "common.h" - -class IInputStream; - -static const ui64 MAX_REQUEST_DURATION = 60'000'000; -static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION; -static const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION; - -class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> { -public: - TEventFilter(bool enableEvents) - : Enable_(enableEvents) - { - } - - void AddEventClass(TEventClass cls) { - insert(cls); - } - - bool EventAllowed(TEventClass cls) const { - bool found = (find(cls) != end()); - - return Enable_ == found; - } - -private: - bool Enable_; -}; - -using TEventStream = TPacketInputStream<TConstEventPtr>; - -struct TFrameHeader { - // Reads header from the stream. The caller must make sure that the - // sync data is present just befor the stream position. - explicit TFrameHeader(IInputStream& in); - - ui64 StartTime() const { - return Framehdr.StartTimestamp; - } - - ui64 EndTime() const { - return Framehdr.EndTimestamp; - } - - ui32 FrameId() const { - return Basehdr.FrameId; - } - - ui64 Duration() const { - return EndTime() - StartTime(); - } - - TEventLogFormat ContentFormat() const { - return Basehdr.Format & 0xffffff; - } - - TEventLogFormat LogFormat() const { - return Basehdr.Format >> 24; - } - - ui64 FrameLength() const { - return Basehdr.Length - sizeof(TCompressedFrameHeader2); - } - - // Length including the header - ui64 FullLength() const { - return sizeof(*this) + FrameLength(); - } - - TCompressedFrameBaseHeader Basehdr; - TCompressedFrameHeader2 Framehdr; -}; - -struct TFrameLoadError: public yexception { - explicit TFrameLoadError(size_t skipAfter) - : SkipAfter(skipAfter) - {} - - size_t SkipAfter; -}; - -class TFrame : public TFrameHeader { -public: - // Reads the frame after the header has been read. - TFrame(IInputStream& in, TFrameHeader header, IEventFactory*); - - TString GetRawFrame() const; - TString GetCompressedFrame() const; - - ui64 Address() const { return Address_; } - -private: - const TConstEventPtr& GetEvent(size_t index) const { - return EventsCache_[index]; - } - - void ClearEventsCache() const; - - THolder<TLengthLimitedInput> Limiter_; - mutable TVector<TConstEventPtr> EventsCache_; - - IEventFactory* Fac_; - ui64 Address_; - - friend class TFrameDecoder; - friend class TFrameStreamer; - -private: - class TIterator: TEventStream { - public: - TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter); - ~TIterator() override = default; - - bool Avail() const override { - return Index_ < Size_; - } - - TConstEventPtr operator*() const override; - bool Next() override; - - private: - void SkipToValidEvent(); - - const TFrame& Frame_; - size_t Size_; - TIntrusiveConstPtr<TEventFilter> Filter_; - size_t Index_; - }; - -public: - TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const; -}; - -// If `in` is derived from TCountingInput, Frame's address will -// be set accorting to the in->Counter(). Otherwise it will be zeroO -TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*); - -using TFrameStream = TPacketInputStream<const TFrame&>; - -class IFrameFilter: public TSimpleRefCount<IFrameFilter> { -public: - IFrameFilter() { - } - - virtual ~IFrameFilter() = default; - - virtual bool FrameAllowed(const TFrame& frame) const = 0; -}; - -using IFrameFilterRef = TIntrusivePtr<IFrameFilter>; - -class TDurationFrameFilter: public IFrameFilter { -public: - TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>()) - : MinDuration_(minFrameDuration) - , MaxDuration_(maxFrameDuration) - { - } - - bool FrameAllowed(const TFrame& frame) const override { - return frame.Duration() >= MinDuration_ && frame.Duration() <= MaxDuration_; - } - -private: - const ui64 MinDuration_; - const ui64 MaxDuration_; -}; - -class TFrameIdFrameFilter: public IFrameFilter { -public: - TFrameIdFrameFilter(ui32 frameId) - : FrameId_(frameId) - { - } - - bool FrameAllowed(const TFrame& frame) const override { - return frame.FrameId() == FrameId_; - } - -private: - const ui32 FrameId_; -}; - -class TContainsEventFrameFilter: public IFrameFilter { -public: - TContainsEventFrameFilter(const TString& args, const IEventFactory* fac); - - bool FrameAllowed(const TFrame& frame) const override; - -private: - struct TMatchGroup { - TEventClass EventID; - TString FieldName; - TString ValueToMatch; - }; - - TVector<TMatchGroup> MatchGroups; -}; - -void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet); - -TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet); - -TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection); - -class TFrameStreamer: public TFrameStream { -public: - TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = nullptr); - TFrameStreamer( - const TString& fileName, - ui64 startTime, - ui64 endTime, - ui64 maxRequestDuration, - IEventFactory* fac, - IFrameFilterRef ff = nullptr); - ~TFrameStreamer() override; - - bool Avail() const override; - const TFrame& operator*() const override; - bool Next() override; - -private: - bool DoNext(); - bool AllowedTimeRange(const TFrame& frame) const; - - bool AllowedFrame(const TFrame& frame) const { - return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame)); - } - - void SkipToAllowedFrame() { - if (Frame_) { - while (!AllowedFrame(*Frame_) && DoNext()) { - //do nothing - } - } - } - - TBlob File_; - TMemoryInput MemoryIn_; - TCountingInput In_; - THolder<IInputStream> Stream_; - ui64 StartTime_ = 0; - ui64 EndTime_ = 0; - ui64 CutoffTime_ = 0; - TMaybe<TFrame> Frame_; - IFrameFilterRef FrameFilter_; - IEventFactory* EventFactory_; -}; - -class TFrameDecoder: TEventStream { -public: - TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false); - ~TFrameDecoder() override; - - bool Avail() const override; - - TConstEventPtr operator*() const override; - bool Next() override; - - const TStringBuf GetRawEvent() const; - -private: - TFrameDecoder(const TFrameDecoder&); - void operator=(const TFrameDecoder&); - - inline bool HaveData() const { - return Event_ != nullptr; - } - - void Decode(); - -private: - const TFrame& Frame_; - THolder<IInputStream> Decompressor_; - THolder<TLengthLimitedInput> Limiter_; - TEventPtr Event_; - const TEventFilter* const Flt_; - IEventFactory* Fac_; - THolder<TEvent> EndOfFrame_; - bool Strict_; - TBuffer UncompressedData_; - TStringBuf RawEventData_; - bool WithRawData_; -}; - -class TEventStreamer: public TEventStream { -public: - TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false); - ~TEventStreamer() override; - - bool Avail() const override; - TConstEventPtr operator*() const override; - bool Next() override; - -private: - class TEventBuffer: public TEventStream { - public: - void SetCheckpoint(); - void Rollback(); - void Reorder(bool strongOrdering); - void Append(TConstEventPtr event, bool strongOrdering); - - bool Avail() const override; - TConstEventPtr operator*() const override; - bool Next() override; - - private: - TVector<TConstEventPtr> Buffer_; - size_t BufLen_ = 0; - ui64 LastTimestamp_ = 0; - }; - -private: - struct TInvalidEventTimestamps: public yexception { - }; - - bool LoadMoreEvents(); - void TransferEvents(const TFrame&); - -private: - TFrameStream& Frames_; - TEventBuffer Events_; - - ui64 Start_, End_; - ui64 MaxEndTimestamp_; - ui64 Frontier_; - bool StrongOrdering_; - bool LosslessStrongOrdering_; - TIntrusivePtr<TEventFilter> EventFilter_; -}; diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto deleted file mode 100644 index 37a7d7e5614..00000000000 --- a/library/cpp/eventlog/proto/events_extension.proto +++ /dev/null @@ -1,22 +0,0 @@ -import "google/protobuf/descriptor.proto"; - -option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions"; -option java_package = "NEventLogEventsExtension"; - -extend google.protobuf.MessageOptions { - optional uint32 message_id = 50001; - optional string realm_name = 50002; -} - -message Repr { - enum ReprType { - none = 0; - as_bytes = 1; // Only for primitive types - as_hex = 2; // Only for primitive types - as_base64 = 3; // Only for 'string' and 'bytes' fields - }; -} - -extend google.protobuf.FieldOptions { - optional Repr.ReprType repr = 55003 [default = none]; -} diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto deleted file mode 100644 index 234230e0949..00000000000 --- a/library/cpp/eventlog/proto/internal.proto +++ /dev/null @@ -1,9 +0,0 @@ -option go_package = "a.yandex-team.ru/library/cpp/eventlog/proto;extensions"; - -package NEventLogInternal; - -message TUnknownEvent { -}; - -message TEndOfFrameEvent { -}; diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp deleted file mode 100644 index 67839063fbd..00000000000 --- a/library/cpp/eventlog/threaded_eventlog.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "threaded_eventlog.h" diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h deleted file mode 100644 index 52382b856d1..00000000000 --- a/library/cpp/eventlog/threaded_eventlog.h +++ /dev/null @@ -1,154 +0,0 @@ -#pragma once - -#include "eventlog.h" - -#include <util/generic/string.h> -#include <util/thread/pool.h> - -class TThreadedEventLog: public TEventLogWithSlave { -public: - class TWrapper; - using TOverflowCallback = std::function<void(TWrapper& wrapper)>; - - enum class EDegradationResult { - ShouldWrite, - ShouldDrop, - }; - using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>; - -public: - TThreadedEventLog( - IEventLog& parentLog, - size_t threadCount, - size_t queueSize, - TOverflowCallback cb, - TDegradationCallback degradationCallback = {}) - : TEventLogWithSlave(parentLog) - , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) - , ThreadCount(threadCount) - , QueueSize(queueSize) - , OverflowCallback(std::move(cb)) - , DegradationCallback(std::move(degradationCallback)) - { - Init(); - } - - TThreadedEventLog( - const TEventLogPtr& parentLog, - size_t threadCount, - size_t queueSize, - TOverflowCallback cb, - TDegradationCallback degradationCallback = {}) - : TEventLogWithSlave(parentLog) - , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) - , ThreadCount(threadCount) - , QueueSize(queueSize) - , OverflowCallback(std::move(cb)) - , DegradationCallback(std::move(degradationCallback)) - { - Init(); - } - - TThreadedEventLog(IEventLog& parentLog) - : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) - { - } - - TThreadedEventLog(const TEventLogPtr& parentLog) - : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) - { - } - - ~TThreadedEventLog() override { - try { - LogSaver.Stop(); - } catch (...) { - } - } - - void ReopenLog() override { - TEventLogWithSlave::ReopenLog(); - } - - void CloseLog() override { - LogSaver.Stop(); - TEventLogWithSlave::CloseLog(); - } - - void WriteFrame(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - TWriteFrameCallbackPtr writeFrameCallback = nullptr, - TLogRecord::TMetaFlags metaFlags = {}) override { - float fillFactor = 0.0f; - if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) { - fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize(); - } - - EDegradationResult status = EDegradationResult::ShouldWrite; - if (DegradationCallback) { - status = DegradationCallback(fillFactor); - } - if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) { - return; - } - - THolder<TWrapper> wrapped; - wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags))); - - if (LogSaver.Add(wrapped.Get())) { - Y_UNUSED(wrapped.Release()); - } else if (OverflowCallback) { - OverflowCallback(*wrapped); - } - } - -private: - void Init() { - LogSaver.Start(ThreadCount, QueueSize); - } - -public: - class TWrapper: public IObjectInQueue { - public: - TWrapper(TBuffer& buffer, - TEventTimestamp startTimestamp, - TEventTimestamp endTimestamp, - IEventLog& slave, - TWriteFrameCallbackPtr writeFrameCallback = nullptr, - TLogRecord::TMetaFlags metaFlags = {}) - : StartTimestamp(startTimestamp) - , EndTimestamp(endTimestamp) - , Slave(&slave) - , WriteFrameCallback(writeFrameCallback) - , MetaFlags(std::move(metaFlags)) - { - Buffer.Swap(buffer); - } - - void Process(void*) override { - THolder<TWrapper> holder(this); - - WriteFrame(); - } - - void WriteFrame() { - Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags)); - } - - private: - TBuffer Buffer; - TEventTimestamp StartTimestamp; - TEventTimestamp EndTimestamp; - IEventLog* Slave; - TWriteFrameCallbackPtr WriteFrameCallback; - TLogRecord::TMetaFlags MetaFlags; - }; - -private: - TThreadPool LogSaver; - const size_t ThreadCount; - const size_t QueueSize; - const TOverflowCallback OverflowCallback; - const TDegradationCallback DegradationCallback; -}; |