aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-07-17 19:13:22 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-07-17 19:13:22 +0300
commit029cf29f3669091012394221f00dfa0f3631d91b (patch)
tree7ee4219359b3d93200c219e3eafa986b1c8576c2
parent05482e640fc81d547fb7607d0e97ab6cdb4233a0 (diff)
downloadydb-029cf29f3669091012394221f00dfa0f3631d91b.tar.gz
Extract ISource to separate files
YQL-16186 extract ISource to separate files
-rw-r--r--ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/sql/v1/aggregation.cpp1
-rw-r--r--ydb/library/yql/sql/v1/context.h2
-rw-r--r--ydb/library/yql/sql/v1/insert.cpp2
-rw-r--r--ydb/library/yql/sql/v1/join.cpp2
-rw-r--r--ydb/library/yql/sql/v1/node.cpp928
-rw-r--r--ydb/library/yql/sql/v1/node.h291
-rw-r--r--ydb/library/yql/sql/v1/select.cpp3
-rw-r--r--ydb/library/yql/sql/v1/source.cpp949
-rw-r--r--ydb/library/yql/sql/v1/source.h300
-rw-r--r--ydb/library/yql/sql/v1/sql_group_by.cpp1
-rw-r--r--ydb/library/yql/sql/v1/sql_translation.cpp1
-rw-r--r--ydb/library/yql/sql/v1/sql_values.cpp1
-rw-r--r--ydb/library/yql/sql/v1/ya.make1
17 files changed, 1264 insertions, 1222 deletions
diff --git a/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt
index 4704ea3ee2..898dba853a 100644
--- a/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt
@@ -61,6 +61,7 @@ target_sources(yql-sql-v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp
diff --git a/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt b/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt
index 5bc90c1e0c..944a46a01b 100644
--- a/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt
@@ -62,6 +62,7 @@ target_sources(yql-sql-v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp
diff --git a/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt
index 5bc90c1e0c..944a46a01b 100644
--- a/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt
@@ -62,6 +62,7 @@ target_sources(yql-sql-v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp
diff --git a/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt
index 4704ea3ee2..898dba853a 100644
--- a/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt
@@ -61,6 +61,7 @@ target_sources(yql-sql-v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp
diff --git a/ydb/library/yql/sql/v1/aggregation.cpp b/ydb/library/yql/sql/v1/aggregation.cpp
index ceb2d3a3ad..4fe92d220c 100644
--- a/ydb/library/yql/sql/v1/aggregation.cpp
+++ b/ydb/library/yql/sql/v1/aggregation.cpp
@@ -1,4 +1,5 @@
#include "node.h"
+#include "source.h"
#include "context.h"
#include <ydb/library/yql/ast/yql_type_string.h>
diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h
index 033996ed8a..f69449d9fa 100644
--- a/ydb/library/yql/sql/v1/context.h
+++ b/ydb/library/yql/sql/v1/context.h
@@ -1,6 +1,6 @@
#pragma once
-#include "node.h"
+#include "source.h"
#include "sql.h"
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
diff --git a/ydb/library/yql/sql/v1/insert.cpp b/ydb/library/yql/sql/v1/insert.cpp
index 0620eb4eab..00c63e3d24 100644
--- a/ydb/library/yql/sql/v1/insert.cpp
+++ b/ydb/library/yql/sql/v1/insert.cpp
@@ -1,4 +1,4 @@
-#include "node.h"
+#include "source.h"
#include "context.h"
#include <ydb/library/yql/utils/yql_panic.h>
diff --git a/ydb/library/yql/sql/v1/join.cpp b/ydb/library/yql/sql/v1/join.cpp
index 7851b9809b..a5d680c829 100644
--- a/ydb/library/yql/sql/v1/join.cpp
+++ b/ydb/library/yql/sql/v1/join.cpp
@@ -1,4 +1,4 @@
-#include "node.h"
+#include "source.h"
#include "context.h"
#include <ydb/library/yql/utils/yql_panic.h>
diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp
index 079aa76f13..04543e2cd7 100644
--- a/ydb/library/yql/sql/v1/node.cpp
+++ b/ydb/library/yql/sql/v1/node.cpp
@@ -1,4 +1,5 @@
#include "node.h"
+#include "source.h"
#include "context.h"
#include <ydb/library/yql/ast/yql_ast_escaping.h>
@@ -29,22 +30,6 @@ TString ErrorDistinctByGroupKey(const TString& column) {
return TStringBuilder() << "Unable to use DISTINCT by grouping column: " << column << ". You should leave one of them.";
}
-TTableRef::TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys)
- : RefName(refName)
- , Service(to_lower(service))
- , Cluster(cluster)
- , Keys(keys)
-{
-}
-
-TString TTableRef::ShortName() const {
- Y_VERIFY_DEBUG(Keys);
- if (Keys->GetTableKeys()->GetTableName()) {
- return *Keys->GetTableKeys()->GetTableName();
- }
- return TString();
-}
-
TTopicRef::TTopicRef(const TString& refName, const TDeferredAtom& cluster, TNodePtr keys)
: RefName(refName)
, Cluster(cluster)
@@ -542,7 +527,6 @@ TString TCallNode::GetOpName() const {
return OpName;
}
-namespace {
const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes) {
const TString* name = nullptr;
for (auto& node: nodes) {
@@ -558,7 +542,6 @@ const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes) {
return name;
}
-}
const TString* TCallNode::GetSourceName() const {
return DeriveCommonSourceName(Args);
@@ -1298,915 +1281,6 @@ TNodePtr IAggregation::WindowTraits(const TNodePtr& type, TContext& ctx) const {
return Q(Y(Q(Name), GetApply(type, false, false, ctx)));
}
-ISource::ISource(TPosition pos)
- : INode(pos)
-{
-}
-
-ISource::~ISource()
-{
-}
-
-TSourcePtr ISource::CloneSource() const {
- Y_VERIFY_DEBUG(dynamic_cast<ISource*>(Clone().Get()), "Cloned node is no source");
- TSourcePtr result = static_cast<ISource*>(Clone().Get());
- for (auto curFilter: Filters) {
- result->Filters.emplace_back(curFilter->Clone());
- }
- for (int i = 0; i < static_cast<int>(EExprSeat::Max); ++i) {
- result->NamedExprs[i] = CloneContainer(NamedExprs[i]);
- }
- result->FlattenColumns = FlattenColumns;
- result->FlattenMode = FlattenMode;
- return result;
-}
-
-bool ISource::IsFake() const {
- return false;
-}
-
-void ISource::AllColumns() {
- return;
-}
-
-const TColumns* ISource::GetColumns() const {
- return nullptr;
-}
-
-void ISource::GetInputTables(TTableList& tableList) const {
- for (auto srcPtr: UsedSources) {
- srcPtr->GetInputTables(tableList);
- }
- return;
-}
-
-TMaybe<bool> ISource::AddColumn(TContext& ctx, TColumnNode& column) {
- if (column.IsReliable()) {
- ctx.Error(Pos) << "Source does not allow column references";
- ctx.Error(column.GetPos()) << "Column reference " <<
- (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)");
- }
- return {};
-}
-
-void ISource::FinishColumns() {
-}
-
-
-bool ISource::AddFilter(TContext& ctx, TNodePtr filter) {
- Y_UNUSED(ctx);
- Filters.push_back(filter);
- return true;
-}
-
-bool ISource::AddGroupKey(TContext& ctx, const TString& column) {
- if (!GroupKeys.insert(column).second) {
- ctx.Error() << "Duplicate grouping column: " << column;
- return false;
- }
- OrderedGroupKeys.push_back(column);
- return true;
-}
-
-void ISource::SetCompactGroupBy(bool compactGroupBy) {
- CompactGroupBy = compactGroupBy;
-}
-
-void ISource::SetGroupBySuffix(const TString& suffix) {
- GroupBySuffix = suffix;
-}
-
-bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions, EExprSeat exprSeat) {
- YQL_ENSURE(exprSeat < EExprSeat::Max);
- THashSet<TString> names;
- THashSet<TString> aliasSet;
- // TODO: merge FlattenBy with FlattenByExpr
- const bool isFlatten = (exprSeat == EExprSeat::FlattenBy || exprSeat == EExprSeat::FlattenByExpr);
- THashSet<TString>& aliases = isFlatten ? FlattenByAliases : aliasSet;
- for (const auto& expr: expressions) {
- const auto& alias = expr->GetLabel();
- const auto& columnNamePtr = expr->GetColumnName();
- if (alias) {
- ExprAliases.insert(alias);
- if (!aliases.emplace(alias).second) {
- ctx.Error(expr->GetPos()) << "Duplicate alias found: " << alias << " in " << exprSeat << " section";
- return false;
- }
- if (names.contains(alias)) {
- ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << alias << " in " << exprSeat << " section";
- return false;
- }
- }
- if (columnNamePtr) {
- const auto& sourceName = *expr->GetSourceName();
- auto columnName = *columnNamePtr;
- if (sourceName) {
- columnName = DotJoin(sourceName, columnName);
- }
- if (!names.emplace(columnName).second) {
- ctx.Error(expr->GetPos()) << "Duplicate column name found: " << columnName << " in " << exprSeat << " section";
- return false;
- }
- if (!alias && aliases.contains(columnName)) {
- ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << columnName << " in " << exprSeat << " section";
- return false;
- }
- if (alias && exprSeat == EExprSeat::GroupBy) {
- auto columnAlias = GroupByColumnAliases.emplace(columnName, alias);
- auto oldAlias = columnAlias.first->second;
- if (columnAlias.second && oldAlias != alias) {
- ctx.Error(expr->GetPos()) << "Alias for column not same, column: " << columnName <<
- ", exist alias: " << oldAlias << ", another alias: " << alias;
- return false;
- }
- }
- }
-
- if (exprSeat == EExprSeat::GroupBy) {
- if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) {
- if (SessionWindow) {
- ctx.Error(expr->GetPos()) << "Duplicate session window specification:";
- ctx.Error(SessionWindow->GetPos()) << "Previous session window is declared here";
- return false;
- }
- SessionWindow = expr;
- }
- if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
- if (HoppingWindow) {
- ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:";
- ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here";
- return false;
- }
- HoppingWindow = expr;
- }
- }
- Expressions(exprSeat).emplace_back(expr);
- }
- return true;
-}
-
-void ISource::SetFlattenByMode(const TString& mode) {
- FlattenMode = mode;
-}
-
-void ISource::MarkFlattenColumns() {
- FlattenColumns = true;
-}
-
-bool ISource::IsFlattenColumns() const {
- return FlattenColumns;
-}
-
-TString ISource::MakeLocalName(const TString& name) {
- auto iter = GenIndexes.find(name);
- if (iter == GenIndexes.end()) {
- iter = GenIndexes.emplace(name, 0).first;
- }
- TStringBuilder str;
- str << name << iter->second;
- ++iter->second;
- return std::move(str);
-}
-
-bool ISource::AddAggregation(TContext& ctx, TAggregationPtr aggr) {
- Y_UNUSED(ctx);
- YQL_ENSURE(aggr);
- Aggregations.push_back(aggr);
- return true;
-}
-
-bool ISource::HasAggregations() const {
- return !Aggregations.empty() || !GroupKeys.empty();
-}
-
-void ISource::AddWindowSpecs(TWinSpecs winSpecs) {
- WinSpecs = winSpecs;
-}
-
-bool ISource::AddFuncOverWindow(TContext& ctx, TNodePtr expr) {
- Y_UNUSED(ctx);
- Y_UNUSED(expr);
- return false;
-}
-
-void ISource::AddTmpWindowColumn(const TString& column) {
- TmpWindowColumns.push_back(column);
-}
-
-const TVector<TString>& ISource::GetTmpWindowColumns() const {
- return TmpWindowColumns;
-}
-
-void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) {
- LegacyHoppingWindowSpec = spec;
-}
-
-TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const {
- return LegacyHoppingWindowSpec;
-}
-
-TNodePtr ISource::GetSessionWindowSpec() const {
- return SessionWindow;
-}
-
-TNodePtr ISource::GetHoppingWindowSpec() const {
- return HoppingWindow;
-}
-
-TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const {
- auto winIter = WinSpecs.find(windowName);
- if (winIter == WinSpecs.end()) {
- ctx.Error(Pos) << "Unable to find window specification for window '" << windowName << "'";
- return {};
- }
- YQL_ENSURE(winIter->second);
- return winIter->second;
-}
-
-inline TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) {
- return NamedExprs[static_cast<size_t>(exprSeat)];
-}
-
-const TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) const {
- return NamedExprs[static_cast<size_t>(exprSeat)];
-}
-
-inline TNodePtr ISource::AliasOrColumn(const TNodePtr& node, bool withSource) {
- auto result = node->GetLabel();
- if (!result) {
- const auto columnNamePtr = node->GetColumnName();
- YQL_ENSURE(columnNamePtr);
- result = *columnNamePtr;
- if (withSource) {
- const auto sourceNamePtr = node->GetSourceName();
- if (sourceNamePtr) {
- result = DotJoin(*sourceNamePtr, result);
- }
- }
- }
- return BuildQuotedAtom(node->GetPos(), result);
-}
-
-bool ISource::AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) {
- YQL_ENSURE(func->IsOverWindow());
- if (func->IsDistinct()) {
- ctx.Error(func->GetPos()) << "Aggregation with distinct is not allowed over window: " << windowName;
- return false;
- }
- if (!FindWindowSpecification(ctx, windowName)) {
- return false;
- }
- AggregationOverWindow[windowName].emplace_back(std::move(func));
- return true;
-}
-
-bool ISource::AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) {
- if (!FindWindowSpecification(ctx, windowName)) {
- return false;
- }
- FuncOverWindow[windowName].emplace_back(std::move(func));
- return true;
-}
-
-bool ISource::IsCompositeSource() const {
- return false;
-}
-
-bool ISource::IsGroupByColumn(const TString& column) const {
- return GroupKeys.contains(column);
-}
-
-bool ISource::IsFlattenByColumns() const {
- return !Expressions(EExprSeat::FlattenBy).empty();
-}
-
-bool ISource::IsFlattenByExprs() const {
- return !Expressions(EExprSeat::FlattenByExpr).empty();
-}
-
-bool ISource::IsAlias(EExprSeat exprSeat, const TString& column) const {
- for (const auto& exprNode: Expressions(exprSeat)) {
- const auto& labelName = exprNode->GetLabel();
- if (labelName && labelName == column) {
- return true;
- }
- }
- return false;
-}
-
-bool ISource::IsExprAlias(const TString& column) const {
- std::array<EExprSeat, 5> exprSeats = {{EExprSeat::FlattenBy, EExprSeat::FlattenByExpr, EExprSeat::GroupBy,
- EExprSeat::WindowPartitionBy, EExprSeat::DistinctAggr}};
- for (auto seat: exprSeats) {
- if (IsAlias(seat, column)) {
- return true;
- }
- }
- return false;
-}
-
-bool ISource::IsExprSeat(EExprSeat exprSeat, EExprType type) const {
- auto expressions = Expressions(exprSeat);
- if (!expressions) {
- return false;
- }
- for (const auto& exprNode: expressions) {
- if (exprNode->GetLabel()) {
- return type == EExprType::WithExpression;
- }
- }
- return type == EExprType::ColumnOnly;
-}
-
-TString ISource::GetGroupByColumnAlias(const TString& column) const {
- auto iter = GroupByColumnAliases.find(column);
- if (iter == GroupByColumnAliases.end()) {
- return {};
- }
- return iter->second;
-}
-
-const TString* ISource::GetWindowName() const {
- return {};
-}
-
-bool ISource::IsCalcOverWindow() const {
- return !AggregationOverWindow.empty() || !FuncOverWindow.empty() ||
- AnyOf(WinSpecs, [](const auto& item) { return item.second->Session; });
-}
-
-bool ISource::IsOverWindowSource() const {
- return !WinSpecs.empty();
-}
-
-bool ISource::IsStream() const {
- return false;
-}
-
-EOrderKind ISource::GetOrderKind() const {
- return EOrderKind::None;
-}
-
-TWriteSettings ISource::GetWriteSettings() const {
- return {};
-}
-
-bool ISource::SetSamplingOptions(TContext& ctx,
- TPosition pos,
- ESampleMode mode,
- TNodePtr samplingRate,
- TNodePtr samplingSeed) {
- Y_UNUSED(pos);
- Y_UNUSED(mode);
- Y_UNUSED(samplingRate);
- Y_UNUSED(samplingSeed);
- ctx.Error() << "Sampling is only supported for table sources";
- return false;
-}
-
-bool ISource::SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) {
- Y_UNUSED(pos);
- Y_UNUSED(contextHints);
- if (hints) {
- ctx.Error() << "Explicit hints are only supported for table sources";
- return false;
- }
- return true;
-}
-
-bool ISource::CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const {
- Y_UNUSED(columns);
- Y_UNUSED(hint);
- ctx.Error() << "Source not support grouping hint";
- return false;
-}
-
-TNodePtr ISource::BuildFilter(TContext& ctx, const TString& label) {
- return Filters.empty() ? nullptr : Y(ctx.UseUnordered(*this) ? "OrderedFilter" : "Filter", label, BuildFilterLambda());
-}
-
-TNodePtr ISource::BuildFilterLambda() {
- if (Filters.empty()) {
- return BuildLambda(Pos, Y("row"), Y("Bool", Q("true")));
- }
- YQL_ENSURE(Filters[0]->HasState(ENodeState::Initialized));
- TNodePtr filter(Filters[0]);
- for (ui32 i = 1; i < Filters.size(); ++i) {
- YQL_ENSURE(Filters[i]->HasState(ENodeState::Initialized));
- filter = Y("And", filter, Filters[i]);
- }
- filter = Y("Coalesce", filter, Y("Bool", Q("false")));
- return BuildLambda(Pos, Y("row"), filter);
-}
-
-TNodePtr ISource::BuildFlattenByColumns(const TString& label) {
- auto columnsList = Y("FlattenByColumns", Q(FlattenMode), label);
- for (const auto& column: Expressions(EExprSeat::FlattenBy)) {
- const auto columnNamePtr = column->GetColumnName();
- YQL_ENSURE(columnNamePtr);
- if (column->GetLabel().empty()) {
- columnsList = L(columnsList, Q(*columnNamePtr));
- } else {
- columnsList = L(columnsList, Q(Y(Q(*columnNamePtr), Q(column->GetLabel()))));
- }
- }
- return Y(Y("let", "res", columnsList));
-}
-
-TNodePtr ISource::BuildFlattenColumns(const TString& label) {
- return Y(Y("let", "res", Y("Just", Y("FlattenStructs", label))));
-}
-
-namespace {
-
-TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& exprs) {
- auto structObj = BuildAtom(pos, "row", TNodeFlags::Default);
- for (const auto& exprNode: exprs) {
- const auto name = exprNode->GetLabel();
- YQL_ENSURE(name);
- structObj = structObj->Y("ForceRemoveMember", structObj, structObj->Q(name));
- if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
- continue;
- }
- if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) {
- continue;
- }
- structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode);
- }
- return structObj->Y("AsList", structObj);
-}
-
-}
-
-TNodePtr ISource::BuildPreaggregatedMap(TContext& ctx) {
- Y_UNUSED(ctx);
- const auto& groupByExprs = Expressions(EExprSeat::GroupBy);
- const auto& distinctAggrExprs = Expressions(EExprSeat::DistinctAggr);
- YQL_ENSURE(groupByExprs || distinctAggrExprs);
-
- TNodePtr res;
- if (groupByExprs) {
- auto body = BuildLambdaBodyForExprAliases(Pos, groupByExprs);
- res = Y("FlatMap", "core", BuildLambda(Pos, Y("row"), body));
- }
-
- if (distinctAggrExprs) {
- auto body = BuildLambdaBodyForExprAliases(Pos, distinctAggrExprs);
- auto lambda = BuildLambda(Pos, Y("row"), body);
- res = res ? Y("FlatMap", res, lambda) : Y("FlatMap", "core", lambda);
- }
- return res;
-}
-
-TNodePtr ISource::BuildPreFlattenMap(TContext& ctx) {
- Y_UNUSED(ctx);
- YQL_ENSURE(IsFlattenByExprs());
- return BuildLambdaBodyForExprAliases(Pos, Expressions(EExprSeat::FlattenByExpr));
-}
-
-TNodePtr ISource::BuildPrewindowMap(TContext& ctx) {
- auto feed = BuildAtom(Pos, "row", TNodeFlags::Default);
- for (const auto& exprNode: Expressions(EExprSeat::WindowPartitionBy)) {
- const auto name = exprNode->GetLabel();
- if (name && !dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
- feed = Y("AddMember", feed, Q(name), exprNode);
- }
- }
- return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed)));
-}
-
-bool ISource::BuildSamplingLambda(TNodePtr& node) {
- if (!SamplingRate) {
- return true;
- }
- auto res = Y("Coalesce", Y("SafeCast", SamplingRate, Y("DataType", Q("Double"))), Y("Double", Q("0")));
- res = Y("/", res, Y("Double", Q("100")));
- res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row")));
- node = BuildLambda(GetPos(), Y("row"), res, "res");
- return !!node;
-}
-
-bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) {
- if (samplingRate) {
- if (!samplingRate->Init(ctx, this)) {
- return false;
- }
- SamplingRate = Y("Ensure", samplingRate, Y(">=", samplingRate, Y("Double", Q("0"))), Y("String", Q("\"Expected sampling rate to be nonnegative\"")));
- SamplingRate = Y("Ensure", SamplingRate, Y("<=", SamplingRate, Y("Double", Q("100"))), Y("String", Q("\"Sampling rate is over 100%\"")));
- }
- return true;
-}
-
-std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) {
- if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) {
- return { nullptr, true };
- }
-
- auto keysTuple = Y();
- YQL_ENSURE(GroupKeys.size() == OrderedGroupKeys.size());
- for (const auto& key: OrderedGroupKeys) {
- YQL_ENSURE(GroupKeys.contains(key));
- keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key));
- }
-
- std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs;
- for (const auto& aggr: Aggregations) {
- if (const auto key = aggr->GetGenericKey()) {
- genericAggrs[{aggr->IsDistinct(), *key}].emplace_back(aggr.Get());
- }
- }
-
- for (const auto& aggr : genericAggrs) {
- for (size_t i = 1U; i < aggr.second.size(); ++i) {
- aggr.second.front()->Join(aggr.second[i]);
- }
- }
-
- const auto listType = Y("TypeOf", label);
- auto aggrArgs = Y();
- const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" ||
- GroupBySuffix == "MergeFinalize" || GroupBySuffix == "MergeManyFinalize";
- const bool allowAggApply = !LegacyHoppingWindowSpec && !SessionWindow && !HoppingWindow;
- for (const auto& aggr: Aggregations) {
- auto res = aggr->AggregationTraits(listType, overState, GroupBySuffix == "MergeManyFinalize", allowAggApply, ctx);
- if (!res.second) {
- return { nullptr, false };
- }
-
- if (res.first) {
- aggrArgs = L(aggrArgs, res.first);
- }
- }
-
- auto options = Y();
- if (CompactGroupBy || GroupBySuffix == "Finalize") {
- options = L(options, Q(Y(Q("compact"))));
- }
-
- if (LegacyHoppingWindowSpec) {
- auto hoppingTraits = Y(
- "HoppingTraits",
- Y("ListItemType", listType),
- BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor),
- LegacyHoppingWindowSpec->Hop,
- LegacyHoppingWindowSpec->Interval,
- LegacyHoppingWindowSpec->Delay,
- LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"),
- Q("v1"));
-
- options = L(options, Q(Y(Q("hopping"), hoppingTraits)));
- }
-
- if (SessionWindow) {
- YQL_ENSURE(SessionWindow->GetLabel());
- auto sessionWindow = dynamic_cast<TSessionWindow*>(SessionWindow.Get());
- YQL_ENSURE(sessionWindow);
- options = L(options, Q(Y(Q("session"),
- Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label))))));
- }
-
- if (HoppingWindow) {
- YQL_ENSURE(HoppingWindow->GetLabel());
- auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get());
- YQL_ENSURE(hoppingWindow);
- options = L(options, Q(Y(Q("hopping"),
- Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label))))));
- }
-
- return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true };
-}
-
-TMaybe<TString> ISource::FindColumnMistype(const TString& name) const {
- auto result = FindMistypeIn(GroupKeys, name);
- return result ? result : FindMistypeIn(ExprAliases, name);
-}
-
-void ISource::AddDependentSource(ISource* usedSource) {
- UsedSources.push_back(usedSource);
-}
-
-class TYqlFrameBound final: public TCallNode {
-public:
- TYqlFrameBound(TPosition pos, TNodePtr bound)
- : TCallNode(pos, "EvaluateExpr", 1, 1, { bound })
- , FakeSource(BuildFakeSource(pos))
- {
- }
-
- bool DoInit(TContext& ctx, ISource* src) override {
- if (!ValidateArguments(ctx)) {
- return false;
- }
-
- if (!Args[0]->Init(ctx, FakeSource.Get())) {
- return false;
- }
-
- return TCallNode::DoInit(ctx, src);
- }
-
- TNodePtr DoClone() const final {
- return new TYqlFrameBound(Pos, Args[0]->Clone());
- }
-private:
- TSourcePtr FakeSource;
-};
-
-TNodePtr BuildFrameNode(const TFrameBound& frame, EFrameType frameType) {
- TString settingStr;
- switch (frame.Settings) {
- case FramePreceding: settingStr = "preceding"; break;
- case FrameCurrentRow: settingStr = "currentRow"; break;
- case FrameFollowing: settingStr = "following"; break;
- default: YQL_ENSURE(false, "Unexpected frame setting");
- }
-
- TNodePtr node = frame.Bound;
- TPosition pos = frame.Pos;
- if (frameType != EFrameType::FrameByRows) {
- TVector<TNodePtr> settings;
- settings.push_back(BuildQuotedAtom(pos, settingStr, TNodeFlags::Default));
- if (frame.Settings != FrameCurrentRow) {
- if (!node) {
- node = BuildQuotedAtom(pos, "unbounded", TNodeFlags::Default);
- } else if (!node->IsLiteral()) {
- node = new TYqlFrameBound(pos, node);
- }
- settings.push_back(std::move(node));
- }
- return BuildTuple(pos, std::move(settings));
- }
-
- // TODO: switch FrameByRows to common format above
- YQL_ENSURE(frame.Settings != FrameCurrentRow, "Should be already replaced by 0 preceding/following");
- if (!node) {
- node = BuildLiteralVoid(pos);
- } else if (node->IsLiteral()) {
- YQL_ENSURE(node->GetLiteralType() == "Int32");
- i32 value = FromString<i32>(node->GetLiteralValue());
- YQL_ENSURE(value >= 0);
- if (frame.Settings == FramePreceding) {
- value = -value;
- }
- node = new TCallNodeImpl(pos, "Int32", { BuildQuotedAtom(pos, ToString(value), TNodeFlags::Default) });
- } else {
- if (frame.Settings == FramePreceding) {
- node = new TCallNodeImpl(pos, "Minus", { node->Clone() });
- }
- node = new TYqlFrameBound(pos, node);
- }
- return node;
-}
-
-TNodePtr ISource::BuildWindowFrame(const TFrameSpecification& spec, bool isCompact) {
- YQL_ENSURE(spec.FrameExclusion == FrameExclNone);
- YQL_ENSURE(spec.FrameBegin);
- YQL_ENSURE(spec.FrameEnd);
-
- auto frameBeginNode = BuildFrameNode(*spec.FrameBegin, spec.FrameType);
- auto frameEndNode = BuildFrameNode(*spec.FrameEnd, spec.FrameType);
-
- auto begin = Q(Y(Q("begin"), frameBeginNode));
- auto end = Q(Y(Q("end"), frameEndNode));
-
- return isCompact ? Q(Y(begin, end, Q(Y(Q("compact"))))) : Q(Y(begin, end));
-}
-
-class TSessionWindowTraits final: public TCallNode {
-public:
- TSessionWindowTraits(TPosition pos, const TVector<TNodePtr>& args)
- : TCallNode(pos, "SessionWindowTraits", args)
- , FakeSource(BuildFakeSource(pos))
- {
- YQL_ENSURE(args.size() == 4);
- }
-
- bool DoInit(TContext& ctx, ISource* src) override {
- if (!ValidateArguments(ctx)) {
- return false;
- }
-
- if (!Args.back()->Init(ctx, FakeSource.Get())) {
- return false;
- }
-
- return TCallNode::DoInit(ctx, src);
- }
-
- TNodePtr DoClone() const final {
- return new TSessionWindowTraits(Pos, CloneContainer(Args));
- }
-private:
- TSourcePtr FakeSource;
-};
-
-TNodePtr ISource::BuildCalcOverWindow(TContext& ctx, const TString& label) {
- YQL_ENSURE(IsCalcOverWindow());
-
- TSet<TString> usedWindows;
- for (auto& it : AggregationOverWindow) {
- usedWindows.insert(it.first);
- }
- for (auto& it : FuncOverWindow) {
- usedWindows.insert(it.first);
- }
- for (auto& it : WinSpecs) {
- if (it.second->Session) {
- usedWindows.insert(it.first);
- }
- }
-
- YQL_ENSURE(!usedWindows.empty());
-
- const bool onePartition = usedWindows.size() == 1;
- const auto useLabel = onePartition ? label : "partitioning";
- const auto listType = Y("TypeOf", useLabel);
- auto framesProcess = Y();
- auto resultNode = onePartition ? Y() : Y(Y("let", "partitioning", label));
-
- for (const auto& name : usedWindows) {
- auto spec = FindWindowSpecification(ctx, name);
- YQL_ENSURE(spec);
-
- auto aggsIter = AggregationOverWindow.find(name);
- auto funcsIter = FuncOverWindow.find(name);
-
- const auto& aggs = (aggsIter == AggregationOverWindow.end()) ? TVector<TAggregationPtr>() : aggsIter->second;
- const auto& funcs = (funcsIter == FuncOverWindow.end()) ? TVector<TNodePtr>() : funcsIter->second;
-
- auto frames = Y();
- TString frameType;
- switch (spec->Frame->FrameType) {
- case EFrameType::FrameByRows: frameType = "WinOnRows"; break;
- case EFrameType::FrameByRange: frameType = "WinOnRange"; break;
- case EFrameType::FrameByGroups: frameType = "WinOnGroups"; break;
- }
- YQL_ENSURE(frameType);
- auto callOnFrame = Y(frameType, BuildWindowFrame(*spec->Frame, spec->IsCompact));
- for (auto& agg : aggs) {
- auto winTraits = agg->WindowTraits(listType, ctx);
- callOnFrame = L(callOnFrame, winTraits);
- }
- for (auto& func : funcs) {
- auto winSpec = func->WindowSpecFunc(listType);
- callOnFrame = L(callOnFrame, winSpec);
- }
- frames = L(frames, callOnFrame);
-
- auto keysTuple = Y();
- for (const auto& key: spec->Partitions) {
- if (!dynamic_cast<TSessionWindow*>(key.Get())) {
- keysTuple = L(keysTuple, AliasOrColumn(key, GetJoin()));
- }
- }
-
- auto sortSpec = spec->OrderBy.empty() ? Y("Void") : BuildSortSpec(spec->OrderBy, useLabel, true, false);
- if (spec->Session) {
- TString label = spec->Session->GetLabel();
- YQL_ENSURE(label);
- auto sessionWindow = dynamic_cast<TSessionWindow*>(spec->Session.Get());
- YQL_ENSURE(sessionWindow);
- auto labelNode = BuildQuotedAtom(sessionWindow->GetPos(), label);
-
- auto sessionTraits = sessionWindow->BuildTraits(useLabel);
- framesProcess = Y("CalcOverSessionWindow", useLabel, Q(keysTuple), sortSpec, Q(frames), sessionTraits, Q(Y(labelNode)));
- } else {
- YQL_ENSURE(aggs || funcs);
- framesProcess = Y("CalcOverWindow", useLabel, Q(keysTuple), sortSpec, Q(frames));
- }
-
- if (!onePartition) {
- resultNode = L(resultNode, Y("let", "partitioning", framesProcess));
- }
- }
- if (onePartition) {
- return framesProcess;
- } else {
- return Y("block", Q(L(resultNode, Y("return", "partitioning"))));
- }
-}
-
-TNodePtr ISource::BuildSort(TContext& ctx, const TString& label) {
- Y_UNUSED(ctx);
- Y_UNUSED(label);
- return nullptr;
-}
-
-TNodePtr ISource::BuildCleanupColumns(TContext& ctx, const TString& label) {
- Y_UNUSED(ctx);
- Y_UNUSED(label);
- return nullptr;
-}
-
-IJoin* ISource::GetJoin() {
- return nullptr;
-}
-
-ISource* ISource::GetCompositeSource() {
- return nullptr;
-}
-
-bool ISource::IsSelect() const {
- return true;
-}
-
-bool ISource::IsTableSource() const {
- return false;
-}
-
-bool ISource::ShouldUseSourceAsColumn(const TString& source) const {
- Y_UNUSED(source);
- return false;
-}
-
-bool ISource::IsJoinKeysInitializing() const {
- return false;
-}
-
-bool ISource::DoInit(TContext& ctx, ISource* src) {
- for (auto& column: Expressions(EExprSeat::FlattenBy)) {
- if (!column->Init(ctx, this)) {
- return false;
- }
- }
-
- if (IsFlattenColumns() && src) {
- src->AllColumns();
- }
-
- return true;
-}
-
-bool ISource::InitFilters(TContext& ctx) {
- for (auto& filter: Filters) {
- if (!filter->Init(ctx, this)) {
- return false;
- }
- if (filter->IsAggregated() && !filter->IsConstant() && !filter->HasState(ENodeState::AggregationKey)) {
- ctx.Error(filter->GetPos()) << "Can not use aggregated values in filtering";
- return false;
- }
- }
- return true;
-}
-
-TAstNode* ISource::Translate(TContext& ctx) const {
- Y_VERIFY_DEBUG(false);
- Y_UNUSED(ctx);
- return nullptr;
-}
-
-void ISource::FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector) {
- TNodePtr expr;
- if (orderBy.empty()) {
- YQL_ENSURE(!sortKeySelector);
- sortDirection = sortKeySelector = Y("Void");
- return;
- } else if (orderBy.size() == 1) {
- auto& sortSpec = orderBy.front();
- expr = Y("PersistableRepr", sortSpec->OrderExpr);
- sortDirection = Y("Bool", Q(sortSpec->Ascending ? "true" : "false"));
- } else {
- auto exprList = Y();
- sortDirection = Y();
- for (const auto& sortSpec: orderBy) {
- const auto asc = sortSpec->Ascending;
- sortDirection = L(sortDirection, Y("Bool", Q(asc ? "true" : "false")));
- exprList = L(exprList, Y("PersistableRepr", sortSpec->OrderExpr));
- }
- sortDirection = Q(sortDirection);
- expr = Q(exprList);
- }
- sortKeySelector = BuildLambda(Pos, Y("row"), expr);
-}
-
-TNodePtr ISource::BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume) {
- YQL_ENSURE(!orderBy.empty());
- TNodePtr dirsNode;
- TNodePtr keySelectorNode;
- FillSortParts(orderBy, dirsNode, keySelectorNode);
- if (traits) {
- return Y("SortTraits", Y("TypeOf", label), dirsNode, keySelectorNode);
- } else if (assume) {
- return Y("AssumeSorted", label, dirsNode, keySelectorNode);
- } else {
- return Y("Sort", label, dirsNode, keySelectorNode);
- }
-}
-
-IJoin::IJoin(TPosition pos)
- : ISource(pos)
-{
-}
-
-IJoin::~IJoin()
-{
-}
-
-IJoin* IJoin::GetJoin() {
- return this;
-}
-
namespace {
bool UnescapeQuoted(const TString& str, TPosition& pos, char quoteChar, TString& result, TString& error) {
result = error = {};
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index d49eee34e7..15d28dc9c7 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -510,24 +510,6 @@ namespace NSQLTranslationV1 {
TString Repr;
};
- typedef TIntrusivePtr<ISource> TSourcePtr;
-
- struct TTableRef {
- TString RefName;
- TString Service;
- TDeferredAtom Cluster;
- TNodePtr Keys;
- TNodePtr Options;
- TSourcePtr Source;
-
- TTableRef() = default;
- TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys);
- TTableRef(const TTableRef&) = default;
- TTableRef& operator=(const TTableRef&) = default;
-
- TString ShortName() const;
- };
-
struct TTopicRef {
TString RefName;
TDeferredAtom Cluster;
@@ -653,8 +635,6 @@ namespace NSQLTranslationV1 {
typedef TIntrusivePtr<TWindowSpecification> TWindowSpecificationPtr;
typedef TMap<TString, TWindowSpecificationPtr> TWinSpecs;
- typedef TVector<TTableRef> TTableList;
-
void WarnIfAliasFromSelectIsUsedInGroupBy(TContext& ctx, const TVector<TNodePtr>& selectTerms, const TVector<TNodePtr>& groupByTerms,
const TVector<TNodePtr>& groupByExprTerms);
bool ValidateAllNodesForAggregation(TContext& ctx, const TVector<TNodePtr>& nodes);
@@ -851,153 +831,6 @@ namespace NSQLTranslationV1 {
Passthrough
};
- class IJoin;
- class ISource: public INode {
- public:
- virtual ~ISource();
-
- virtual bool IsFake() const;
- virtual void AllColumns();
- virtual const TColumns* GetColumns() const;
- virtual void GetInputTables(TTableList& tableList) const;
- /// in case of error unfilled, flag show if ensure column name
- virtual TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column);
- virtual void FinishColumns();
- virtual bool AddExpressions(TContext& ctx, const TVector<TNodePtr>& columns, EExprSeat exprSeat);
- virtual void SetFlattenByMode(const TString& mode);
- virtual void MarkFlattenColumns();
- virtual bool IsFlattenColumns() const;
- virtual bool AddFilter(TContext& ctx, TNodePtr filter);
- virtual bool AddGroupKey(TContext& ctx, const TString& column);
- virtual void SetCompactGroupBy(bool compactGroupBy);
- virtual void SetGroupBySuffix(const TString& suffix);
- virtual TString MakeLocalName(const TString& name);
- virtual bool AddAggregation(TContext& ctx, TAggregationPtr aggr);
- virtual bool AddFuncOverWindow(TContext& ctx, TNodePtr expr);
- virtual void AddTmpWindowColumn(const TString& column);
- virtual const TVector<TString>& GetTmpWindowColumns() const;
- virtual bool HasAggregations() const;
- virtual void AddWindowSpecs(TWinSpecs winSpecs);
- virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func);
- virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func);
- virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec);
- virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const;
- virtual TNodePtr GetSessionWindowSpec() const;
- virtual TNodePtr GetHoppingWindowSpec() const;
- virtual bool IsCompositeSource() const;
- virtual bool IsGroupByColumn(const TString& column) const;
- virtual bool IsFlattenByColumns() const;
- virtual bool IsFlattenByExprs() const;
- virtual bool IsCalcOverWindow() const;
- virtual bool IsOverWindowSource() const;
- virtual bool IsStream() const;
- virtual EOrderKind GetOrderKind() const;
- virtual TWriteSettings GetWriteSettings() const;
- virtual bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed);
- virtual bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints);
- virtual bool CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const;
- virtual TNodePtr BuildFilter(TContext& ctx, const TString& label);
- virtual TNodePtr BuildFilterLambda();
- virtual TNodePtr BuildFlattenByColumns(const TString& label);
- virtual TNodePtr BuildFlattenColumns(const TString& label);
- virtual TNodePtr BuildPreaggregatedMap(TContext& ctx);
- virtual TNodePtr BuildPreFlattenMap(TContext& ctx);
- virtual TNodePtr BuildPrewindowMap(TContext& ctx);
- virtual std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx);
- virtual TNodePtr BuildCalcOverWindow(TContext& ctx, const TString& label);
- virtual TNodePtr BuildSort(TContext& ctx, const TString& label);
- virtual TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label);
- virtual bool BuildSamplingLambda(TNodePtr& node);
- virtual bool SetSamplingRate(TContext& ctx, TNodePtr samplingRate);
- virtual IJoin* GetJoin();
- virtual ISource* GetCompositeSource();
- virtual bool IsSelect() const;
- virtual bool IsTableSource() const;
- virtual bool ShouldUseSourceAsColumn(const TString& source) const;
- virtual bool IsJoinKeysInitializing() const;
- virtual const TString* GetWindowName() const;
-
- virtual bool DoInit(TContext& ctx, ISource* src);
- virtual TNodePtr Build(TContext& ctx) = 0;
-
- virtual TMaybe<TString> FindColumnMistype(const TString& name) const;
-
- virtual bool InitFilters(TContext& ctx);
- void AddDependentSource(ISource* usedSource);
- bool IsAlias(EExprSeat exprSeat, const TString& label) const;
- bool IsExprAlias(const TString& label) const;
- bool IsExprSeat(EExprSeat exprSeat, EExprType type = EExprType::WithExpression) const;
- TString GetGroupByColumnAlias(const TString& column) const;
- const TVector<TNodePtr>& Expressions(EExprSeat exprSeat) const;
-
- virtual TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const;
-
- TIntrusivePtr<ISource> CloneSource() const;
-
- protected:
- ISource(TPosition pos);
- virtual TAstNode* Translate(TContext& ctx) const;
-
- void FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortKeySelector, TNodePtr& sortDirection);
- TNodePtr BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume);
-
- TVector<TNodePtr>& Expressions(EExprSeat exprSeat);
- TNodePtr AliasOrColumn(const TNodePtr& node, bool withSource);
-
- TNodePtr BuildWindowFrame(const TFrameSpecification& spec, bool isCompact);
-
- THashSet<TString> ExprAliases;
- THashSet<TString> FlattenByAliases;
- THashMap<TString, TString> GroupByColumnAliases;
- TVector<TNodePtr> Filters;
- bool CompactGroupBy = false;
- TString GroupBySuffix;
- TSet<TString> GroupKeys;
- TVector<TString> OrderedGroupKeys;
- std::array<TVector<TNodePtr>, static_cast<unsigned>(EExprSeat::Max)> NamedExprs;
- TVector<TAggregationPtr> Aggregations;
- TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow;
- TMap<TString, TVector<TNodePtr>> FuncOverWindow;
- TWinSpecs WinSpecs;
- TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec;
- TNodePtr SessionWindow;
- TNodePtr HoppingWindow;
- TVector<ISource*> UsedSources;
- TString FlattenMode;
- bool FlattenColumns = false;
- THashMap<TString, ui32> GenIndexes;
- TVector<TString> TmpWindowColumns;
- TNodePtr SamplingRate;
- };
-
- template<>
- inline TVector<TSourcePtr> CloneContainer<TSourcePtr>(const TVector<TSourcePtr>& args) {
- TVector<TSourcePtr> cloneArgs;
- cloneArgs.reserve(args.size());
- for (const auto& arg: args) {
- cloneArgs.emplace_back(arg ? arg->CloneSource() : nullptr);
- }
- return cloneArgs;
- }
-
- struct TJoinLinkSettings {
- bool ForceSortedMerge = false;
- };
-
- class IJoin: public ISource {
- public:
- virtual ~IJoin();
-
- virtual IJoin* GetJoin();
- virtual TNodePtr BuildJoinKeys(TContext& ctx, const TVector<TDeferredAtom>& names) = 0;
- virtual void SetupJoin(const TString& joinOp, TNodePtr joinExpr, const TJoinLinkSettings& linkSettings) = 0;
- virtual const THashMap<TString, THashSet<TString>>& GetSameKeysMap() const = 0;
- virtual TVector<TString> GetJoinLabels() const = 0;
-
- protected:
- IJoin(TPosition pos);
- };
-
class TListOfNamedNodes final: public INode {
public:
TListOfNamedNodes(TPosition pos, TVector<TNodePtr>&& exprs);
@@ -1075,46 +908,6 @@ namespace NSQLTranslationV1 {
TNodePtr Node;
};
- class TSessionWindow final : public INode {
- public:
- TSessionWindow(TPosition pos, const TVector<TNodePtr>& args);
- void MarkValid();
- TNodePtr BuildTraits(const TString& label) const;
- private:
- bool DoInit(TContext& ctx, ISource* src) override;
- TAstNode* Translate(TContext&) const override;
- void DoUpdateState() const override;
- TNodePtr DoClone() const override;
- TString GetOpName() const override;
-
- TVector<TNodePtr> Args;
- TSourcePtr FakeSource;
- TNodePtr Node;
- bool Valid;
- };
-
- class THoppingWindow final : public INode {
- public:
- THoppingWindow(TPosition pos, const TVector<TNodePtr>& args);
- void MarkValid();
- TNodePtr BuildTraits(const TString& label) const;
- public:
- TNodePtr Hop;
- TNodePtr Interval;
- private:
- bool DoInit(TContext& ctx, ISource* src) override;
- TAstNode* Translate(TContext&) const override;
- void DoUpdateState() const override;
- TNodePtr DoClone() const override;
- TString GetOpName() const override;
- TNodePtr ProcessIntervalParam(const TNodePtr& val) const;
-
- TVector<TNodePtr> Args;
- TSourcePtr FakeSource;
- TNodePtr Node;
- bool Valid;
- };
-
struct TStringContent {
TString Content;
NYql::NUdf::EDataSlot Type = NYql::NUdf::EDataSlot::String;
@@ -1399,83 +1192,7 @@ namespace NSQLTranslationV1 {
bool warnOnYqlNameSpace = true
);
- // Implemented in join.cpp
- TString NormalizeJoinOp(const TString& joinOp);
- TSourcePtr BuildEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes);
-
- // Implemented in select.cpp
- TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped);
- TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex = -1);
- TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos);
- TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist = false);
- TSourcePtr BuildMuxSource(TPosition pos, TVector<TSourcePtr>&& sources);
- TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom = false, bool inSubquery = false);
- TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList = false);
- TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label = TString());
- TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label = TString());
- TSourcePtr BuildRefColumnSource(TPosition pos, const TString& partExpression);
- TSourcePtr BuildUnionAll(TPosition pos, TVector<TSourcePtr>&& sources, const TWriteSettings& settings);
- TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource);
-
- TNodePtr BuildOrderBy(TPosition pos, const TVector<TNodePtr>& keys, const TVector<bool>& order);
- TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take);
-
-
- TSourcePtr BuildSelectCore(
- TContext& ctx,
- TPosition pos,
- TSourcePtr source,
- const TVector<TNodePtr>& groupByExpr,
- const TVector<TNodePtr>& groupBy,
- bool compactGroupBy,
- const TString& groupBySuffix,
- bool assumeSorted,
- const TVector<TSortSpecificationPtr>& orderBy,
- TNodePtr having,
- TWinSpecs&& windowSpec,
- TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
- TVector<TNodePtr>&& terms,
- bool distinct,
- TVector<TNodePtr>&& without,
- bool selectStream,
- const TWriteSettings& settings
- );
- TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake);
-
-
- enum class ReduceMode {
- ByPartition,
- ByAll,
- };
- TSourcePtr BuildReduce(TPosition pos, ReduceMode mode, TSourcePtr source, TVector<TSortSpecificationPtr>&& orderBy,
- TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings,
- const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall);
- TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall,
- bool prcessStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy);
-
- TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped);
-
- // Implemented in insert.cpp
- TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values);
- TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, TSourcePtr source);
- TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values);
-
- EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode);
- TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns);
- TNodePtr BuildIntoTableOptions(TPosition pos, const TVector<TString>& eraseColumns, const TTableHints& hints);
- TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options = nullptr);
- TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source);
- TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source);
-
// Implemented in query.cpp
- TNodePtr BuildTableKey(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TString& view);
- TNodePtr BuildTableKeys(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TString& func, const TVector<TTableArg>& args);
- TNodePtr BuildTopicKey(TPosition pos, const TDeferredAtom& cluster, const TDeferredAtom& name);
- TNodePtr BuildInputOptions(TPosition pos, const TTableHints& hints);
- TNodePtr BuildInputTables(TPosition pos, const TTableList& tables, bool inSubquery, TScopedStatePtr scoped);
- TNodePtr BuildCreateTable(TPosition pos, const TTableRef& tr, const TCreateTableParameters& params, TScopedStatePtr scoped);
- TNodePtr BuildAlterTable(TPosition pos, const TTableRef& tr, const TAlterTableParameters& params, TScopedStatePtr scoped);
- TNodePtr BuildDropTable(TPosition pos, const TTableRef& table, ETableType tableType, TScopedStatePtr scoped);
TNodePtr BuildCreateUser(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TMaybe<TRoleParameters>& params, TScopedStatePtr scoped);
TNodePtr BuildCreateGroup(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, TScopedStatePtr scoped);
TNodePtr BuildAlterUser(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TRoleParameters& params, TScopedStatePtr scoped);
@@ -1499,8 +1216,6 @@ namespace NSQLTranslationV1 {
std::map<TString, TNodePtr>&& settings,
const TObjectOperatorContext& context);
TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context);
- TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options,
- TScopedStatePtr scoped);
TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings);
TNodePtr BuildCommitClusters(TPosition pos);
TNodePtr BuildRollbackClusters(TPosition pos);
@@ -1528,9 +1243,5 @@ namespace NSQLTranslationV1 {
bool Parseui32(TNodePtr from, ui32& to);
TNodePtr GroundWithExpr(const TNodePtr& ground, const TNodePtr& expr);
- TSourcePtr TryMakeSourceFromExpression(TContext& ctx, const TString& currService, const TDeferredAtom& currCluster,
- TNodePtr node, const TString& view = {});
- void MakeTableFromExpression(TContext& ctx, TNodePtr node, TDeferredAtom& table, const TString& prefix = {});
- TDeferredAtom MakeAtomFromExpression(TContext& ctx, TNodePtr node, const TString& prefix = {});
- TString NormalizeTypeString(const TString& str);
+ const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes);
} // namespace NSQLTranslationV1
diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp
index 65d7a3624c..28228731d3 100644
--- a/ydb/library/yql/sql/v1/select.cpp
+++ b/ydb/library/yql/sql/v1/select.cpp
@@ -1,5 +1,5 @@
#include "sql.h"
-#include "node.h"
+#include "source.h"
#include "context.h"
@@ -1660,7 +1660,6 @@ public:
return false;
}
}
-
return true;
}
diff --git a/ydb/library/yql/sql/v1/source.cpp b/ydb/library/yql/sql/v1/source.cpp
new file mode 100644
index 0000000000..8c696ce22b
--- /dev/null
+++ b/ydb/library/yql/sql/v1/source.cpp
@@ -0,0 +1,949 @@
+#include "source.h"
+#include "context.h"
+
+#include <ydb/library/yql/ast/yql_ast_escaping.h>
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/core/sql_types/simple_types.h>
+#include <ydb/library/yql/minikql/mkql_type_ops.h>
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <library/cpp/charset/ci_string.h>
+#include <util/generic/hash_set.h>
+#include <util/stream/str.h>
+#include <util/string/cast.h>
+#include <util/string/escape.h>
+#include <util/string/subst.h>
+
+using namespace NYql;
+
+namespace NSQLTranslationV1 {
+
+
+TTableRef::TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys)
+ : RefName(refName)
+ , Service(to_lower(service))
+ , Cluster(cluster)
+ , Keys(keys)
+{
+}
+
+TString TTableRef::ShortName() const {
+ Y_VERIFY_DEBUG(Keys);
+ if (Keys->GetTableKeys()->GetTableName()) {
+ return *Keys->GetTableKeys()->GetTableName();
+ }
+ return TString();
+}
+
+ISource::ISource(TPosition pos)
+ : INode(pos)
+{
+}
+
+ISource::~ISource()
+{
+}
+
+TSourcePtr ISource::CloneSource() const {
+ Y_VERIFY_DEBUG(dynamic_cast<ISource*>(Clone().Get()), "Cloned node is no source");
+ TSourcePtr result = static_cast<ISource*>(Clone().Get());
+ for (auto curFilter: Filters) {
+ result->Filters.emplace_back(curFilter->Clone());
+ }
+ for (int i = 0; i < static_cast<int>(EExprSeat::Max); ++i) {
+ result->NamedExprs[i] = CloneContainer(NamedExprs[i]);
+ }
+ result->FlattenColumns = FlattenColumns;
+ result->FlattenMode = FlattenMode;
+ return result;
+}
+
+bool ISource::IsFake() const {
+ return false;
+}
+
+void ISource::AllColumns() {
+ return;
+}
+
+const TColumns* ISource::GetColumns() const {
+ return nullptr;
+}
+
+void ISource::GetInputTables(TTableList& tableList) const {
+ for (auto srcPtr: UsedSources) {
+ srcPtr->GetInputTables(tableList);
+ }
+ return;
+}
+
+TMaybe<bool> ISource::AddColumn(TContext& ctx, TColumnNode& column) {
+ if (column.IsReliable()) {
+ ctx.Error(Pos) << "Source does not allow column references";
+ ctx.Error(column.GetPos()) << "Column reference " <<
+ (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)");
+ }
+ return {};
+}
+
+void ISource::FinishColumns() {
+}
+
+
+bool ISource::AddFilter(TContext& ctx, TNodePtr filter) {
+ Y_UNUSED(ctx);
+ Filters.push_back(filter);
+ return true;
+}
+
+bool ISource::AddGroupKey(TContext& ctx, const TString& column) {
+ if (!GroupKeys.insert(column).second) {
+ ctx.Error() << "Duplicate grouping column: " << column;
+ return false;
+ }
+ OrderedGroupKeys.push_back(column);
+ return true;
+}
+
+void ISource::SetCompactGroupBy(bool compactGroupBy) {
+ CompactGroupBy = compactGroupBy;
+}
+
+void ISource::SetGroupBySuffix(const TString& suffix) {
+ GroupBySuffix = suffix;
+}
+
+bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions, EExprSeat exprSeat) {
+ YQL_ENSURE(exprSeat < EExprSeat::Max);
+ THashSet<TString> names;
+ THashSet<TString> aliasSet;
+ // TODO: merge FlattenBy with FlattenByExpr
+ const bool isFlatten = (exprSeat == EExprSeat::FlattenBy || exprSeat == EExprSeat::FlattenByExpr);
+ THashSet<TString>& aliases = isFlatten ? FlattenByAliases : aliasSet;
+ for (const auto& expr: expressions) {
+ const auto& alias = expr->GetLabel();
+ const auto& columnNamePtr = expr->GetColumnName();
+ if (alias) {
+ ExprAliases.insert(alias);
+ if (!aliases.emplace(alias).second) {
+ ctx.Error(expr->GetPos()) << "Duplicate alias found: " << alias << " in " << exprSeat << " section";
+ return false;
+ }
+ if (names.contains(alias)) {
+ ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << alias << " in " << exprSeat << " section";
+ return false;
+ }
+ }
+ if (columnNamePtr) {
+ const auto& sourceName = *expr->GetSourceName();
+ auto columnName = *columnNamePtr;
+ if (sourceName) {
+ columnName = DotJoin(sourceName, columnName);
+ }
+ if (!names.emplace(columnName).second) {
+ ctx.Error(expr->GetPos()) << "Duplicate column name found: " << columnName << " in " << exprSeat << " section";
+ return false;
+ }
+ if (!alias && aliases.contains(columnName)) {
+ ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << columnName << " in " << exprSeat << " section";
+ return false;
+ }
+ if (alias && exprSeat == EExprSeat::GroupBy) {
+ auto columnAlias = GroupByColumnAliases.emplace(columnName, alias);
+ auto oldAlias = columnAlias.first->second;
+ if (columnAlias.second && oldAlias != alias) {
+ ctx.Error(expr->GetPos()) << "Alias for column not same, column: " << columnName <<
+ ", exist alias: " << oldAlias << ", another alias: " << alias;
+ return false;
+ }
+ }
+ }
+
+ if (exprSeat == EExprSeat::GroupBy) {
+ if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) {
+ if (SessionWindow) {
+ ctx.Error(expr->GetPos()) << "Duplicate session window specification:";
+ ctx.Error(SessionWindow->GetPos()) << "Previous session window is declared here";
+ return false;
+ }
+ SessionWindow = expr;
+ }
+ if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
+ if (HoppingWindow) {
+ ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:";
+ ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here";
+ return false;
+ }
+ HoppingWindow = expr;
+ }
+ }
+ Expressions(exprSeat).emplace_back(expr);
+ }
+ return true;
+}
+
+void ISource::SetFlattenByMode(const TString& mode) {
+ FlattenMode = mode;
+}
+
+void ISource::MarkFlattenColumns() {
+ FlattenColumns = true;
+}
+
+bool ISource::IsFlattenColumns() const {
+ return FlattenColumns;
+}
+
+TString ISource::MakeLocalName(const TString& name) {
+ auto iter = GenIndexes.find(name);
+ if (iter == GenIndexes.end()) {
+ iter = GenIndexes.emplace(name, 0).first;
+ }
+ TStringBuilder str;
+ str << name << iter->second;
+ ++iter->second;
+ return std::move(str);
+}
+
+bool ISource::AddAggregation(TContext& ctx, TAggregationPtr aggr) {
+ Y_UNUSED(ctx);
+ YQL_ENSURE(aggr);
+ Aggregations.push_back(aggr);
+ return true;
+}
+
+bool ISource::HasAggregations() const {
+ return !Aggregations.empty() || !GroupKeys.empty();
+}
+
+void ISource::AddWindowSpecs(TWinSpecs winSpecs) {
+ WinSpecs = winSpecs;
+}
+
+bool ISource::AddFuncOverWindow(TContext& ctx, TNodePtr expr) {
+ Y_UNUSED(ctx);
+ Y_UNUSED(expr);
+ return false;
+}
+
+void ISource::AddTmpWindowColumn(const TString& column) {
+ TmpWindowColumns.push_back(column);
+}
+
+const TVector<TString>& ISource::GetTmpWindowColumns() const {
+ return TmpWindowColumns;
+}
+
+void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) {
+ LegacyHoppingWindowSpec = spec;
+}
+
+TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const {
+ return LegacyHoppingWindowSpec;
+}
+
+TNodePtr ISource::GetSessionWindowSpec() const {
+ return SessionWindow;
+}
+
+TNodePtr ISource::GetHoppingWindowSpec() const {
+ return HoppingWindow;
+}
+
+TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const {
+ auto winIter = WinSpecs.find(windowName);
+ if (winIter == WinSpecs.end()) {
+ ctx.Error(Pos) << "Unable to find window specification for window '" << windowName << "'";
+ return {};
+ }
+ YQL_ENSURE(winIter->second);
+ return winIter->second;
+}
+
+inline TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) {
+ return NamedExprs[static_cast<size_t>(exprSeat)];
+}
+
+const TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) const {
+ return NamedExprs[static_cast<size_t>(exprSeat)];
+}
+
+inline TNodePtr ISource::AliasOrColumn(const TNodePtr& node, bool withSource) {
+ auto result = node->GetLabel();
+ if (!result) {
+ const auto columnNamePtr = node->GetColumnName();
+ YQL_ENSURE(columnNamePtr);
+ result = *columnNamePtr;
+ if (withSource) {
+ const auto sourceNamePtr = node->GetSourceName();
+ if (sourceNamePtr) {
+ result = DotJoin(*sourceNamePtr, result);
+ }
+ }
+ }
+ return BuildQuotedAtom(node->GetPos(), result);
+}
+
+bool ISource::AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) {
+ YQL_ENSURE(func->IsOverWindow());
+ if (func->IsDistinct()) {
+ ctx.Error(func->GetPos()) << "Aggregation with distinct is not allowed over window: " << windowName;
+ return false;
+ }
+ if (!FindWindowSpecification(ctx, windowName)) {
+ return false;
+ }
+ AggregationOverWindow[windowName].emplace_back(std::move(func));
+ return true;
+}
+
+bool ISource::AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) {
+ if (!FindWindowSpecification(ctx, windowName)) {
+ return false;
+ }
+ FuncOverWindow[windowName].emplace_back(std::move(func));
+ return true;
+}
+
+bool ISource::IsCompositeSource() const {
+ return false;
+}
+
+bool ISource::IsGroupByColumn(const TString& column) const {
+ return GroupKeys.contains(column);
+}
+
+bool ISource::IsFlattenByColumns() const {
+ return !Expressions(EExprSeat::FlattenBy).empty();
+}
+
+bool ISource::IsFlattenByExprs() const {
+ return !Expressions(EExprSeat::FlattenByExpr).empty();
+}
+
+bool ISource::IsAlias(EExprSeat exprSeat, const TString& column) const {
+ for (const auto& exprNode: Expressions(exprSeat)) {
+ const auto& labelName = exprNode->GetLabel();
+ if (labelName && labelName == column) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool ISource::IsExprAlias(const TString& column) const {
+ std::array<EExprSeat, 5> exprSeats = {{EExprSeat::FlattenBy, EExprSeat::FlattenByExpr, EExprSeat::GroupBy,
+ EExprSeat::WindowPartitionBy, EExprSeat::DistinctAggr}};
+ for (auto seat: exprSeats) {
+ if (IsAlias(seat, column)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool ISource::IsExprSeat(EExprSeat exprSeat, EExprType type) const {
+ auto expressions = Expressions(exprSeat);
+ if (!expressions) {
+ return false;
+ }
+ for (const auto& exprNode: expressions) {
+ if (exprNode->GetLabel()) {
+ return type == EExprType::WithExpression;
+ }
+ }
+ return type == EExprType::ColumnOnly;
+}
+
+TString ISource::GetGroupByColumnAlias(const TString& column) const {
+ auto iter = GroupByColumnAliases.find(column);
+ if (iter == GroupByColumnAliases.end()) {
+ return {};
+ }
+ return iter->second;
+}
+
+const TString* ISource::GetWindowName() const {
+ return {};
+}
+
+bool ISource::IsCalcOverWindow() const {
+ return !AggregationOverWindow.empty() || !FuncOverWindow.empty() ||
+ AnyOf(WinSpecs, [](const auto& item) { return item.second->Session; });
+}
+
+bool ISource::IsOverWindowSource() const {
+ return !WinSpecs.empty();
+}
+
+bool ISource::IsStream() const {
+ return false;
+}
+
+EOrderKind ISource::GetOrderKind() const {
+ return EOrderKind::None;
+}
+
+TWriteSettings ISource::GetWriteSettings() const {
+ return {};
+}
+
+bool ISource::SetSamplingOptions(TContext& ctx,
+ TPosition pos,
+ ESampleMode mode,
+ TNodePtr samplingRate,
+ TNodePtr samplingSeed) {
+ Y_UNUSED(pos);
+ Y_UNUSED(mode);
+ Y_UNUSED(samplingRate);
+ Y_UNUSED(samplingSeed);
+ ctx.Error() << "Sampling is only supported for table sources";
+ return false;
+}
+
+bool ISource::SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) {
+ Y_UNUSED(pos);
+ Y_UNUSED(contextHints);
+ if (hints) {
+ ctx.Error() << "Explicit hints are only supported for table sources";
+ return false;
+ }
+ return true;
+}
+
+bool ISource::CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const {
+ Y_UNUSED(columns);
+ Y_UNUSED(hint);
+ ctx.Error() << "Source not support grouping hint";
+ return false;
+}
+
+TNodePtr ISource::BuildFilter(TContext& ctx, const TString& label) {
+ return Filters.empty() ? nullptr : Y(ctx.UseUnordered(*this) ? "OrderedFilter" : "Filter", label, BuildFilterLambda());
+}
+
+TNodePtr ISource::BuildFilterLambda() {
+ if (Filters.empty()) {
+ return BuildLambda(Pos, Y("row"), Y("Bool", Q("true")));
+ }
+ YQL_ENSURE(Filters[0]->HasState(ENodeState::Initialized));
+ TNodePtr filter(Filters[0]);
+ for (ui32 i = 1; i < Filters.size(); ++i) {
+ YQL_ENSURE(Filters[i]->HasState(ENodeState::Initialized));
+ filter = Y("And", filter, Filters[i]);
+ }
+ filter = Y("Coalesce", filter, Y("Bool", Q("false")));
+ return BuildLambda(Pos, Y("row"), filter);
+}
+
+TNodePtr ISource::BuildFlattenByColumns(const TString& label) {
+ auto columnsList = Y("FlattenByColumns", Q(FlattenMode), label);
+ for (const auto& column: Expressions(EExprSeat::FlattenBy)) {
+ const auto columnNamePtr = column->GetColumnName();
+ YQL_ENSURE(columnNamePtr);
+ if (column->GetLabel().empty()) {
+ columnsList = L(columnsList, Q(*columnNamePtr));
+ } else {
+ columnsList = L(columnsList, Q(Y(Q(*columnNamePtr), Q(column->GetLabel()))));
+ }
+ }
+ return Y(Y("let", "res", columnsList));
+}
+
+TNodePtr ISource::BuildFlattenColumns(const TString& label) {
+ return Y(Y("let", "res", Y("Just", Y("FlattenStructs", label))));
+}
+
+namespace {
+
+TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& exprs) {
+ auto structObj = BuildAtom(pos, "row", TNodeFlags::Default);
+ for (const auto& exprNode: exprs) {
+ const auto name = exprNode->GetLabel();
+ YQL_ENSURE(name);
+ structObj = structObj->Y("ForceRemoveMember", structObj, structObj->Q(name));
+ if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
+ continue;
+ }
+ if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) {
+ continue;
+ }
+ structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode);
+ }
+ return structObj->Y("AsList", structObj);
+}
+
+}
+
+TNodePtr ISource::BuildPreaggregatedMap(TContext& ctx) {
+ Y_UNUSED(ctx);
+ const auto& groupByExprs = Expressions(EExprSeat::GroupBy);
+ const auto& distinctAggrExprs = Expressions(EExprSeat::DistinctAggr);
+ YQL_ENSURE(groupByExprs || distinctAggrExprs);
+
+ TNodePtr res;
+ if (groupByExprs) {
+ auto body = BuildLambdaBodyForExprAliases(Pos, groupByExprs);
+ res = Y("FlatMap", "core", BuildLambda(Pos, Y("row"), body));
+ }
+
+ if (distinctAggrExprs) {
+ auto body = BuildLambdaBodyForExprAliases(Pos, distinctAggrExprs);
+ auto lambda = BuildLambda(Pos, Y("row"), body);
+ res = res ? Y("FlatMap", res, lambda) : Y("FlatMap", "core", lambda);
+ }
+ return res;
+}
+
+TNodePtr ISource::BuildPreFlattenMap(TContext& ctx) {
+ Y_UNUSED(ctx);
+ YQL_ENSURE(IsFlattenByExprs());
+ return BuildLambdaBodyForExprAliases(Pos, Expressions(EExprSeat::FlattenByExpr));
+}
+
+TNodePtr ISource::BuildPrewindowMap(TContext& ctx) {
+ auto feed = BuildAtom(Pos, "row", TNodeFlags::Default);
+ for (const auto& exprNode: Expressions(EExprSeat::WindowPartitionBy)) {
+ const auto name = exprNode->GetLabel();
+ if (name && !dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
+ feed = Y("AddMember", feed, Q(name), exprNode);
+ }
+ }
+ return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed)));
+}
+
+// Builds the row-sampling lambda from SamplingRate, if one was set.
+// The rate is cast to Double (0 when the cast yields nothing), divided by 100
+// and compared against Random per row; the row is kept via OptionalIf.
+// Returns true when no sampling is configured (node is left untouched) or when
+// the lambda was built; false only if the lambda node came out null.
+bool ISource::BuildSamplingLambda(TNodePtr& node) {
+    if (!SamplingRate) {
+        return true;
+    }
+    const auto rateAsDouble = Y("SafeCast", SamplingRate, Y("DataType", Q("Double")));
+    const auto ratePercent = Y("Coalesce", rateAsDouble, Y("Double", Q("0")));
+    const auto rateFraction = Y("/", ratePercent, Y("Double", Q("100")));
+    const auto keepRow = Y("<", Y("Random", Y("DependsOn", "row")), rateFraction);
+    const auto body = Y(Y("let", "res", Y("OptionalIf", keepRow, "row")));
+    node = BuildLambda(GetPos(), Y("row"), body, "res");
+    return static_cast<bool>(node);
+}
+
+// Initializes the sampling-rate expression against this source and stores it
+// wrapped in two Ensure checks (rate >= 0 and rate <= 100).
+// A null samplingRate is accepted and leaves SamplingRate unchanged.
+// Returns false only when the expression fails to initialize.
+bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) {
+    if (!samplingRate) {
+        return true;
+    }
+    if (!samplingRate->Init(ctx, this)) {
+        return false;
+    }
+    const auto nonNegativeRate = Y("Ensure", samplingRate, Y(">=", samplingRate, Y("Double", Q("0"))), Y("String", Q("\"Expected sampling rate to be nonnegative\"")));
+    SamplingRate = Y("Ensure", nonNegativeRate, Y("<=", nonNegativeRate, Y("Double", Q("100"))), Y("String", Q("\"Sampling rate is over 100%\"")));
+    return true;
+}
+
+// Builds the "Aggregate<GroupBySuffix>" call over `label`, wrapped in
+// AssumeColumnOrderPartial on the group keys.
+//
+// Returns {nullptr, true} when there is nothing to aggregate (no group keys,
+// no aggregations, not a composite source, no legacy hopping window),
+// {nullptr, false} when any aggregation fails to produce its traits, and
+// {node, true} otherwise.
+std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) {
+    const bool nothingToAggregate = GroupKeys.empty() && Aggregations.empty()
+        && !IsCompositeSource() && !LegacyHoppingWindowSpec;
+    if (nothingToAggregate) {
+        return { nullptr, true };
+    }
+
+    // Group keys are emitted in their registration order.
+    YQL_ENSURE(GroupKeys.size() == OrderedGroupKeys.size());
+    auto keysTuple = Y();
+    for (const auto& key: OrderedGroupKeys) {
+        YQL_ENSURE(GroupKeys.contains(key));
+        keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key));
+    }
+
+    // Aggregations sharing the same (distinct flag, generic key) are joined
+    // into the first aggregation of their group.
+    std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs;
+    for (const auto& aggr: Aggregations) {
+        const auto genericKey = aggr->GetGenericKey();
+        if (genericKey) {
+            genericAggrs[{aggr->IsDistinct(), *genericKey}].push_back(aggr.Get());
+        }
+    }
+    for (const auto& sameKey : genericAggrs) {
+        const auto& group = sameKey.second;
+        IAggregation* const head = group.front();
+        for (size_t i = 1U; i < group.size(); ++i) {
+            head->Join(group[i]);
+        }
+    }
+
+    const auto listType = Y("TypeOf", label);
+    const bool mergeManyFinalize = GroupBySuffix == "MergeManyFinalize";
+    const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" ||
+        GroupBySuffix == "MergeFinalize" || mergeManyFinalize;
+    const bool allowAggApply = !LegacyHoppingWindowSpec && !SessionWindow && !HoppingWindow;
+
+    auto aggrArgs = Y();
+    for (const auto& aggr: Aggregations) {
+        const auto traits = aggr->AggregationTraits(listType, overState, mergeManyFinalize, allowAggApply, ctx);
+        if (!traits.second) {
+            return { nullptr, false };
+        }
+        // A null node with a success flag simply contributes nothing.
+        if (traits.first) {
+            aggrArgs = L(aggrArgs, traits.first);
+        }
+    }
+
+    auto options = Y();
+    if (CompactGroupBy || GroupBySuffix == "Finalize") {
+        options = L(options, Q(Y(Q("compact"))));
+    }
+
+    if (LegacyHoppingWindowSpec) {
+        const auto timeExtractor = BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor);
+        const auto dataWatermarks = LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false");
+        const auto hoppingTraits = Y(
+            "HoppingTraits",
+            Y("ListItemType", listType),
+            timeExtractor,
+            LegacyHoppingWindowSpec->Hop,
+            LegacyHoppingWindowSpec->Interval,
+            LegacyHoppingWindowSpec->Delay,
+            dataWatermarks,
+            Q("v1"));
+
+        options = L(options, Q(Y(Q("hopping"), hoppingTraits)));
+    }
+
+    if (SessionWindow) {
+        YQL_ENSURE(SessionWindow->GetLabel());
+        auto sessionWindow = dynamic_cast<TSessionWindow*>(SessionWindow.Get());
+        YQL_ENSURE(sessionWindow);
+        const auto sessionName = BuildQuotedAtom(Pos, SessionWindow->GetLabel());
+        const auto sessionTraits = sessionWindow->BuildTraits(label);
+        options = L(options, Q(Y(Q("session"), Q(Y(sessionName, sessionTraits)))));
+    }
+
+    if (HoppingWindow) {
+        YQL_ENSURE(HoppingWindow->GetLabel());
+        auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get());
+        YQL_ENSURE(hoppingWindow);
+        const auto hoppingName = BuildQuotedAtom(Pos, HoppingWindow->GetLabel());
+        const auto hoppingWindowTraits = hoppingWindow->BuildTraits(label);
+        options = L(options, Q(Y(Q("hopping"), Q(Y(hoppingName, hoppingWindowTraits)))));
+    }
+
+    const auto aggregate = Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options));
+    return { Y("AssumeColumnOrderPartial", aggregate, Q(keysTuple)), true };
+}
+
+// Looks for a likely misspelling of `name`, first among the group keys and,
+// if nothing is found there, among the expression aliases.
+TMaybe<TString> ISource::FindColumnMistype(const TString& name) const {
+    if (auto fromGroupKeys = FindMistypeIn(GroupKeys, name)) {
+        return fromGroupKeys;
+    }
+    return FindMistypeIn(ExprAliases, name);
+}
+
+void ISource::AddDependentSource(ISource* usedSource) { // Appends usedSource to UsedSources; the raw pointer is stored as-is (no null check, no ownership taken here)
+ UsedSources.push_back(usedSource);
+}
+
+// Call node "EvaluateExpr" with exactly one argument: a window frame bound.
+// The bound is initialized against a private fake source rather than the
+// source passed to DoInit.
+class TYqlFrameBound final: public TCallNode {
+public:
+    TYqlFrameBound(TPosition pos, TNodePtr bound)
+        : TCallNode(pos, "EvaluateExpr", 1, 1, { bound })
+        , FakeSource(BuildFakeSource(pos))
+    {
+    }
+
+    // Validates the argument count, initializes the bound on the fake source,
+    // then runs the regular call-node initialization; stops at the first failure.
+    bool DoInit(TContext& ctx, ISource* src) override {
+        const bool boundReady = ValidateArguments(ctx) && Args[0]->Init(ctx, FakeSource.Get());
+        return boundReady && TCallNode::DoInit(ctx, src);
+    }
+
+    // Clones the node together with its bound expression.
+    TNodePtr DoClone() const final {
+        auto boundCopy = Args[0]->Clone();
+        return new TYqlFrameBound(Pos, boundCopy);
+    }
+private:
+    TSourcePtr FakeSource;
+};
+
+// Converts one window frame bound into its AST node.
+//
+// For frame types other than FrameByRows the result is a tuple of the quoted
+// setting name ("preceding" / "currentRow" / "following") followed, unless the
+// setting is the current row, by the bound: the quoted atom "unbounded" when
+// no bound is given, the literal itself, or an evaluated non-literal.
+//
+// For FrameByRows the result is a Void literal (no bound), an Int32 call with
+// the offset negated for "preceding" (literal bound), or an evaluated
+// expression wrapped in "Minus" for "preceding" (non-literal bound).
+TNodePtr BuildFrameNode(const TFrameBound& frame, EFrameType frameType) {
+    TString settingStr;
+    if (frame.Settings == FramePreceding) {
+        settingStr = "preceding";
+    } else if (frame.Settings == FrameCurrentRow) {
+        settingStr = "currentRow";
+    } else if (frame.Settings == FrameFollowing) {
+        settingStr = "following";
+    } else {
+        YQL_ENSURE(false, "Unexpected frame setting");
+    }
+
+    const TPosition pos = frame.Pos;
+    TNodePtr node = frame.Bound;
+
+    if (frameType != EFrameType::FrameByRows) {
+        TVector<TNodePtr> settings;
+        settings.push_back(BuildQuotedAtom(pos, settingStr, TNodeFlags::Default));
+        if (frame.Settings == FrameCurrentRow) {
+            return BuildTuple(pos, std::move(settings));
+        }
+        if (!node) {
+            node = BuildQuotedAtom(pos, "unbounded", TNodeFlags::Default);
+        } else if (!node->IsLiteral()) {
+            node = new TYqlFrameBound(pos, node);
+        }
+        settings.push_back(std::move(node));
+        return BuildTuple(pos, std::move(settings));
+    }
+
+    // TODO: switch FrameByRows to common format above
+    YQL_ENSURE(frame.Settings != FrameCurrentRow, "Should be already replaced by 0 preceding/following");
+    if (!node) {
+        return BuildLiteralVoid(pos);
+    }
+
+    const bool preceding = frame.Settings == FramePreceding;
+    if (!node->IsLiteral()) {
+        if (preceding) {
+            node = new TCallNodeImpl(pos, "Minus", { node->Clone() });
+        }
+        return new TYqlFrameBound(pos, node);
+    }
+
+    YQL_ENSURE(node->GetLiteralType() == "Int32");
+    i32 value = FromString<i32>(node->GetLiteralValue());
+    YQL_ENSURE(value >= 0);
+    if (preceding) {
+        // Preceding offsets are encoded as non-positive numbers.
+        value = -value;
+    }
+    return new TCallNodeImpl(pos, "Int32", { BuildQuotedAtom(pos, ToString(value), TNodeFlags::Default) });
+}
+
+// Builds the quoted frame settings list for a window: a ("begin", node) pair,
+// an ("end", node) pair and, when isCompact is set, a ("compact") marker.
+// Frame exclusion is not supported and both bounds must be present.
+TNodePtr ISource::BuildWindowFrame(const TFrameSpecification& spec, bool isCompact) {
+    YQL_ENSURE(spec.FrameExclusion == FrameExclNone);
+    YQL_ENSURE(spec.FrameBegin);
+    YQL_ENSURE(spec.FrameEnd);
+
+    const auto begin = Q(Y(Q("begin"), BuildFrameNode(*spec.FrameBegin, spec.FrameType)));
+    const auto end = Q(Y(Q("end"), BuildFrameNode(*spec.FrameEnd, spec.FrameType)));
+
+    if (isCompact) {
+        return Q(Y(begin, end, Q(Y(Q("compact")))));
+    }
+    return Q(Y(begin, end));
+}
+
+// Call node "SessionWindowTraits" with exactly four arguments. The last
+// argument is initialized against a private fake source rather than the
+// source passed to DoInit.
+class TSessionWindowTraits final: public TCallNode {
+public:
+    TSessionWindowTraits(TPosition pos, const TVector<TNodePtr>& args)
+        : TCallNode(pos, "SessionWindowTraits", args)
+        , FakeSource(BuildFakeSource(pos))
+    {
+        YQL_ENSURE(args.size() == 4);
+    }
+
+    // Validates the arguments, initializes the last one on the fake source,
+    // then runs the regular call-node initialization; stops at the first failure.
+    bool DoInit(TContext& ctx, ISource* src) override {
+        if (!ValidateArguments(ctx) || !Args.back()->Init(ctx, FakeSource.Get())) {
+            return false;
+        }
+        return TCallNode::DoInit(ctx, src);
+    }
+
+    // Clones the node together with all of its arguments.
+    TNodePtr DoClone() const final {
+        const auto argsCopy = CloneContainer(Args);
+        return new TSessionWindowTraits(Pos, argsCopy);
+    }
+private:
+    TSourcePtr FakeSource;
+};
+
+// Builds the window-calculation stage for every window that is actually used:
+// windows referenced by an aggregation or a function, plus windows whose
+// specification declares a session.
+//
+// With a single used window the CalcOverWindow / CalcOverSessionWindow call is
+// applied to `label` directly and returned. With several windows the calls are
+// chained through a "partitioning" binding and returned as a block.
+//
+// Must only be called when IsCalcOverWindow() holds.
+TNodePtr ISource::BuildCalcOverWindow(TContext& ctx, const TString& label) {
+    YQL_ENSURE(IsCalcOverWindow());
+
+    TSet<TString> usedWindows;
+    for (auto& it : AggregationOverWindow) {
+        usedWindows.insert(it.first);
+    }
+    for (auto& it : FuncOverWindow) {
+        usedWindows.insert(it.first);
+    }
+    for (auto& it : WinSpecs) {
+        if (it.second->Session) {
+            usedWindows.insert(it.first);
+        }
+    }
+
+    YQL_ENSURE(!usedWindows.empty());
+
+    const bool onePartition = usedWindows.size() == 1;
+    const auto useLabel = onePartition ? label : "partitioning";
+    const auto listType = Y("TypeOf", useLabel);
+    auto framesProcess = Y();
+    auto resultNode = onePartition ? Y() : Y(Y("let", "partitioning", label));
+
+    // Shared empty lists: both operands of the lookups below are lvalues, so
+    // the per-window vectors are bound by reference instead of being copied.
+    const TVector<TAggregationPtr> noAggs;
+    const TVector<TNodePtr> noFuncs;
+
+    for (const auto& name : usedWindows) {
+        auto spec = FindWindowSpecification(ctx, name);
+        YQL_ENSURE(spec);
+
+        const auto aggsIter = AggregationOverWindow.find(name);
+        const auto funcsIter = FuncOverWindow.find(name);
+
+        const TVector<TAggregationPtr>& aggs = (aggsIter == AggregationOverWindow.end()) ? noAggs : aggsIter->second;
+        const TVector<TNodePtr>& funcs = (funcsIter == FuncOverWindow.end()) ? noFuncs : funcsIter->second;
+
+        auto frames = Y();
+        TString frameType;
+        switch (spec->Frame->FrameType) {
+            case EFrameType::FrameByRows: frameType = "WinOnRows"; break;
+            case EFrameType::FrameByRange: frameType = "WinOnRange"; break;
+            case EFrameType::FrameByGroups: frameType = "WinOnGroups"; break;
+        }
+        // An unknown frame type leaves the name empty and is rejected here.
+        YQL_ENSURE(frameType);
+        auto callOnFrame = Y(frameType, BuildWindowFrame(*spec->Frame, spec->IsCompact));
+        for (auto& agg : aggs) {
+            auto winTraits = agg->WindowTraits(listType, ctx);
+            callOnFrame = L(callOnFrame, winTraits);
+        }
+        for (auto& func : funcs) {
+            auto winSpec = func->WindowSpecFunc(listType);
+            callOnFrame = L(callOnFrame, winSpec);
+        }
+        frames = L(frames, callOnFrame);
+
+        // Session windows are not partition keys; they are passed separately.
+        auto keysTuple = Y();
+        for (const auto& key: spec->Partitions) {
+            if (!dynamic_cast<TSessionWindow*>(key.Get())) {
+                keysTuple = L(keysTuple, AliasOrColumn(key, GetJoin()));
+            }
+        }
+
+        auto sortSpec = spec->OrderBy.empty() ? Y("Void") : BuildSortSpec(spec->OrderBy, useLabel, true, false);
+        if (spec->Session) {
+            // Named distinctly so it does not shadow the `label` parameter.
+            const TString sessionLabel = spec->Session->GetLabel();
+            YQL_ENSURE(sessionLabel);
+            auto sessionWindow = dynamic_cast<TSessionWindow*>(spec->Session.Get());
+            YQL_ENSURE(sessionWindow);
+            auto labelNode = BuildQuotedAtom(sessionWindow->GetPos(), sessionLabel);
+
+            auto sessionTraits = sessionWindow->BuildTraits(useLabel);
+            framesProcess = Y("CalcOverSessionWindow", useLabel, Q(keysTuple), sortSpec, Q(frames), sessionTraits, Q(Y(labelNode)));
+        } else {
+            YQL_ENSURE(aggs || funcs);
+            framesProcess = Y("CalcOverWindow", useLabel, Q(keysTuple), sortSpec, Q(frames));
+        }
+
+        if (!onePartition) {
+            resultNode = L(resultNode, Y("let", "partitioning", framesProcess));
+        }
+    }
+    if (onePartition) {
+        return framesProcess;
+    } else {
+        return Y("block", Q(L(resultNode, Y("return", "partitioning"))));
+    }
+}
+
+TNodePtr ISource::BuildSort(TContext& ctx, const TString& label) { // Base-class default: ignores both arguments and returns a null node
+ Y_UNUSED(ctx);
+ Y_UNUSED(label);
+ return nullptr;
+}
+
+TNodePtr ISource::BuildCleanupColumns(TContext& ctx, const TString& label) { // Base-class default: ignores both arguments and returns a null node
+ Y_UNUSED(ctx);
+ Y_UNUSED(label);
+ return nullptr;
+}
+
+IJoin* ISource::GetJoin() { // Base-class default: a plain source is not a join; IJoin::GetJoin returns `this` instead
+ return nullptr;
+}
+
+ISource* ISource::GetCompositeSource() { // Base-class default: no composite source, returns nullptr
+ return nullptr;
+}
+
+bool ISource::IsSelect() const { // Base-class default answer is true
+ return true;
+}
+
+bool ISource::IsTableSource() const { // Base-class default answer is false
+ return false;
+}
+
+bool ISource::ShouldUseSourceAsColumn(const TString& source) const { // Base-class default: ignores the name and answers false
+ Y_UNUSED(source);
+ return false;
+}
+
+bool ISource::IsJoinKeysInitializing() const { // Base-class default answer is false
+ return false;
+}
+
+// Initializes every FLATTEN BY expression against this source and, when
+// columns are flattened and a parent source is given, calls AllColumns() on
+// the parent. Returns false as soon as one expression fails to initialize.
+bool ISource::DoInit(TContext& ctx, ISource* src) {
+    for (auto& flattenColumn: Expressions(EExprSeat::FlattenBy)) {
+        const bool initialized = flattenColumn->Init(ctx, this);
+        if (!initialized) {
+            return false;
+        }
+    }
+
+    if (IsFlattenColumns()) {
+        if (src) {
+            src->AllColumns();
+        }
+    }
+
+    return true;
+}
+
+// Initializes all filters against this source. A filter that is aggregated,
+// not constant and not an aggregation key is rejected with an error.
+// Returns false on the first filter that fails either step.
+bool ISource::InitFilters(TContext& ctx) {
+    for (auto& filter: Filters) {
+        if (!filter->Init(ctx, this)) {
+            return false;
+        }
+        const bool usesAggregatedValue = filter->IsAggregated()
+            && !filter->IsConstant()
+            && !filter->HasState(ENodeState::AggregationKey);
+        if (usesAggregatedValue) {
+            ctx.Error(filter->GetPos()) << "Can not use aggregated values in filtering";
+            return false;
+        }
+    }
+    return true;
+}
+
+TAstNode* ISource::Translate(TContext& ctx) const { // Not expected to be called on a source: trips a debug-only check, otherwise returns nullptr
+ Y_VERIFY_DEBUG(false);
+ Y_UNUSED(ctx);
+ return nullptr;
+}
+
+// Fills the two parts of a sort description from an ORDER BY list.
+//
+// sortDirection   receives Void (empty list), a single Bool (one key) or a
+//                 quoted list of Bools (several keys).
+// sortKeySelector receives Void (empty list) or a lambda over "row" returning
+//                 the PersistableRepr of the key (one key) or a quoted list of
+//                 them (several keys). It must be null on entry when the list
+//                 is empty.
+void ISource::FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector) {
+    if (orderBy.empty()) {
+        YQL_ENSURE(!sortKeySelector);
+        sortDirection = sortKeySelector = Y("Void");
+        return;
+    }
+
+    const auto directionOf = [this](const TSortSpecificationPtr& sortSpec) {
+        return Y("Bool", Q(sortSpec->Ascending ? "true" : "false"));
+    };
+    const auto keyOf = [this](const TSortSpecificationPtr& sortSpec) {
+        return Y("PersistableRepr", sortSpec->OrderExpr);
+    };
+
+    TNodePtr expr;
+    if (orderBy.size() == 1) {
+        const auto& sortSpec = orderBy.front();
+        expr = keyOf(sortSpec);
+        sortDirection = directionOf(sortSpec);
+    } else {
+        auto exprList = Y();
+        auto directions = Y();
+        for (const auto& sortSpec: orderBy) {
+            directions = L(directions, directionOf(sortSpec));
+            exprList = L(exprList, keyOf(sortSpec));
+        }
+        sortDirection = Q(directions);
+        expr = Q(exprList);
+    }
+    sortKeySelector = BuildLambda(Pos, Y("row"), expr);
+}
+
+// Builds a sort node for a non-empty ORDER BY list:
+//   traits  -> "SortTraits" over the type of `label` (takes precedence),
+//   assume  -> "AssumeSorted" over `label`,
+//   neither -> "Sort" over `label`.
+TNodePtr ISource::BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume) {
+    YQL_ENSURE(!orderBy.empty());
+    TNodePtr dirsNode;
+    TNodePtr keySelectorNode;
+    FillSortParts(orderBy, dirsNode, keySelectorNode);
+
+    if (traits) {
+        const auto inputType = Y("TypeOf", label);
+        return Y("SortTraits", inputType, dirsNode, keySelectorNode);
+    }
+    const char* const sortCallable = assume ? "AssumeSorted" : "Sort";
+    return Y(sortCallable, label, dirsNode, keySelectorNode);
+}
+
+IJoin::IJoin(TPosition pos) // Forwards the position to the ISource base; no state of its own is set here
+ : ISource(pos)
+{
+}
+
+IJoin::~IJoin() // Out-of-line destructor with an empty body
+{
+}
+
+IJoin* IJoin::GetJoin() { // A join reports itself, whereas ISource::GetJoin returns nullptr
+ return this;
+}
+
+} // namespace NSQLTranslationV1
diff --git a/ydb/library/yql/sql/v1/source.h b/ydb/library/yql/sql/v1/source.h
new file mode 100644
index 0000000000..a62b53efb9
--- /dev/null
+++ b/ydb/library/yql/sql/v1/source.h
@@ -0,0 +1,300 @@
+#pragma once
+#include "node.h"
+
+namespace NSQLTranslationV1 {
+
+ class ISource;
+ typedef TIntrusivePtr<ISource> TSourcePtr;
+
+ struct TTableRef { // Copyable record describing one table reference; TTableList (declared after this struct) is a vector of these
+ TString RefName;
+ TString Service;
+ TDeferredAtom Cluster;
+ TNodePtr Keys;
+ TNodePtr Options;
+ TSourcePtr Source;
+
+ TTableRef() = default;
+ TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys); // declared only; the definition is not in this header
+ TTableRef(const TTableRef&) = default;
+ TTableRef& operator=(const TTableRef&) = default;
+
+ TString ShortName() const; // declared only; the definition is not in this header
+ };
+
+ typedef TVector<TTableRef> TTableList;
+
+
+ class IJoin;
+    // Abstract node (Build() is pure virtual) that accumulates the parts of a
+    // query source: named expressions per seat, filters, group keys,
+    // aggregations, window specifications and sampling rate, and that builds
+    // the corresponding AST stages (filter, flatten, aggregation, windows, sort).
+    // Instances are created by derived classes only (protected constructor).
+    class ISource: public INode {
+    public:
+        virtual ~ISource();
+
+        virtual bool IsFake() const;
+        virtual void AllColumns();
+        virtual const TColumns* GetColumns() const;
+        virtual void GetInputTables(TTableList& tableList) const;
+        /// In case of error the result is unfilled; the flag shows whether to ensure the column name
+        virtual TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column);
+        virtual void FinishColumns();
+        virtual bool AddExpressions(TContext& ctx, const TVector<TNodePtr>& columns, EExprSeat exprSeat);
+        virtual void SetFlattenByMode(const TString& mode);
+        virtual void MarkFlattenColumns();
+        virtual bool IsFlattenColumns() const;
+        virtual bool AddFilter(TContext& ctx, TNodePtr filter);
+        virtual bool AddGroupKey(TContext& ctx, const TString& column);
+        virtual void SetCompactGroupBy(bool compactGroupBy);
+        virtual void SetGroupBySuffix(const TString& suffix);
+        virtual TString MakeLocalName(const TString& name);
+        virtual bool AddAggregation(TContext& ctx, TAggregationPtr aggr);
+        virtual bool AddFuncOverWindow(TContext& ctx, TNodePtr expr);
+        virtual void AddTmpWindowColumn(const TString& column);
+        virtual const TVector<TString>& GetTmpWindowColumns() const;
+        virtual bool HasAggregations() const;
+        virtual void AddWindowSpecs(TWinSpecs winSpecs);
+        virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func);
+        virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func);
+        virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec);
+        virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const;
+        virtual TNodePtr GetSessionWindowSpec() const;
+        virtual TNodePtr GetHoppingWindowSpec() const;
+        virtual bool IsCompositeSource() const;
+        virtual bool IsGroupByColumn(const TString& column) const;
+        virtual bool IsFlattenByColumns() const;
+        virtual bool IsFlattenByExprs() const;
+        virtual bool IsCalcOverWindow() const;
+        virtual bool IsOverWindowSource() const;
+        virtual bool IsStream() const;
+        virtual EOrderKind GetOrderKind() const;
+        virtual TWriteSettings GetWriteSettings() const;
+        virtual bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed);
+        virtual bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints);
+        virtual bool CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const;
+
+        // AST stage builders.
+        virtual TNodePtr BuildFilter(TContext& ctx, const TString& label);
+        virtual TNodePtr BuildFilterLambda();
+        virtual TNodePtr BuildFlattenByColumns(const TString& label);
+        virtual TNodePtr BuildFlattenColumns(const TString& label);
+        virtual TNodePtr BuildPreaggregatedMap(TContext& ctx);
+        virtual TNodePtr BuildPreFlattenMap(TContext& ctx);
+        virtual TNodePtr BuildPrewindowMap(TContext& ctx);
+        // The bool is false on failure; a null node with true means "nothing to aggregate".
+        virtual std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx);
+        virtual TNodePtr BuildCalcOverWindow(TContext& ctx, const TString& label);
+        // The base implementations of the next two return nullptr.
+        virtual TNodePtr BuildSort(TContext& ctx, const TString& label);
+        virtual TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label);
+        virtual bool BuildSamplingLambda(TNodePtr& node);
+        virtual bool SetSamplingRate(TContext& ctx, TNodePtr samplingRate);
+
+        // Base implementations: GetJoin/GetCompositeSource return nullptr,
+        // IsSelect returns true, the remaining predicates return false.
+        virtual IJoin* GetJoin();
+        virtual ISource* GetCompositeSource();
+        virtual bool IsSelect() const;
+        virtual bool IsTableSource() const;
+        virtual bool ShouldUseSourceAsColumn(const TString& source) const;
+        virtual bool IsJoinKeysInitializing() const;
+        virtual const TString* GetWindowName() const;
+
+        virtual bool DoInit(TContext& ctx, ISource* src);
+        virtual TNodePtr Build(TContext& ctx) = 0;
+
+        virtual TMaybe<TString> FindColumnMistype(const TString& name) const;
+
+        virtual bool InitFilters(TContext& ctx);
+        void AddDependentSource(ISource* usedSource);
+        bool IsAlias(EExprSeat exprSeat, const TString& label) const;
+        bool IsExprAlias(const TString& label) const;
+        bool IsExprSeat(EExprSeat exprSeat, EExprType type = EExprType::WithExpression) const;
+        TString GetGroupByColumnAlias(const TString& column) const;
+        const TVector<TNodePtr>& Expressions(EExprSeat exprSeat) const;
+
+        virtual TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const;
+
+        TIntrusivePtr<ISource> CloneSource() const;
+
+    protected:
+        ISource(TPosition pos);
+        virtual TAstNode* Translate(TContext& ctx) const;
+
+        // Output parameters are listed in the same order as in the definition
+        // and at the call site in BuildSortSpec: direction(s) first, then the
+        // key selector lambda.
+        void FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector);
+        TNodePtr BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume);
+
+        TVector<TNodePtr>& Expressions(EExprSeat exprSeat);
+        TNodePtr AliasOrColumn(const TNodePtr& node, bool withSource);
+
+        TNodePtr BuildWindowFrame(const TFrameSpecification& spec, bool isCompact);
+
+        THashSet<TString> ExprAliases;
+        THashSet<TString> FlattenByAliases;
+        THashMap<TString, TString> GroupByColumnAliases;
+        TVector<TNodePtr> Filters;
+        bool CompactGroupBy = false;
+        TString GroupBySuffix;
+        // GroupKeys and OrderedGroupKeys hold the same keys (BuildAggregation
+        // asserts equal sizes and membership); the vector keeps their order.
+        TSet<TString> GroupKeys;
+        TVector<TString> OrderedGroupKeys;
+        std::array<TVector<TNodePtr>, static_cast<unsigned>(EExprSeat::Max)> NamedExprs;
+        TVector<TAggregationPtr> Aggregations;
+        // Both maps are keyed by window name.
+        TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow;
+        TMap<TString, TVector<TNodePtr>> FuncOverWindow;
+        TWinSpecs WinSpecs;
+        TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec;
+        TNodePtr SessionWindow;
+        TNodePtr HoppingWindow;
+        // Raw, non-owning pointers appended by AddDependentSource.
+        TVector<ISource*> UsedSources;
+        TString FlattenMode;
+        bool FlattenColumns = false;
+        THashMap<TString, ui32> GenIndexes;
+        TVector<TString> TmpWindowColumns;
+        TNodePtr SamplingRate;
+    };
+
+    // CloneContainer specialization for sources: every non-null element is
+    // cloned through CloneSource(); null elements stay null in the result.
+    template<>
+    inline TVector<TSourcePtr> CloneContainer<TSourcePtr>(const TVector<TSourcePtr>& args) {
+        TVector<TSourcePtr> cloneArgs;
+        cloneArgs.reserve(args.size());
+        for (const auto& source: args) {
+            if (source) {
+                cloneArgs.push_back(source->CloneSource());
+            } else {
+                cloneArgs.push_back(TSourcePtr());
+            }
+        }
+        return cloneArgs;
+    }
+
+ struct TJoinLinkSettings { // Per-link join options passed to IJoin::SetupJoin
+ bool ForceSortedMerge = false; // off unless explicitly requested
+ };
+
+    // Source interface for joins. GetJoin() returns the join itself (the
+    // ISource base returns nullptr); the remaining operations are pure virtual
+    // and must be provided by the concrete join implementation.
+    class IJoin: public ISource {
+    public:
+        ~IJoin() override;
+
+        IJoin* GetJoin() override;
+        virtual TNodePtr BuildJoinKeys(TContext& ctx, const TVector<TDeferredAtom>& names) = 0;
+        virtual void SetupJoin(const TString& joinOp, TNodePtr joinExpr, const TJoinLinkSettings& linkSettings) = 0;
+        virtual const THashMap<TString, THashSet<TString>>& GetSameKeysMap() const = 0;
+        virtual TVector<TString> GetJoinLabels() const = 0;
+
+    protected:
+        IJoin(TPosition pos);
+    };
+
+ class TSessionWindow final : public INode { // Node type that ISource code recognizes via dynamic_cast in order to handle session windows separately
+ public:
+ TSessionWindow(TPosition pos, const TVector<TNodePtr>& args);
+ void MarkValid();
+ TNodePtr BuildTraits(const TString& label) const; // called by ISource::BuildAggregation and ISource::BuildCalcOverWindow with the input label
+ private:
+ bool DoInit(TContext& ctx, ISource* src) override;
+ TAstNode* Translate(TContext&) const override;
+ void DoUpdateState() const override;
+ TNodePtr DoClone() const override;
+ TString GetOpName() const override;
+
+ TVector<TNodePtr> Args;
+ TSourcePtr FakeSource;
+ TNodePtr Node;
+ bool Valid; // NOTE(review): no in-class initializer; presumably set by the constructor and MarkValid() — confirm in the definition
+ };
+
+ class THoppingWindow final : public INode { // Node type that ISource code recognizes via dynamic_cast in order to handle hopping windows separately
+ public:
+ THoppingWindow(TPosition pos, const TVector<TNodePtr>& args);
+ void MarkValid();
+ TNodePtr BuildTraits(const TString& label) const; // called by ISource::BuildAggregation with the input label
+ public:
+ TNodePtr Hop; // public data member
+ TNodePtr Interval; // public data member
+ private:
+ bool DoInit(TContext& ctx, ISource* src) override;
+ TAstNode* Translate(TContext&) const override;
+ void DoUpdateState() const override;
+ TNodePtr DoClone() const override;
+ TString GetOpName() const override;
+ TNodePtr ProcessIntervalParam(const TNodePtr& val) const;
+
+ TVector<TNodePtr> Args;
+ TSourcePtr FakeSource;
+ TNodePtr Node;
+ bool Valid; // NOTE(review): no in-class initializer; presumably set by the constructor and MarkValid() — confirm in the definition
+ };
+
+
+ // Implemented in join.cpp
+ TString NormalizeJoinOp(const TString& joinOp);
+ TSourcePtr BuildEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes);
+
+ // Implemented in select.cpp
+ TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped);
+ TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex = -1);
+ TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos);
+ TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist = false);
+ TSourcePtr BuildMuxSource(TPosition pos, TVector<TSourcePtr>&& sources);
+ TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom = false, bool inSubquery = false);
+ TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList = false);
+ TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label = TString());
+ TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label = TString());
+ TSourcePtr BuildRefColumnSource(TPosition pos, const TString& partExpression);
+ TSourcePtr BuildUnionAll(TPosition pos, TVector<TSourcePtr>&& sources, const TWriteSettings& settings);
+ TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource);
+
+ TNodePtr BuildOrderBy(TPosition pos, const TVector<TNodePtr>& keys, const TVector<bool>& order);
+ TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take);
+
+
+ TSourcePtr BuildSelectCore(
+ TContext& ctx,
+ TPosition pos,
+ TSourcePtr source,
+ const TVector<TNodePtr>& groupByExpr,
+ const TVector<TNodePtr>& groupBy,
+ bool compactGroupBy,
+ const TString& groupBySuffix,
+ bool assumeSorted,
+ const TVector<TSortSpecificationPtr>& orderBy,
+ TNodePtr having,
+ TWinSpecs&& windowSpec,
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
+ TVector<TNodePtr>&& terms,
+ bool distinct,
+ TVector<TNodePtr>&& without,
+ bool selectStream,
+ const TWriteSettings& settings
+ );
+ TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake);
+
+
+ enum class ReduceMode { // Mode selector taken by BuildReduce (declared right after this enum)
+ ByPartition,
+ ByAll,
+ };
+ TSourcePtr BuildReduce(TPosition pos, ReduceMode mode, TSourcePtr source, TVector<TSortSpecificationPtr>&& orderBy,
+ TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings,
+ const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall);
+    // Declaration only; this header lists it among the builders implemented in select.cpp.
+    TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall,
+        bool processStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy);
+
+ TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped);
+
+ // Implemented in insert.cpp
+    // Two overloads: values given as literal rows, or produced by another source.
+    TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values);
+    TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source);
+ TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values);
+
+ EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode);
+ TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns);
+ TNodePtr BuildIntoTableOptions(TPosition pos, const TVector<TString>& eraseColumns, const TTableHints& hints);
+ TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options = nullptr);
+ TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source);
+ TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source);
+
+ // Implemented in query.cpp
+ TNodePtr BuildTableKey(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TString& view);
+ TNodePtr BuildTableKeys(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TString& func, const TVector<TTableArg>& args);
+ TNodePtr BuildTopicKey(TPosition pos, const TDeferredAtom& cluster, const TDeferredAtom& name);
+ TNodePtr BuildInputOptions(TPosition pos, const TTableHints& hints);
+ TNodePtr BuildInputTables(TPosition pos, const TTableList& tables, bool inSubquery, TScopedStatePtr scoped);
+ TNodePtr BuildCreateTable(TPosition pos, const TTableRef& tr, const TCreateTableParameters& params, TScopedStatePtr scoped);
+ TNodePtr BuildAlterTable(TPosition pos, const TTableRef& tr, const TAlterTableParameters& params, TScopedStatePtr scoped);
+ TNodePtr BuildDropTable(TPosition pos, const TTableRef& table, ETableType tableType, TScopedStatePtr scoped);
+ TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options,
+ TScopedStatePtr scoped);
+ TSourcePtr TryMakeSourceFromExpression(TContext& ctx, const TString& currService, const TDeferredAtom& currCluster,
+ TNodePtr node, const TString& view = {});
+ void MakeTableFromExpression(TContext& ctx, TNodePtr node, TDeferredAtom& table, const TString& prefix = {});
+ TDeferredAtom MakeAtomFromExpression(TContext& ctx, TNodePtr node, const TString& prefix = {});
+ TString NormalizeTypeString(const TString& str);
+} // namespace NSQLTranslationV1
diff --git a/ydb/library/yql/sql/v1/sql_group_by.cpp b/ydb/library/yql/sql/v1/sql_group_by.cpp
index 33a525b259..d1dd439045 100644
--- a/ydb/library/yql/sql/v1/sql_group_by.cpp
+++ b/ydb/library/yql/sql/v1/sql_group_by.cpp
@@ -1,5 +1,6 @@
#include "sql_group_by.h"
#include "sql_expression.h"
+#include "source.h"
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
namespace NSQLTranslationV1 {
diff --git a/ydb/library/yql/sql/v1/sql_translation.cpp b/ydb/library/yql/sql/v1/sql_translation.cpp
index 67bb735a4e..edd313801d 100644
--- a/ydb/library/yql/sql/v1/sql_translation.cpp
+++ b/ydb/library/yql/sql/v1/sql_translation.cpp
@@ -4,6 +4,7 @@
#include "sql_query.h"
#include "sql_values.h"
#include "sql_select.h"
+#include "source.h"
#include <ydb/library/yql/parser/proto_ast/gen/v1/SQLv1Lexer.h>
#include <ydb/library/yql/sql/settings/partitioning.h>
#include <util/generic/scope.h>
diff --git a/ydb/library/yql/sql/v1/sql_values.cpp b/ydb/library/yql/sql/v1/sql_values.cpp
index 4071338282..4b31cb0884 100644
--- a/ydb/library/yql/sql/v1/sql_values.cpp
+++ b/ydb/library/yql/sql/v1/sql_values.cpp
@@ -3,6 +3,7 @@
#include "sql_query.h"
#include "sql_select.h"
#include "sql_expression.h"
+#include "source.h"
namespace NSQLTranslationV1 {
diff --git a/ydb/library/yql/sql/v1/ya.make b/ydb/library/yql/sql/v1/ya.make
index 153b9fd659..39aaf6c45a 100644
--- a/ydb/library/yql/sql/v1/ya.make
+++ b/ydb/library/yql/sql/v1/ya.make
@@ -32,6 +32,7 @@ SRCS(
list_builtin.cpp
node.cpp
select.cpp
+ source.cpp
sql.cpp
sql_call_expr.cpp
sql_expression.cpp