1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
#pragma once
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
namespace NYql {
struct TTypeAnnotationContext;
// Takes an expression node together with the expression context and the type annotation
// context, and returns an expression node pointer.
// NOTE(review): judging by the name, this rewrites a CalcOverWindow-family node into
// lower-level constructs - confirm against the implementation file.
TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types);
// Returns a list of expression nodes derived from `node`.
// NOTE(review): presumably the individual CalcOverWindow calls found in `node` - confirm
// against the implementation file.
TExprNodeList ExtractCalcsOverWindow(const TExprNode::TPtr& node, TExprContext& ctx);
// Builds an expression node at position `pos` from `input` and the list `calcs`.
// NOTE(review): presumably the counterpart of ExtractCalcsOverWindow that reassembles a
// CalcOverWindowGroup from the extracted calls - confirm against the implementation file.
TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx);
// Kind of window frame stored in TWindowFrameSettings.
// The numeric values are the ones the compiler assigns implicitly (0, 1, 2);
// they are spelled out here only to make them visible.
enum EFrameType {
    FrameByRows = 0,
    FrameByRange = 1,
    FrameByGroups = 2,
};
// Bring the NNodes wrappers used by the declarations below into NYql.
using NNodes::TCoWinOnBase;
using NNodes::TCoFrameBound;
// Predicate over a frame bound.
// NOTE(review): presumably true when the bound is an UNBOUNDED PRECEDING/FOLLOWING one - confirm.
bool IsUnbounded(const NNodes::TCoFrameBound& bound);
// Predicate over a frame bound.
// NOTE(review): presumably true when the bound denotes CURRENT ROW - confirm.
bool IsCurrentRow(const NNodes::TCoFrameBound& bound);
// Settings of a single window frame: the frame type, the first and last bounds
// (kept as expression nodes, with optional integer offsets) and two boolean flags.
// Instances are obtained through the static Parse / TryParse functions.
class TWindowFrameSettings {
public:
// Builds the settings from `node`.
// NOTE(review): failure behavior is not visible here; presumably Parse requires a valid
// node while TryParse reports failure through an empty TMaybe - confirm.
static TWindowFrameSettings Parse(const TExprNode& node, TExprContext& ctx);
static TMaybe<TWindowFrameSettings> TryParse(const TExprNode& node, TExprContext& ctx);
// These two functions can only be used for FrameByRows or FrameByGroups
TMaybe<i32> GetFirstOffset() const;
TMaybe<i32> GetLastOffset() const;
// Frame bounds returned as TCoFrameBound wrappers.
TCoFrameBound GetFirst() const;
TCoFrameBound GetLast() const;
// Returns the NeverEmpty flag.
bool IsNonEmpty() const { return NeverEmpty; }
// Returns the Compact flag.
bool IsCompact() const { return Compact; }
// Returns the frame type; FrameByRows unless changed after construction.
EFrameType GetFrameType() const { return Type; }
private:
EFrameType Type = FrameByRows;
// First bound node and its optional offset.
TExprNode::TPtr First;
TMaybe<i32> FirstOffset;
// Last bound node and its optional offset.
TExprNode::TPtr Last;
TMaybe<i32> LastOffset;
bool NeverEmpty = false;
bool Compact = false;
};
// Session window parameters: a set of expression nodes plus two type annotations.
// A freshly constructed object has every member set to null.
struct TSessionWindowParams {
    // All members carry their own null initializer, so nothing is left to do here.
    TSessionWindowParams() = default;

    // Declared here, defined in the implementation file.
    void Reset();

    TExprNode::TPtr Traits{nullptr};
    TExprNode::TPtr Key{nullptr};
    const TTypeAnnotationNode* KeyType{nullptr};
    const TTypeAnnotationNode* ParamsType{nullptr};
    TExprNode::TPtr Init{nullptr};
    TExprNode::TPtr Update{nullptr};
    TExprNode::TPtr SortTraits{nullptr};
};
// Pair of expression nodes describing a sort.
// NOTE(review): presumably Key is the sort key extractor and Order the sort direction(s) - confirm with callers.
struct TSortParams {
TExprNode::TPtr Key;
TExprNode::TPtr Order;
};
// Returns a lambda of the following shape:
// Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>>
// The input is assumed to be partitioned by partitionKeySelector.
// NOTE(review): sessionKeySelector, sessionInit and sessionUpdate presumably produce the
// SessionKey / SessionState tuple items - confirm against the implementation file.
TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
const TExprNode::TPtr& sessionUpdate, TExprContext& ctx);
// The input should be a List/Stream of structs; the requirements stated for
// ZipWithSessionParamsLambda above apply here as well.
// Takes two member names (session start and session params) in addition to the selectors.
// NOTE(review): presumably the returned lambda adds members with these names to each
// input struct - confirm against the implementation file.
TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName,
const TExprNode::TPtr& partitionKeySelector,
const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
const TExprNode::TPtr& sessionUpdate, TExprContext& ctx);
// The input should be a List/Stream of structs; the requirements stated for
// ZipWithSessionParamsLambda above apply here as well.
// Overload that receives the session settings bundled in a TSessionWindowParams and
// takes only the session start member name.
TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
const TSessionWindowParams& sessionWindowParams, TExprContext& ctx);
}
|