aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/providers/common/provider/yql_provider.h
blob: 362b1e19584ccb5df3e3232a45b50b932c7fbd62 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#pragma once

#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/yql_data_provider.h>
#include <yql/essentials/core/yql_graph_transformer.h>
#include <yql/essentials/core/yql_expr_optimize.h>
#include <yql/essentials/core/yql_expr_type_annotation.h>
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>

#include <library/cpp/yson/writer.h>

#include <util/generic/hash_set.h>
#include <util/generic/string.h>
#include <util/generic/strbuf.h>

#include <utility>

// Forward declaration of the YSON writer used by the Write*/TransformerStatsToYson
// declarations below.
// NOTE(review): <library/cpp/yson/writer.h> is already included above and presumably
// provides the full definition, which would make this declaration redundant — confirm
// before removing.
namespace NYson {
    class TYsonWriter;
}

// Forward declaration only; IFunctionRegistry is not referenced by any declaration
// visible in this header.
namespace NKikimr {
    namespace NMiniKQL {
        class IFunctionRegistry;
    }
}

namespace NYql {

// Forward declarations: both types are only used by reference in the function
// declarations of this header, so their full definitions are not needed here.
struct TTypeAnnotationContext;
struct TOperationStatistics;

namespace NCommon {

// Name constant for the PG catalog file.
// NOTE(review): presumably the key under which the catalog is registered among user
// files — confirm at the call sites.
// Declared `inline` so that all translation units including this header share a single
// definition instead of each getting its own internal-linkage copy.
inline constexpr TStringBuf PgCatalogFileName = "_yql_pg_catalog";

// Settings attached to a table write, one field per recognised setting.
// The constructor initialises only Other; every other field starts default-constructed.
// NOTE(review): the fields are presumably filled in by ParseWriteTableSettings — confirm
// in the implementation file.
struct TWriteTableSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoAtom> Temporary;
    NNodes::TMaybeNode<NNodes::TExprList> Columns;
    NNodes::TMaybeNode<NNodes::TExprList> ReturningList;
    NNodes::TMaybeNode<NNodes::TCoAtomList> PrimaryKey;
    NNodes::TMaybeNode<NNodes::TCoAtomList> PartitionBy;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> OrderBy;
    NNodes::TMaybeNode<NNodes::TCoLambda> Filter;
    NNodes::TMaybeNode<NNodes::TCoLambda> Update;
    NNodes::TMaybeNode<NNodes::TCoIndexList> Indexes;
    NNodes::TMaybeNode<NNodes::TCoChangefeedList> Changefeeds;
    // The only mandatory field; presumably the settings not covered by a dedicated
    // field — TODO confirm.
    NNodes::TCoNameValueTupleList Other;
    NNodes::TMaybeNode<NNodes::TExprList> ColumnFamilies;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> ColumnsDefaultValues;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TableSettings;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> AlterActions;
    NNodes::TMaybeNode<NNodes::TCoAtom> TableType;
    NNodes::TMaybeNode<NNodes::TCallable> PgFilter;

    // Stores a copy of `other` in Other and leaves the remaining fields default-constructed.
    TWriteTableSettings(const NNodes::TCoNameValueTupleList& other)
        : Other(other)
    {
    }
};

// Settings attached to a sequence write.
// Only Other is set by the constructor; the optional nodes start default-constructed.
struct TWriteSequenceSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoAtom> ValueType;
    NNodes::TMaybeNode<NNodes::TCoAtom> Temporary;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> SequenceSettings;

    // Presumably the settings without a dedicated field above — TODO confirm.
    NNodes::TCoNameValueTupleList Other;

    // Stores a copy of `other` in Other.
    TWriteSequenceSettings(const NNodes::TCoNameValueTupleList& other)
        : Other(other)
    {
    }
};

// Settings attached to a topic write, including the consumer lists to set, add,
// alter and drop. Only Other is set by the constructor; everything else starts
// default-constructed.
struct TWriteTopicSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TopicSettings;
    NNodes::TMaybeNode<NNodes::TCoTopicConsumerList> Consumers;
    NNodes::TMaybeNode<NNodes::TCoTopicConsumerList> AddConsumers;
    NNodes::TMaybeNode<NNodes::TCoTopicConsumerList> AlterConsumers;
    NNodes::TMaybeNode<NNodes::TCoAtomList> DropConsumers;
    // Presumably the settings without a dedicated field above — TODO confirm.
    NNodes::TCoNameValueTupleList Other;

    // Stores a copy of `other` in Other.
    TWriteTopicSettings(const NNodes::TCoNameValueTupleList& other)
        : Other(other)
    {
    }
};

