aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkardymon-d <kardymon-d@yandex-team.com>2023-10-20 16:09:49 +0300
committerkardymon-d <kardymon-d@yandex-team.com>2023-10-20 17:34:24 +0300
commitb5c517190a565e87657b56a363d58217480ef3f8 (patch)
tree96f8572c926906c35ec1f1f590220e8240152002
parent9eaf05cd7641e8b698f228b039e3bd3a11fea23a (diff)
downloadydb-b5c517190a565e87657b56a363d58217480ef3f8.tar.gz
Refactoring TTimeOrderRecover
Move TState to TTimeOrderRecover
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp118
1 files changed, 59 insertions, 59 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
index e6af05d8f95..5a533367cd5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
@@ -7,73 +7,73 @@ namespace NKikimr::NMiniKQL {
namespace {
-class TState: public TComputationValue<TState> {
+class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> {
+ using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>;
public:
- using TTimestamp = i64; //use signed integers to simplify arithmetics
- using TTimeinterval = i64;
+ class TState: public TComputationValue<TState> {
+ public:
+ using TTimestamp = i64; //use signed integers to simplify arithmetics
+ using TTimeinterval = i64;
- TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit)
- : TComputationValue<TState>(memInfo)
- , Heap(Greater)
- , Delay(delay)
- , Ahead(ahead)
- , RowLimit(rowLimit + 1)
- , Latest(0)
- , Terminating(false)
- {}
- NUdf::TUnboxedValue GetOutputIfReady() {
- if (Terminating && Heap.empty()) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
- if (Heap.empty()) {
+ TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit)
+ : TComputationValue<TState>(memInfo)
+ , Heap(Greater)
+ , Delay(delay)
+ , Ahead(ahead)
+ , RowLimit(rowLimit + 1)
+ , Latest(0)
+ , Terminating(false)
+ {}
+ NUdf::TUnboxedValue GetOutputIfReady() {
+ if (Terminating && Heap.empty()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (Heap.empty()) {
+ return NUdf::TUnboxedValue{};
+ }
+ TTimestamp oldest = Heap.top().first;
+ if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) {
+ auto result = std::move(Heap.top().second);
+ Heap.pop();
+ return result;
+ }
return NUdf::TUnboxedValue{};
}
- TTimestamp oldest = Heap.top().first;
- if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) {
- auto result = std::move(Heap.top().second);
- Heap.pop();
- return result;
- }
- return NUdf::TUnboxedValue{};
- }
- ///return input row in case it cannot process it correctly
- NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) {
- MKQL_ENSURE(!row.IsSpecial(), "Internal logic error");
- MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error");
- if (Heap.empty()) {
- Latest = t;
+ ///return input row in case it cannot process it correctly
+ NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) {
+ MKQL_ENSURE(!row.IsSpecial(), "Internal logic error");
+ MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error");
+ if (Heap.empty()) {
+ Latest = t;
+ }
+ if (Latest + Delay < t && t < Latest + Ahead) {
+ Heap.emplace(t, std::move(row));
+ } else {
+ return row;
+ }
+ Latest = std::max(Latest, t);
+ return NUdf::TUnboxedValue{};
}
- if (Latest + Delay < t && t < Latest + Ahead) {
- Heap.emplace(t, std::move(row));
- } else {
- return row;
+ void Finish() {
+ Terminating = true;
}
- Latest = std::max(Latest, t);
- return NUdf::TUnboxedValue{};
- }
- void Finish() {
- Terminating = true;
- }
-private:
- using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
- static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
- return lhs.first > rhs.first;
+ private:
+ using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
+ static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
+ return lhs.first > rhs.first;
+ };
+ using THeap = std::priority_queue<
+ TEntry,
+ std::vector<TEntry, TMKQLAllocator<TEntry>>,
+ decltype(Greater)>;
+ THeap Heap;
+ const TTimeinterval Delay;
+ const TTimeinterval Ahead;
+ const ui32 RowLimit;
+ TTimestamp Latest;
+ bool Terminating; //not applicable for streams, but useful for debug and testing
};
- using THeap = std::priority_queue<
- TEntry,
- std::vector<TEntry, TMKQLAllocator<TEntry>>,
- decltype(Greater)>;
- THeap Heap;
- const TTimeinterval Delay;
- const TTimeinterval Ahead;
- const ui32 RowLimit;
- TTimestamp Latest;
- bool Terminating; //not applicable for streams, but useful for debug and testing
-};
-class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> {
- using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>;
-public:
TTimeOrderRecover(
TComputationMutables& mutables,
EValueRepresentation kind,