diff options
author | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-20 16:09:49 +0300 |
---|---|---|
committer | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-20 17:34:24 +0300 |
commit | b5c517190a565e87657b56a363d58217480ef3f8 (patch) | |
tree | 96f8572c926906c35ec1f1f590220e8240152002 | |
parent | 9eaf05cd7641e8b698f228b039e3bd3a11fea23a (diff) | |
download | ydb-b5c517190a565e87657b56a363d58217480ef3f8.tar.gz |
Refactoring TTimeOrderRecover
Move TState to TTimeOrderRecover
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp | 118 |
1 files changed, 59 insertions, 59 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp index e6af05d8f95..5a533367cd5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp @@ -7,73 +7,73 @@ namespace NKikimr::NMiniKQL { namespace { -class TState: public TComputationValue<TState> { +class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> { + using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>; public: - using TTimestamp = i64; //use signed integers to simplify arithmetics - using TTimeinterval = i64; + class TState: public TComputationValue<TState> { + public: + using TTimestamp = i64; //use signed integers to simplify arithmetics + using TTimeinterval = i64; - TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit) - : TComputationValue<TState>(memInfo) - , Heap(Greater) - , Delay(delay) - , Ahead(ahead) - , RowLimit(rowLimit + 1) - , Latest(0) - , Terminating(false) - {} - NUdf::TUnboxedValue GetOutputIfReady() { - if (Terminating && Heap.empty()) { - return NUdf::TUnboxedValue::MakeFinish(); - } - if (Heap.empty()) { + TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit) + : TComputationValue<TState>(memInfo) + , Heap(Greater) + , Delay(delay) + , Ahead(ahead) + , RowLimit(rowLimit + 1) + , Latest(0) + , Terminating(false) + {} + NUdf::TUnboxedValue GetOutputIfReady() { + if (Terminating && Heap.empty()) { + return NUdf::TUnboxedValue::MakeFinish(); + } + if (Heap.empty()) { + return NUdf::TUnboxedValue{}; + } + TTimestamp oldest = Heap.top().first; + if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) { + auto result = std::move(Heap.top().second); + Heap.pop(); + return result; + } return NUdf::TUnboxedValue{}; } - TTimestamp oldest = Heap.top().first; - if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) { - auto result = std::move(Heap.top().second); - Heap.pop(); - return result; - } - return NUdf::TUnboxedValue{}; - } - ///return input row in case it cannot process it correctly - NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) { - MKQL_ENSURE(!row.IsSpecial(), "Internal logic error"); - MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error"); - if (Heap.empty()) { - Latest = t; + ///return input row in case it cannot process it correctly + NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) { + MKQL_ENSURE(!row.IsSpecial(), "Internal logic error"); + MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error"); + if (Heap.empty()) { + Latest = t; + } + if (Latest + Delay < t && t < Latest + Ahead) { + Heap.emplace(t, std::move(row)); + } else { + return row; + } + Latest = std::max(Latest, t); + return NUdf::TUnboxedValue{}; } - if (Latest + Delay < t && t < Latest + Ahead) { - Heap.emplace(t, std::move(row)); - } else { - return row; + void Finish() { + Terminating = true; } - Latest = std::max(Latest, t); - return NUdf::TUnboxedValue{}; - } - void Finish() { - Terminating = true; - } -private: - using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; - static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { - return lhs.first > rhs.first; + private: + using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; + static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { + return lhs.first > rhs.first; + }; + using THeap = std::priority_queue< + TEntry, + std::vector<TEntry, TMKQLAllocator<TEntry>>, + decltype(Greater)>; + THeap Heap; + const TTimeinterval Delay; + const TTimeinterval Ahead; + const ui32 RowLimit; + TTimestamp Latest; + bool Terminating; //not applicable for streams, but useful for debug and testing }; - using THeap = std::priority_queue< - TEntry, - std::vector<TEntry, TMKQLAllocator<TEntry>>, - decltype(Greater)>; - THeap Heap; - const TTimeinterval Delay; - const TTimeinterval Ahead; - const ui32 RowLimit; - TTimestamp Latest; - bool Terminating; //not applicable for streams, but useful for debug and testing -}; -class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> { - using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>; -public: TTimeOrderRecover( TComputationMutables& mutables, EValueRepresentation kind, |