// Settings attached to a replication write.
// Only Other is set by the constructor; the optional nodes start default-constructed.
struct TWriteReplicationSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoReplicationTargetList> Targets;
    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> ReplicationSettings;
    // Presumably the settings without a dedicated field above — TODO confirm.
    NNodes::TCoNameValueTupleList Other;

    // Stores a copy of `other` in Other.
    TWriteReplicationSettings(const NNodes::TCoNameValueTupleList& other)
        : Other(other)
    {
    }
};

// Settings attached to a role write.
// Only Other is set by the constructor; the optional nodes start default-constructed.
struct TWriteRoleSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoAtomList> Roles;
    NNodes::TMaybeNode<NNodes::TCoAtom> NewName;
    // Presumably the settings without a dedicated field above — TODO confirm.
    NNodes::TCoNameValueTupleList Other;

    // Stores a copy of `other` in Other.
    TWriteRoleSettings(const NNodes::TCoNameValueTupleList& other)
        : Other(other)
    {
    }
};

// Three optional atom lists describing a permission write: the permissions, the paths
// and the role names. All three are supplied at construction time.
struct TWritePermissionSettings {
    NNodes::TMaybeNode<NNodes::TCoAtomList> Permissions;
    NNodes::TMaybeNode<NNodes::TCoAtomList> Paths;
    NNodes::TMaybeNode<NNodes::TCoAtomList> RoleNames;

    // Accepts each list as an rvalue and moves it into the member of the same name.
    TWritePermissionSettings(
        NNodes::TMaybeNode<NNodes::TCoAtomList>&& permissions,
        NNodes::TMaybeNode<NNodes::TCoAtomList>&& paths,
        NNodes::TMaybeNode<NNodes::TCoAtomList>&& roleNames)
        : Permissions(std::move(permissions))
        , Paths(std::move(paths))
        , RoleNames(std::move(roleNames))
    {
    }
};

// Settings attached to an object write: an optional mode, a name/value list of features
// and a list of feature names to reset. All fields are supplied at construction time.
struct TWriteObjectSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TCoNameValueTupleList Features;
    NNodes::TCoAtomList ResetFeatures;

    // Accepts every argument as an rvalue and moves it into the corresponding member
    // (`kvFeatures` goes into Features).
    TWriteObjectSettings(
        NNodes::TMaybeNode<NNodes::TCoAtom>&& mode,
        NNodes::TCoNameValueTupleList&& kvFeatures,
        NNodes::TCoAtomList&& resetFeatures)
        : Mode(std::move(mode))
        , Features(std::move(kvFeatures))
        , ResetFeatures(std::move(resetFeatures))
    {}
};

// Settings attached to a commit node.
// The constructor initialises only Other; Pos, Mode and Epoch start default-constructed.
// NOTE(review): they are presumably filled in by ParseCommitSettings — confirm in the
// implementation file.
struct TCommitSettings
{
    TPositionHandle Pos;
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoAtom> Epoch;
    // Presumably the settings without a dedicated field above — TODO confirm.
    NNodes::TCoNameValueTupleList Other;

    // `other` is taken by value, so it is moved into the member rather than copied a
    // second time.
    TCommitSettings(NNodes::TCoNameValueTupleList other)
        : Other(std::move(other)) {}

    // Builds a name/value tuple list node using `ctx`; defined out of line.
    // NOTE(review): presumably the inverse of ParseCommitSettings — confirm.
    NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx) const;

    // Validation helpers, defined out of line. They are non-const and take the
    // expression context.
    // NOTE(review): presumably each returns false and reports an issue into `ctx` when
    // the named part is set — confirm in the implementation file.
    bool EnsureModeEmpty(TExprContext& ctx);
    bool EnsureEpochEmpty(TExprContext& ctx);
    bool EnsureOtherEmpty(TExprContext& ctx);
};

// Two optional atoms describing a PG object operation: Mode and IfExists.
// Both are supplied at construction time.
struct TPgObjectSettings {
    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
    NNodes::TMaybeNode<NNodes::TCoAtom> IfExists;

    // Accepts both atoms as rvalues and moves each into the member of the same name.
    TPgObjectSettings(
        NNodes::TMaybeNode<NNodes::TCoAtom>&& mode,
        NNodes::TMaybeNode<NNodes::TCoAtom>&& ifExists)
        : Mode(std::move(mode))
        , IfExists(std::move(ifExists))
    {
    }
};

