diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /library/cpp/eventlog/evdecoder.cpp | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/cpp/eventlog/evdecoder.cpp')
-rw-r--r-- | library/cpp/eventlog/evdecoder.cpp | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp new file mode 100644 index 0000000000..e4413a1b0e --- /dev/null +++ b/library/cpp/eventlog/evdecoder.cpp @@ -0,0 +1,112 @@ +#include <util/memory/tempbuf.h> +#include <util/string/cast.h> +#include <util/stream/output.h> + +#include "evdecoder.h" +#include "logparser.h" + +static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class"; + +static inline void LogError(ui64 frameAddr, const char* msg, bool strict) { + if (!strict) { + Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl; + } else { + ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg; + } +} + +static inline bool SkipData(IInputStream& s, size_t amount) { + return (amount == s.Skip(amount)); +} + +// There are 2 log fomats: the one, that allows event skip without event decode (it has stored event length) +// and another, that requires each event decode just to seek over stream. needRead == true means the latter format. +static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) { + TEventTimestamp ts; + TEventClass c; + THolder<TEvent> e; + + ::Load(&s, ts); + ::Load(&s, c); + + bool needReturn = false; + + if (!filter || filter->EventAllowed(c)) { + needReturn = true; + } + + if (needRead || needReturn) { + e.Reset(fac->CreateLogEvent(c)); + + if (!!e) { + e->Timestamp = ts; + e->Load(s); + } else if (needReturn) { + e.Reset(new TUnknownEvent(ts, c)); + } + + if (!needReturn) { + e.Reset(nullptr); + } + } + + return e; +} + +THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + ui32 len; + ::Load(&inp, len); + + if (len < sizeof(ui32)) { + ythrow TEventDecoderError() << "invalid event length"; + } + + TLengthLimitedInput s(&inp, len - sizeof(ui32)); + + try { + THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac); + if (!!e) { + if (!s.Left()) { + return e; + } else if (e->Class == 0) { + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return e; + } + + LogError(frameAddr, "Event is not fully read", strict); + } + } catch (const TLoadEOF&) { + if (s.Left()) { + throw; + } + + LogError(frameAddr, "Unexpected event end", strict); + } + + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return nullptr; +} + +THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + try { + if (framed) { + return DecodeFramed(s, frameAddr, filter, fac, strict); + } else { + THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac); + // e(0) means event, skipped by filter. Not an error. + if (!!e && !e->Class) { + ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS; + } + + return e; + } + } catch (const TLoadEOF&) { + ythrow TEventDecoderError() << "unexpected frame end"; + } +} |