aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core/yql_opt_window.h
blob: 679b71c8b7dab51060931f6569e30c89f39e0e14 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#pragma once
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>

namespace NYql {

// Forward declaration: the declaration below only takes this type by reference.
struct TTypeAnnotationContext;
// Takes an expression node together with the expression context and the type annotation
// context, and returns an expression node. Declared here, defined elsewhere.
// NOTE(review): by its name this rewrites a CalcOverWindow-style node into a lower-level
// expression — confirm against the implementation.
TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types);

// Returns a list of expression nodes derived from `node`. Declared here, defined elsewhere.
// NOTE(review): presumably the individual window calculations contained in `node`
// (a counterpart of RebuildCalcOverWindowGroup) — confirm against the implementation.
TExprNodeList ExtractCalcsOverWindow(const TExprNode::TPtr& node, TExprContext& ctx);
// Returns an expression node built from `input` and the list `calcs`; `pos` is the position
// handle passed along for the result. Declared here, defined elsewhere.
// NOTE(review): presumably `calcs` has the shape produced by ExtractCalcsOverWindow — confirm.
TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx);

// Kind of a window frame; TWindowFrameSettings::GetFrameType() returns one of these values.
// The numeric values are written out explicitly and are the same as the implicit ones.
// NOTE(review): the names suggest the SQL frame units ROWS / RANGE / GROUPS — confirm.
enum EFrameType {
    FrameByRows = 0,
    FrameByRange = 1,
    FrameByGroups = 2,
};

// Bring these two NNodes names into namespace NYql so they can be written unqualified.
// Because this is a header, the names become visible to every file that includes it.
using NNodes::TCoWinOnBase;
using NNodes::TCoFrameBound;

// Predicates over a frame bound node. Declared here, defined elsewhere.
// NOTE(review): by name, IsUnbounded reports an unbounded (no-limit) frame bound and
// IsCurrentRow reports a bound that sits exactly on the current row — confirm in the implementation.
bool IsUnbounded(const NNodes::TCoFrameBound& bound);
bool IsCurrentRow(const NNodes::TCoFrameBound& bound);

// Read-only view of a window frame's settings: frame type, first/last bounds, optional
// integer offsets for those bounds, and two boolean flags. Instances are obtained through
// the static Parse / TryParse factories; the accessors below do not modify the object.
class TWindowFrameSettings {
public:
    // Builds the settings from an expression node.
    // NOTE(review): presumably fails hard (rather than returning an empty value) when the
    // node cannot be parsed — confirm in the implementation.
    static TWindowFrameSettings Parse(const TExprNode& node, TExprContext& ctx);
    // Same inputs as Parse, but the result is wrapped in TMaybe.
    // NOTE(review): presumably an empty TMaybe signals a parse failure — confirm.
    static TMaybe<TWindowFrameSettings> TryParse(const TExprNode& node, TExprContext& ctx);

    // These two functions can only be used for FrameByRows or FrameByGroups
    TMaybe<i32> GetFirstOffset() const;
    TMaybe<i32> GetLastOffset() const;

    // First and last bounds of the frame as TCoFrameBound values.
    TCoFrameBound GetFirst() const;
    TCoFrameBound GetLast() const;

    // Returns the NeverEmpty flag.
    bool IsNonEmpty() const { return NeverEmpty; }
    // Returns the Compact flag.
    bool IsCompact() const { return Compact; }
    // Returns the stored frame type (FrameByRows unless set otherwise).
    EFrameType GetFrameType() const { return Type; }
private:
    EFrameType Type = FrameByRows;
    TExprNode::TPtr First;
    TMaybe<i32> FirstOffset;
    TExprNode::TPtr Last;
    TMaybe<i32> LastOffset;
    bool NeverEmpty = false;
    bool Compact = false;
};

// Plain bundle of expression nodes and type annotation pointers that, per the struct name,
// parameterize a session window. A default-constructed object has every member null.
struct TSessionWindowParams {
    // All members are null-initialized by their default member initializers below.
    TSessionWindowParams() = default;

    // Defined out of line.
    // NOTE(review): presumably puts every member back into the null state — confirm.
    void Reset();

    TExprNode::TPtr Traits{nullptr};
    TExprNode::TPtr Key{nullptr};
    const TTypeAnnotationNode* KeyType{nullptr};
    const TTypeAnnotationNode* ParamsType{nullptr};
    TExprNode::TPtr Init{nullptr};
    TExprNode::TPtr Update{nullptr};
    TExprNode::TPtr SortTraits{nullptr};
};

// Pair of expression nodes describing a sort; both members are default-constructed pointers.
// NOTE(review): presumably Key is the sort key extractor and Order the sort direction(s) —
// confirm against the code that fills this struct.
struct TSortParams {
    TExprNode::TPtr Key;
    TExprNode::TPtr Order;
};

// Returns a lambda of the following shape:
//   Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>>
// The input is assumed to be already partitioned by partitionKeySelector.
// NOTE(review): sessionKeySelector / sessionInit / sessionUpdate are presumably the lambdas
// that compute the session key and create/advance the session state — confirm.
TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
    const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
    const TExprNode::TPtr& sessionUpdate, TExprContext& ctx);

// Returns a lambda whose input should be a List/Stream of structs; the remarks on
// ZipWithSessionParamsLambda (input assumed to be partitioned by partitionKeySelector)
// apply here as well.
// NOTE(review): sessionStartMemberName and sessionParamsMemberName presumably name the
// struct members this lambda adds to each row — confirm against the implementation.
TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
    TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName,
    const TExprNode::TPtr& partitionKeySelector,
    const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
    const TExprNode::TPtr& sessionUpdate, TExprContext& ctx);

// Overload that takes the session settings as one TSessionWindowParams bundle instead of
// separate selector/init/update nodes; it has no sessionParamsMemberName argument.
// The lambda's input should be a List/Stream of structs; the remarks on
// ZipWithSessionParamsLambda (input assumed to be partitioned by partitionKeySelector)
// apply here as well.
TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
    TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
    const TSessionWindowParams& sessionWindowParams, TExprContext& ctx);

}