// Returns a pointer to a struct type; takes the expression context by mutable reference.
// NOTE(review): judging by the name this is the row type shared by table-list results,
// presumably allocated in `ctx` — the field set lives in the implementation file.
const TStructExprType* BuildCommonTableListType(TExprContext& ctx);

// Produces an expression node from the type annotation `ann`, using position `pos`.
// NOTE(review): presumably the node is an expression that evaluates to the type and is
// created in `ctx` — confirm in the implementation file.
TExprNode::TPtr BuildTypeExpr(TPositionHandle pos, const TTypeAnnotationNode& ann, TExprContext& ctx);

// Tells whether `node` carries the option named `option`.
// NOTE(review): `node` is presumably a Result or Pull callable — confirm the expected
// node kind in the implementation file.
bool HasResOrPullOption(const TExprNode& node, const TStringBuf& option);

// Returns a list of strings extracted from `node`.
// NOTE(review): presumably the column names/order hint of a Result or Pull callable —
// confirm in the implementation file.
TVector<TString> GetResOrPullColumnHints(const TExprNode& node);

// Parsers that turn a settings list node into the corresponding settings struct,
// returned by value. The node wrapper is passed by value; `ctx` by mutable reference.
// NOTE(review): presumably unrecognised settings end up in the struct's Other field —
// confirm in the implementation file.
TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx);

// Same shape as the parsers above: settings list node in, role/object settings struct
// out (by value).
TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx);

// Parses a settings list node into TWritePermissionSettings (returned by value).
// Note the naming: the function says "Permissions" (plural) while the struct says
// "Permission".
TWritePermissionSettings ParseWritePermissionsSettings(NNodes::TExprList node, TExprContext& ctx);

// Parses commit settings. Unlike the other parsers it takes the whole TCoCommit node
// rather than a bare settings list.
TCommitSettings ParseCommitSettings(NNodes::TCoCommit node, TExprContext& ctx);

// Parses a settings list node into TPgObjectSettings (returned by value).
TPgObjectSettings ParsePgObjectSettings(NNodes::TExprList node, TExprContext& ctx);

// Parses a settings list node into TWriteSequenceSettings (returned by value).
// Note the naming: the other writers use the ParseWrite* prefix, this one does not.
TWriteSequenceSettings ParseSequenceSettings(NNodes::TExprList node, TExprContext& ctx);

// Combines a cluster name and a table name into one owning string.
// NOTE(review): the separator/format is defined in the implementation file — confirm
// there before relying on it.
TString FullTableName(const TStringBuf& cluster, const TStringBuf& table);

// GetFillSettings: derives IDataProvider::TFillSettings from `node`.
// GetYsonFormat: maps fill settings to a YSON format.
// NOTE(review): which fields of the node/settings are consulted is defined in the
// implementation file.
IDataProvider::TFillSettings GetFillSettings(const TExprNode& node);
NYson::EYsonFormat GetYsonFormat(const IDataProvider::TFillSettings& fillSettings);

// Returns a list of strings derived from `type` (passed as a pointer).
// NOTE(review): presumably the member names of a struct type; whether null or wrapped
// (list/optional) types are accepted must be checked in the implementation file.
TVector<TString> GetStructFields(const TTypeAnnotationNode* type);

// Two overloads for serialising transformer statistics to YSON:
//  - the first writes `stats` into a caller-supplied writer and takes a `name`;
//  - the second returns the serialised text as a string, with `format` defaulting to
//    NYson::EYsonFormat::Pretty.
// NOTE(review): how `name` is used in the output is defined in the implementation file.
void TransformerStatsToYson(const TString& name, const IGraphTransformer::TStatistics& stats, NYson::TYsonWriter& writer);

TString TransformerStatsToYson(const IGraphTransformer::TStatistics& stats, NYson::EYsonFormat format
    = NYson::EYsonFormat::Pretty);

// Populates the caller-owned `secureParams` map (string -> string) from `node`, with
// read-only access to the type annotation context.
// NOTE(review): presumably collects secure-parameter references found in the expression
// and resolves them via `types` — confirm in the implementation file.
void FillSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& types, THashMap<TString, TString>& secureParams);

// Helpers operating on the user-data table `files` for the expression `node`.
//  - FillUsedFiles: updates `files` (mutable reference); `crutches` defaults to an empty
//    table. Returns bool — presumably false on error, with details in `ctx`.
//  - FreezeUsedFiles: returns a transformer status together with an async callback
//    future; `urlDownloadFilter` is a predicate over a string and `crutches` again
//    defaults to an empty table.
//  - FreezeUsedFilesSync: same inputs minus `crutches`, returns bool; presumably the
//    blocking counterpart of FreezeUsedFiles — TODO confirm.
bool FillUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const TUserDataTable& crutches = {});

