diff options
author | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
commit | b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch) | |
tree | 2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/eventlog/logparser.cpp | |
parent | 559174a9144de40d6bb3997ea4073c82289b4974 (diff) | |
download | ydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz |
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/eventlog/logparser.cpp')
-rw-r--r-- | library/cpp/eventlog/logparser.cpp | 814 |
1 files changed, 0 insertions, 814 deletions
diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp deleted file mode 100644 index 6f8959f7888..00000000000 --- a/library/cpp/eventlog/logparser.cpp +++ /dev/null @@ -1,814 +0,0 @@ -#include "logparser.h" -#include "evdecoder.h" - -#include <util/stream/output.h> -#include <util/stream/zlib.h> -#include <util/digest/murmur.h> -#include <util/generic/algorithm.h> -#include <util/generic/scope.h> -#include <util/generic/hash_set.h> -#include <util/string/split.h> -#include <util/string/cast.h> -#include <util/string/escape.h> -#include <util/string/builder.h> - -#include <contrib/libs/re2/re2/re2.h> - -#include <algorithm> -#include <array> - -namespace { - bool FastforwardUntilSyncHeader(IInputStream* in) { - // Usually this function finds the correct header at the first hit - std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer; - if (in->Load(buffer.data(), buffer.size()) != buffer.size()) { - return false; - } - - auto begin = buffer.begin(); - - for (;;) { - if (std::mismatch( - begin, buffer.end(), - COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() && - std::mismatch( - buffer.begin(), begin, - COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) { - return true; - } - if (!in->ReadChar(*begin)) { - return false; - } - ++begin; - if (begin == buffer.end()) { - begin = buffer.begin(); - } - } - } - - bool HasCorrectChecksum(const TFrameHeader& header) { - // Calculating hash over all the fields of the read header except for the field with the hash of the header itself. - const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32); - const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize); - return checksum == header.Framehdr.HeaderChecksum; - } - - TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) { - for (;;) { - if (FastforwardUntilSyncHeader(in)) { - try { - return TFrameHeader(*in); - } catch (const TFrameLoadError& err) { - Cdbg << err.what() << Endl; - in->Skip(err.SkipAfter); - } - } else { - return Nothing(); - } - } - } - - std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) { - for (;;) { - auto iter = std::search( - span.begin(), span.end(), - COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end()); - const size_t offset = iter - span.begin(); - - if (offset != span.size()) { - span = span.substr(offset); - try { - TMemoryInput in( - span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), - span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size()); - return {TFrameHeader(in), span}; - } catch (const TFrameLoadError& err) { - Cdbg << err.what() << Endl; - span = span.substr(err.SkipAfter); - } - } else { - return {Nothing(), {}}; - } - } - } - - size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) { - Y_ENSURE(start <= end); - - const auto leftTimeBound = start - Min(start, maxRequestDuration); - const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end); - - TStringBuf subspan = span; - TMaybe<TFrameHeader> maybeLeftFrame; - std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan); - - if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) { - return span.size(); - } - - if (maybeLeftFrame->StartTime() > leftTimeBound) { - return 0; - } - - while (subspan.size() > maybeLeftFrame->FullLength()) { - const auto mid = subspan.data() + subspan.size() / 2; - auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()}); - if (!midFrame) { - // If mid is in the middle of the last frame, here we will lose it meaning that - // we will find previous frame as the result. - // This is fine because we will iterate frames starting from that. - subspan = subspan.substr(0, subspan.size() / 2); - continue; - } - if (midFrame->StartTime() <= leftTimeBound) { - maybeLeftFrame = midFrame; - subspan = rightHalfSpan; - } else { - subspan = subspan.substr(0, subspan.size() / 2); - } - } - - return subspan.data() - span.data(); - } -} - -TFrameHeader::TFrameHeader(IInputStream& in) { - try { - ::Load(&in, Basehdr); - - Y_ENSURE(Basehdr.Length, "Empty frame additional data"); - - ::Load(&in, Framehdr); - switch (LogFormat()) { - case COMPRESSED_LOG_FORMAT_V1: - break; - - case COMPRESSED_LOG_FORMAT_V2: - case COMPRESSED_LOG_FORMAT_V3: - case COMPRESSED_LOG_FORMAT_V4: - case COMPRESSED_LOG_FORMAT_V5: - Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor"); - - Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum"); - break; - - default: - ythrow yexception() << "Unsupported log structure format"; - }; - - Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps"); - - // Each frame must contain at least one event. - Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload"); - } catch (...) { - TString location = ""; - if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) { - location = "@ " + ToString(cnt->Counter()); - } - ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage(); - } -} - -TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac) - : TFrameHeader(header) - , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength())) - , Fac_(fac) -{ - if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) { - Address_ = cnt->Counter() - sizeof(TFrameHeader); - } else { - Address_ = 0; - } -} - -TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const { - if (EventsCache_.empty()) { - for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) { - EventsCache_.emplace_back(*decoder); - } - } - - return TIterator(*this, eventFilter); -} - -void TFrame::ClearEventsCache() const { - EventsCache_.clear(); -} - -TString TFrame::GetCompressedFrame() const { - const auto left = Limiter_->Left(); - TString payload = Limiter_->ReadAll(); - Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream"); - const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size()); - Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum"); - - return payload; -} - -TString TFrame::GetRawFrame() const { - TString frameBuf = GetCompressedFrame(); - TStringInput sin(frameBuf); - return TZLibDecompress{&sin}.ReadAll(); -} - -TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter) - : Frame_(frame) - , Size_(frame.EventsCache_.size()) - , Filter_(filter) - , Index_(0) -{ - SkipToValidEvent(); -} - -TConstEventPtr TFrame::TIterator::operator*() const { - return Frame_.GetEvent(Index_); -} - -bool TFrame::TIterator::Next() { - Index_++; - SkipToValidEvent(); - return Index_ < Size_; -} - -void TFrame::TIterator::SkipToValidEvent() { - if (!Filter_) { - return; - } - - for (; Index_ < Size_; ++Index_) { - if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) { - break; - } - } -} - -TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) { - if (auto header = FindNextFrameHeader(in)) { - return TFrame{*in, *header, eventFactory}; - } else { - return Nothing(); - } -} - -TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) { - TVector<TStringBuf> tokens; - - SplitWithEscaping(tokens, unparsedMatchGroups, "/"); - - // Amount of match groups - size_t size = tokens.size(); - MatchGroups.resize(size); - - for (size_t i = 0; i < size; i++) { - TMatchGroup& group = MatchGroups[i]; - TVector<TStringBuf> groupTokens; - SplitWithEscaping(groupTokens, tokens[i], ":"); - - Y_ENSURE(groupTokens.size() == 3); - - try { - group.EventID = eventFactory->ClassByName(groupTokens[0]); - } catch (yexception& e) { - if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) { - e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0]; - throw e; - } - } - - group.FieldName = groupTokens[1]; - group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:"); - } -} - -bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const { - THashSet<size_t> toMatchSet; - for (size_t i = 0; i < MatchGroups.size(); i++) { - toMatchSet.insert(i); - } - - for (auto it = frame.GetIterator(); it.Avail(); it.Next()) { - TConstEventPtr event(*it); - TVector<size_t> indicesToErase; - - if (!toMatchSet.empty()) { - const NProtoBuf::Message* message = event->GetProto(); - const google::protobuf::Descriptor* descriptor = message->GetDescriptor(); - const google::protobuf::Reflection* reflection = message->GetReflection(); - - Y_ENSURE(descriptor); - Y_ENSURE(reflection); - - for (size_t groupIndex : toMatchSet) { - const TMatchGroup& group = MatchGroups[groupIndex]; - - if (event->Class == group.EventID) { - TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>(); - TString lastPart = std::move(parts.back()); - parts.pop_back(); - - for (auto part : parts) { - auto fieldDescriptor = descriptor->FindFieldByName(part); - Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\"."); - - message = &reflection->GetMessage(*message, fieldDescriptor); - descriptor = message->GetDescriptor(); - reflection = message->GetReflection(); - - Y_ENSURE(descriptor); - Y_ENSURE(reflection); - } - - const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart); - Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\"."); - - TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection); - if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) { - indicesToErase.push_back(groupIndex); - } - } - } - - for (size_t idx : indicesToErase) { - toMatchSet.erase(idx); - } - - if (toMatchSet.empty()) { - return true; - } - } - } - - return toMatchSet.empty(); -} - -void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) { - size_t tokenStart = 0; - const TString characterSet = TString::Join("\\", externalCharacterSet); - - for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) { - if (stringToSplit[position] == '\\') { - position++; - } else { - if (tokenStart != position) { - tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart)); - } - tokenStart = position + 1; - } - } - - if (tokenStart < stringToSplit.size()) { - tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart)); - } -} - -TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) { - TStringBuilder stringBuilder; - size_t tokenStart = 0; - - for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) { - if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) { - stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart); - tokenStart = position + 1; - } - } - - if (tokenStart < stringToUnescape.size()) { - stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart); - } - - return stringBuilder; -} - -TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) { - Y_ENSURE(message); - Y_ENSURE(fieldDescriptor); - Y_ENSURE(reflection); - - TString result; - switch (fieldDescriptor->type()) { - case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: - result = ToString(reflection->GetDouble(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT: - result = ToString(reflection->GetFloat(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_BOOL: - result = ToString(reflection->GetBool(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_INT32: - result = ToString(reflection->GetInt32(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_UINT32: - result = ToString(reflection->GetUInt32(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_INT64: - result = ToString(reflection->GetInt64(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_UINT64: - result = ToString(reflection->GetUInt64(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_STRING: - result = ToString(reflection->GetString(*message, fieldDescriptor)); - break; - case google::protobuf::FieldDescriptor::Type::TYPE_ENUM: - { - const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor); - result = ToString(enumValueDescriptor->name()); - } - break; - default: - throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented."; - } - return result; -} - -TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff) - : In_(&s) - , FrameFilter_(ff) - , EventFactory_(fac) -{ - Frame_ = FindNextFrame(&In_, EventFactory_); - - SkipToAllowedFrame(); -} - -TFrameStreamer::TFrameStreamer( - const TString& fileName, - ui64 startTime, - ui64 endTime, - ui64 maxRequestDuration, - IEventFactory* fac, - IFrameFilterRef ff) - : File_(TBlob::FromFile(fileName)) - , MemoryIn_(File_.Data(), File_.Size()) - , In_(&MemoryIn_) - , StartTime_(startTime) - , EndTime_(endTime) - , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime)) - , FrameFilter_(ff) - , EventFactory_(fac) -{ - In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration)); - Frame_ = FindNextFrame(&In_, fac); - SkipToAllowedFrame(); -} - -TFrameStreamer::~TFrameStreamer() = default; - -bool TFrameStreamer::Avail() const { - return Frame_.Defined(); -} - -const TFrame& TFrameStreamer::operator*() const { - Y_ENSURE(Frame_, "Frame streamer depleted"); - - return *Frame_; -} - -bool TFrameStreamer::Next() { - DoNext(); - SkipToAllowedFrame(); - - return Frame_.Defined(); -} - -bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const { - const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_)); - const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_)); - return allowedStartTime || allowedEndTime; -} - -bool TFrameStreamer::DoNext() { - if (!Frame_) { - return false; - } - In_.Skip(Frame_->Limiter_->Left()); - Frame_ = FindNextFrame(&In_, EventFactory_); - - if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) { - Frame_.Clear(); - } - - return Frame_.Defined(); -} - -namespace { - struct TDecodeBuffer { - TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) { - TBuffer from(bs); - - { - TBufferOutput b(from); - TransferData(&src, &b); - } - - NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer); - } - - explicit TDecodeBuffer(IInputStream& src) { - TBufferOutput b(DecodeBuffer); - TransferData(&src, &b); - } - - TBuffer DecodeBuffer; - }; - - class TBlockCodecStream: private TDecodeBuffer, public TBufferInput { - public: - TBlockCodecStream(const TString codec, IInputStream& src, size_t bs) - : TDecodeBuffer(codec, src, bs) - , TBufferInput(DecodeBuffer) - {} - - explicit TBlockCodecStream(IInputStream& src) - : TDecodeBuffer(src) - , TBufferInput(DecodeBuffer) - {} - }; -} - -TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData) - : Frame_(fr) - , Event_(nullptr) - , Flt_(filter) - , Fac_(fr.Fac_) - , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime())) - , Strict_(strict) - , WithRawData_(withRawData) -{ - switch (fr.LogFormat()) { - case COMPRESSED_LOG_FORMAT_V2: - case COMPRESSED_LOG_FORMAT_V3: - case COMPRESSED_LOG_FORMAT_V4: - case COMPRESSED_LOG_FORMAT_V5: { - const auto payload = fr.GetCompressedFrame(); - TMemoryInput payloadInput{payload}; - - if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) { - Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size())); - } else { - TZLibDecompress zlib(&payloadInput); - Decompressor_.Reset(new TBlockCodecStream(zlib)); - if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) { - Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size())); - } - } - - break; - } - - default: - ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl; - break; - }; - - if (WithRawData_) { - TBufferOutput out(UncompressedData_); - TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen); - - TransferData(&limiter, &out); - Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size())); - } - - Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen)); - - Decode(); -} - -TFrameDecoder::~TFrameDecoder() = default; - -bool TFrameDecoder::Avail() const { - return HaveData(); -} - -TConstEventPtr TFrameDecoder::operator*() const { - Y_ENSURE(HaveData(), "Decoder depleted"); - - return Event_; -} - -bool TFrameDecoder::Next() { - if (HaveData()) { - Decode(); - } - - return HaveData(); -} - -void TFrameDecoder::Decode() { - Event_ = nullptr; - const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5); - - size_t evBegin = 0; - size_t evEnd = 0; - if (WithRawData_) - evBegin = UncompressedData_.Size() - Limiter_->Left(); - - while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) { - } - - if (WithRawData_) { - evEnd = UncompressedData_.Size() - Limiter_->Left(); - RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd); - } - - if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) { - Event_ = EndOfFrame_.Release(); - } - - if (!!Event_) { - Event_->FrameId = Frame_.FrameId(); - } -} - -const TStringBuf TFrameDecoder::GetRawEvent() const { - return RawEventData_; -} - -TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering) - : Frames_(fs) - , Start_(s) - , End_(e) - , MaxEndTimestamp_(0) - , Frontier_(0) - , StrongOrdering_(strongOrdering) - , LosslessStrongOrdering_(losslessStrongOrdering) - , EventFilter_(filter) -{ - - if (Start_ > End_) { - ythrow yexception() << "Wrong main interval"; - } - - TEventStreamer::Next(); -} - -TEventStreamer::~TEventStreamer() = default; - -bool TEventStreamer::Avail() const { - return Events_.Avail() && (*Events_)->Timestamp <= Frontier_; -} - -TConstEventPtr TEventStreamer::operator*() const { - Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted"); - - return *Events_; -} - -bool TEventStreamer::Next() { - if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) { - return true; - } - - for (;;) { - if (!LoadMoreEvents()) { - return false; - } - - if (TEventStreamer::Avail()) { - return true; - } - } -} - -/* -Two parameters are used in the function: -Frontier - the moment of time up to which inclusively all the log events made their way - into the buffer (and might have been already extracted out of it). -Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION. -In order to get all the log events up to the Frontier inclusively, - frames need to be read until "end time" of the current frame exceeds the Horizon. -*/ -bool TEventStreamer::LoadMoreEvents() { - if (!Frames_.Avail()) { - return false; - } - - const TFrame& fr1 = *Frames_; - const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0); - - if (fr1.EndTime() <= Frontier_ + maxRequestDuration) { - ythrow yexception() << "Wrong frame stream state"; - } - - if (Frontier_ >= End_) { - return false; - } - - const ui64 old_frontier = Frontier_; - Frontier_ = fr1.EndTime(); - - { - Y_DEFER { - Events_.Reorder(StrongOrdering_); - }; - - for (; Frames_.Avail(); Frames_.Next()) { - const TFrame& fr2 = *Frames_; - - // Frames need to start later than the Frontier. - if (StrongOrdering_ && fr2.StartTime() <= old_frontier) { - Cdbg << "Invalid frame encountered" << Endl; - continue; - } - - if (fr2.EndTime() > MaxEndTimestamp_) { - MaxEndTimestamp_ = fr2.EndTime(); - } - - if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) { - return true; - } - - // Checking for the frame to be within the main time borders. - if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) { - TransferEvents(fr2); - } - } - } - - Frontier_ = MaxEndTimestamp_; - - return true; -} - -void TEventStreamer::TransferEvents(const TFrame& fr) { - Events_.SetCheckpoint(); - - try { - for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) { - TConstEventPtr ev = *it; - - if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) { - ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range"; - } - - if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) { - Events_.Append(ev, StrongOrdering_); - } - } - } catch (const TInvalidEventTimestamps& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl; - } catch (const TFrameLoadError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: " << err.what() << Endl; - } catch (const TEventDecoderError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl; - } catch (const TZLibDecompressorError& err) { - Events_.Rollback(); - Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl; - } catch (...) { - Events_.Rollback(); - throw; - } -} - -void TEventStreamer::TEventBuffer::SetCheckpoint() { - BufLen_ = Buffer_.size(); -} - -void TEventStreamer::TEventBuffer::Rollback() { - Buffer_.resize(BufLen_); -} - -void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) { - SetCheckpoint(); - - std::reverse(Buffer_.begin(), Buffer_.end()); - - if (strongOrdering) { - StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) { - return (a->Timestamp > b->Timestamp) || - ((a->Timestamp == b->Timestamp) && !a->Class && b->Class); - }); - } -} - -void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) { - // Events in buffer output must be in an ascending order. - Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event"); - - Buffer_.push_back(std::move(ev)); -} - -bool TEventStreamer::TEventBuffer::Avail() const { - return !Buffer_.empty(); -} - -TConstEventPtr TEventStreamer::TEventBuffer::operator*() const { - Y_ENSURE(!Buffer_.empty(), "Event buffer is empty"); - - return Buffer_.back(); -} - -bool TEventStreamer::TEventBuffer::Next() { - if (!Buffer_.empty()) { - LastTimestamp_ = Buffer_.back()->Timestamp; - Buffer_.pop_back(); - return !Buffer_.empty(); - } else { - return false; - } -} |