std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> FreezeUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter, const TUserDataTable& crutches = {});

bool FreezeUsedFilesSync(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter);

// Emits the `columns` node into the supplied YSON writer.
// NOTE(review): the accepted node shapes and the output layout are defined in the
// implementation file.
void WriteColumns(NYson::TYsonWriter& writer, const NNodes::TExprBase& columns);

// Both functions render `expr` to a string.
//  - SerializeExpr: `withTypes` defaults to false; presumably it controls whether type
//    annotations are included in the output — TODO confirm.
//  - ExprToPrettyString: presumably a human-readable rendering — TODO confirm.
TString SerializeExpr(TExprContext& ctx, const TExprNode& expr, bool withTypes = false);
TString ExprToPrettyString(TExprContext& ctx, const TExprNode& expr);

// YSON output helpers for stream descriptions.
//  - WriteStream takes two raw node pointers, `node` and `source`.
//  - WriteStreams takes a `name` and a lambda node.
// NOTE(review): whether null pointers are accepted and what exactly is written is
// defined in the implementation file.
void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprNode* source);
void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCoLambda& lambda);

// Computes a floating-point factor for the `lambda` node.
// NOTE(review): presumably an estimate of how much the lambda multiplies its input
// data — the exact definition is in the implementation file.
double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx);

// Two overloads that write operation statistics into a YSON writer:
//  - the first takes a map keyed by ui32 plus three flags; `addTotalKey` and
//    `addExternalMap` both default to true, `totalOnly` has no default;
//  - the second writes a single TOperationStatistics.
// NOTE(review): the meaning of the ui32 key and of each flag is defined in the
// implementation file.
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addTotalKey = true, bool addExternalMap = true);
void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics);

// Validate a (format, compression) pair for the input and output direction.
// NOTE(review): presumably they return false and add an issue to `ctx` when the
// combination is not supported — confirm in the implementation file.
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx);

// Validate a format name for the input and output direction.
// The input variant additionally receives the row struct type of the schema and a
// predicate `excludeFields` over field names.
// NOTE(review): presumably fields for which the predicate returns true are skipped
// during validation, and failures are reported into `ctx` — TODO confirm.
bool ValidateFormatForInput(std::string_view format, const TStructExprType* schemaStructRowType, const std::function<bool(TStringBuf)>& excludeFields, TExprContext& ctx);
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);

// Validators for string-valued settings: an interval unit, a date-time format name and
// a timestamp format name.
// NOTE(review): the accepted values are listed in the implementation file; failures are
// presumably reported into `ctx`.
bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx);
bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx);
bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx);

// Takes a PgSelect node, an option name and a callback receiving an expression node.
// The callback is passed by value as a std::function.
// NOTE(review): presumably invokes `lambda` on the set-item option named `optionName`
// and returns whether such an option was found — confirm in the implementation file.
bool TransformPgSetItemOption(
    const NNodes::TCoPgSelect& pgSelect,
    TStringBuf optionName,
    std::function<void(const NNodes::TExprBase&)> lambda
);

// Accessors for PgSelect set-item options.
//  - GetSetItemOption looks an option up by name on a PgSelect node.
//  - GetSetItemOptionValue extracts a node from an option node.
// NOTE(review): the result when the option or value is absent (presumably a null
// pointer) must be checked in the implementation file.
TExprNode::TPtr GetSetItemOption(const NNodes::TCoPgSelect& pgSelect, TStringBuf optionName);

TExprNode::TPtr GetSetItemOptionValue(const NNodes::TExprBase& setItemOption);

// Predicate over a PgSelect node.
// NOTE(review): presumably tells whether RenamePgSelectColumns has work to do for this
// node — confirm in the implementation file.
bool NeedToRenamePgSelectColumns(const NNodes::TCoPgSelect& pgSelect);

// Takes a PgSelect node and an optional table column order; `output` is an in/out node
// pointer, `ctx` and `types` are mutable.
// NOTE(review): presumably rewrites `output` so that the select's columns are renamed
// according to `tableColumnOrder` and returns false on error — confirm in the
// implementation file.
bool RenamePgSelectColumns(
    const NNodes::TCoPgSelect& node,
    TExprNode::TPtr& output,
    const TMaybe<TColumnOrder>& tableColumnOrder,
    TExprContext& ctx,
    TTypeAnnotationContext& types);
} // namespace NCommon
} // namespace NYql