diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-07-17 19:13:22 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-07-17 19:13:22 +0300 |
commit | 029cf29f3669091012394221f00dfa0f3631d91b (patch) | |
tree | 7ee4219359b3d93200c219e3eafa986b1c8576c2 | |
parent | 05482e640fc81d547fb7607d0e97ab6cdb4233a0 (diff) | |
download | ydb-029cf29f3669091012394221f00dfa0f3631d91b.tar.gz |
Extract ISource to separate files
YQL-16186 extract ISource to separate files
-rw-r--r-- | ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/aggregation.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/context.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/insert.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/join.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.cpp | 928 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 291 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/select.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/source.cpp | 949 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/source.h | 300 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_group_by.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_translation.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_values.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/ya.make | 1 |
17 files changed, 1264 insertions, 1222 deletions
diff --git a/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt index 4704ea3ee2..898dba853a 100644 --- a/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/sql/v1/CMakeLists.darwin-x86_64.txt @@ -61,6 +61,7 @@ target_sources(yql-sql-v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp diff --git a/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt b/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt index 5bc90c1e0c..944a46a01b 100644 --- a/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/sql/v1/CMakeLists.linux-aarch64.txt @@ -62,6 +62,7 @@ target_sources(yql-sql-v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp diff --git a/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt index 5bc90c1e0c..944a46a01b 100644 --- a/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/sql/v1/CMakeLists.linux-x86_64.txt @@ -62,6 +62,7 @@ target_sources(yql-sql-v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp diff --git a/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt b/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt index 4704ea3ee2..898dba853a 100644 --- a/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/sql/v1/CMakeLists.windows-x86_64.txt @@ -61,6 +61,7 @@ target_sources(yql-sql-v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/list_builtin.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/select.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/source.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_call_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/v1/sql_expression.cpp diff --git a/ydb/library/yql/sql/v1/aggregation.cpp b/ydb/library/yql/sql/v1/aggregation.cpp index ceb2d3a3ad..4fe92d220c 100644 --- a/ydb/library/yql/sql/v1/aggregation.cpp +++ b/ydb/library/yql/sql/v1/aggregation.cpp @@ -1,4 +1,5 @@ #include "node.h" +#include "source.h" #include "context.h" #include <ydb/library/yql/ast/yql_type_string.h> diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h index 033996ed8a..f69449d9fa 100644 --- a/ydb/library/yql/sql/v1/context.h +++ b/ydb/library/yql/sql/v1/context.h @@ -1,6 +1,6 @@ #pragma once -#include "node.h" +#include "source.h" #include "sql.h" #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> diff --git a/ydb/library/yql/sql/v1/insert.cpp b/ydb/library/yql/sql/v1/insert.cpp index 0620eb4eab..00c63e3d24 100644 --- a/ydb/library/yql/sql/v1/insert.cpp +++ b/ydb/library/yql/sql/v1/insert.cpp @@ -1,4 +1,4 @@ -#include "node.h" +#include "source.h" #include "context.h" #include <ydb/library/yql/utils/yql_panic.h> diff --git a/ydb/library/yql/sql/v1/join.cpp b/ydb/library/yql/sql/v1/join.cpp index 7851b9809b..a5d680c829 100644 --- a/ydb/library/yql/sql/v1/join.cpp +++ b/ydb/library/yql/sql/v1/join.cpp @@ -1,4 +1,4 @@ -#include "node.h" +#include "source.h" #include "context.h" #include <ydb/library/yql/utils/yql_panic.h> diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index 079aa76f13..04543e2cd7 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -1,4 +1,5 @@ #include "node.h" +#include "source.h" #include "context.h" #include <ydb/library/yql/ast/yql_ast_escaping.h> @@ -29,22 +30,6 @@ TString ErrorDistinctByGroupKey(const TString& column) { return TStringBuilder() << "Unable to use DISTINCT by grouping column: " << column << ". You should leave one of them."; } -TTableRef::TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys) - : RefName(refName) - , Service(to_lower(service)) - , Cluster(cluster) - , Keys(keys) -{ -} - -TString TTableRef::ShortName() const { - Y_VERIFY_DEBUG(Keys); - if (Keys->GetTableKeys()->GetTableName()) { - return *Keys->GetTableKeys()->GetTableName(); - } - return TString(); -} - TTopicRef::TTopicRef(const TString& refName, const TDeferredAtom& cluster, TNodePtr keys) : RefName(refName) , Cluster(cluster) @@ -542,7 +527,6 @@ TString TCallNode::GetOpName() const { return OpName; } -namespace { const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes) { const TString* name = nullptr; for (auto& node: nodes) { @@ -558,7 +542,6 @@ const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes) { return name; } -} const TString* TCallNode::GetSourceName() const { return DeriveCommonSourceName(Args); @@ -1298,915 +1281,6 @@ TNodePtr IAggregation::WindowTraits(const TNodePtr& type, TContext& ctx) const { return Q(Y(Q(Name), GetApply(type, false, false, ctx))); } -ISource::ISource(TPosition pos) - : INode(pos) -{ -} - -ISource::~ISource() -{ -} - -TSourcePtr ISource::CloneSource() const { - Y_VERIFY_DEBUG(dynamic_cast<ISource*>(Clone().Get()), "Cloned node is no source"); - TSourcePtr result = static_cast<ISource*>(Clone().Get()); - for (auto curFilter: Filters) { - result->Filters.emplace_back(curFilter->Clone()); - } - for (int i = 0; i < static_cast<int>(EExprSeat::Max); ++i) { - result->NamedExprs[i] = CloneContainer(NamedExprs[i]); - } - result->FlattenColumns = FlattenColumns; - result->FlattenMode = FlattenMode; - return result; -} - -bool ISource::IsFake() const { - return false; -} - -void ISource::AllColumns() { - return; -} - -const TColumns* ISource::GetColumns() const { - return nullptr; -} - -void ISource::GetInputTables(TTableList& tableList) const { - for (auto srcPtr: UsedSources) { - srcPtr->GetInputTables(tableList); - } - return; -} - -TMaybe<bool> ISource::AddColumn(TContext& ctx, TColumnNode& column) { - if (column.IsReliable()) { - ctx.Error(Pos) << "Source does not allow column references"; - ctx.Error(column.GetPos()) << "Column reference " << - (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)"); - } - return {}; -} - -void ISource::FinishColumns() { -} - - -bool ISource::AddFilter(TContext& ctx, TNodePtr filter) { - Y_UNUSED(ctx); - Filters.push_back(filter); - return true; -} - -bool ISource::AddGroupKey(TContext& ctx, const TString& column) { - if (!GroupKeys.insert(column).second) { - ctx.Error() << "Duplicate grouping column: " << column; - return false; - } - OrderedGroupKeys.push_back(column); - return true; -} - -void ISource::SetCompactGroupBy(bool compactGroupBy) { - CompactGroupBy = compactGroupBy; -} - -void ISource::SetGroupBySuffix(const TString& suffix) { - GroupBySuffix = suffix; -} - -bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions, EExprSeat exprSeat) { - YQL_ENSURE(exprSeat < EExprSeat::Max); - THashSet<TString> names; - THashSet<TString> aliasSet; - // TODO: merge FlattenBy with FlattenByExpr - const bool isFlatten = (exprSeat == EExprSeat::FlattenBy || exprSeat == EExprSeat::FlattenByExpr); - THashSet<TString>& aliases = isFlatten ? FlattenByAliases : aliasSet; - for (const auto& expr: expressions) { - const auto& alias = expr->GetLabel(); - const auto& columnNamePtr = expr->GetColumnName(); - if (alias) { - ExprAliases.insert(alias); - if (!aliases.emplace(alias).second) { - ctx.Error(expr->GetPos()) << "Duplicate alias found: " << alias << " in " << exprSeat << " section"; - return false; - } - if (names.contains(alias)) { - ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << alias << " in " << exprSeat << " section"; - return false; - } - } - if (columnNamePtr) { - const auto& sourceName = *expr->GetSourceName(); - auto columnName = *columnNamePtr; - if (sourceName) { - columnName = DotJoin(sourceName, columnName); - } - if (!names.emplace(columnName).second) { - ctx.Error(expr->GetPos()) << "Duplicate column name found: " << columnName << " in " << exprSeat << " section"; - return false; - } - if (!alias && aliases.contains(columnName)) { - ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << columnName << " in " << exprSeat << " section"; - return false; - } - if (alias && exprSeat == EExprSeat::GroupBy) { - auto columnAlias = GroupByColumnAliases.emplace(columnName, alias); - auto oldAlias = columnAlias.first->second; - if (columnAlias.second && oldAlias != alias) { - ctx.Error(expr->GetPos()) << "Alias for column not same, column: " << columnName << - ", exist alias: " << oldAlias << ", another alias: " << alias; - return false; - } - } - } - - if (exprSeat == EExprSeat::GroupBy) { - if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) { - if (SessionWindow) { - ctx.Error(expr->GetPos()) << "Duplicate session window specification:"; - ctx.Error(SessionWindow->GetPos()) << "Previous session window is declared here"; - return false; - } - SessionWindow = expr; - } - if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) { - if (HoppingWindow) { - ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:"; - ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here"; - return false; - } - HoppingWindow = expr; - } - } - Expressions(exprSeat).emplace_back(expr); - } - return true; -} - -void ISource::SetFlattenByMode(const TString& mode) { - FlattenMode = mode; -} - -void ISource::MarkFlattenColumns() { - FlattenColumns = true; -} - -bool ISource::IsFlattenColumns() const { - return FlattenColumns; -} - -TString ISource::MakeLocalName(const TString& name) { - auto iter = GenIndexes.find(name); - if (iter == GenIndexes.end()) { - iter = GenIndexes.emplace(name, 0).first; - } - TStringBuilder str; - str << name << iter->second; - ++iter->second; - return std::move(str); -} - -bool ISource::AddAggregation(TContext& ctx, TAggregationPtr aggr) { - Y_UNUSED(ctx); - YQL_ENSURE(aggr); - Aggregations.push_back(aggr); - return true; -} - -bool ISource::HasAggregations() const { - return !Aggregations.empty() || !GroupKeys.empty(); -} - -void ISource::AddWindowSpecs(TWinSpecs winSpecs) { - WinSpecs = winSpecs; -} - -bool ISource::AddFuncOverWindow(TContext& ctx, TNodePtr expr) { - Y_UNUSED(ctx); - Y_UNUSED(expr); - return false; -} - -void ISource::AddTmpWindowColumn(const TString& column) { - TmpWindowColumns.push_back(column); -} - -const TVector<TString>& ISource::GetTmpWindowColumns() const { - return TmpWindowColumns; -} - -void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) { - LegacyHoppingWindowSpec = spec; -} - -TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const { - return LegacyHoppingWindowSpec; -} - -TNodePtr ISource::GetSessionWindowSpec() const { - return SessionWindow; -} - -TNodePtr ISource::GetHoppingWindowSpec() const { - return HoppingWindow; -} - -TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const { - auto winIter = WinSpecs.find(windowName); - if (winIter == WinSpecs.end()) { - ctx.Error(Pos) << "Unable to find window specification for window '" << windowName << "'"; - return {}; - } - YQL_ENSURE(winIter->second); - return winIter->second; -} - -inline TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) { - return NamedExprs[static_cast<size_t>(exprSeat)]; -} - -const TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) const { - return NamedExprs[static_cast<size_t>(exprSeat)]; -} - -inline TNodePtr ISource::AliasOrColumn(const TNodePtr& node, bool withSource) { - auto result = node->GetLabel(); - if (!result) { - const auto columnNamePtr = node->GetColumnName(); - YQL_ENSURE(columnNamePtr); - result = *columnNamePtr; - if (withSource) { - const auto sourceNamePtr = node->GetSourceName(); - if (sourceNamePtr) { - result = DotJoin(*sourceNamePtr, result); - } - } - } - return BuildQuotedAtom(node->GetPos(), result); -} - -bool ISource::AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) { - YQL_ENSURE(func->IsOverWindow()); - if (func->IsDistinct()) { - ctx.Error(func->GetPos()) << "Aggregation with distinct is not allowed over window: " << windowName; - return false; - } - if (!FindWindowSpecification(ctx, windowName)) { - return false; - } - AggregationOverWindow[windowName].emplace_back(std::move(func)); - return true; -} - -bool ISource::AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) { - if (!FindWindowSpecification(ctx, windowName)) { - return false; - } - FuncOverWindow[windowName].emplace_back(std::move(func)); - return true; -} - -bool ISource::IsCompositeSource() const { - return false; -} - -bool ISource::IsGroupByColumn(const TString& column) const { - return GroupKeys.contains(column); -} - -bool ISource::IsFlattenByColumns() const { - return !Expressions(EExprSeat::FlattenBy).empty(); -} - -bool ISource::IsFlattenByExprs() const { - return !Expressions(EExprSeat::FlattenByExpr).empty(); -} - -bool ISource::IsAlias(EExprSeat exprSeat, const TString& column) const { - for (const auto& exprNode: Expressions(exprSeat)) { - const auto& labelName = exprNode->GetLabel(); - if (labelName && labelName == column) { - return true; - } - } - return false; -} - -bool ISource::IsExprAlias(const TString& column) const { - std::array<EExprSeat, 5> exprSeats = {{EExprSeat::FlattenBy, EExprSeat::FlattenByExpr, EExprSeat::GroupBy, - EExprSeat::WindowPartitionBy, EExprSeat::DistinctAggr}}; - for (auto seat: exprSeats) { - if (IsAlias(seat, column)) { - return true; - } - } - return false; -} - -bool ISource::IsExprSeat(EExprSeat exprSeat, EExprType type) const { - auto expressions = Expressions(exprSeat); - if (!expressions) { - return false; - } - for (const auto& exprNode: expressions) { - if (exprNode->GetLabel()) { - return type == EExprType::WithExpression; - } - } - return type == EExprType::ColumnOnly; -} - -TString ISource::GetGroupByColumnAlias(const TString& column) const { - auto iter = GroupByColumnAliases.find(column); - if (iter == GroupByColumnAliases.end()) { - return {}; - } - return iter->second; -} - -const TString* ISource::GetWindowName() const { - return {}; -} - -bool ISource::IsCalcOverWindow() const { - return !AggregationOverWindow.empty() || !FuncOverWindow.empty() || - AnyOf(WinSpecs, [](const auto& item) { return item.second->Session; }); -} - -bool ISource::IsOverWindowSource() const { - return !WinSpecs.empty(); -} - -bool ISource::IsStream() const { - return false; -} - -EOrderKind ISource::GetOrderKind() const { - return EOrderKind::None; -} - -TWriteSettings ISource::GetWriteSettings() const { - return {}; -} - -bool ISource::SetSamplingOptions(TContext& ctx, - TPosition pos, - ESampleMode mode, - TNodePtr samplingRate, - TNodePtr samplingSeed) { - Y_UNUSED(pos); - Y_UNUSED(mode); - Y_UNUSED(samplingRate); - Y_UNUSED(samplingSeed); - ctx.Error() << "Sampling is only supported for table sources"; - return false; -} - -bool ISource::SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) { - Y_UNUSED(pos); - Y_UNUSED(contextHints); - if (hints) { - ctx.Error() << "Explicit hints are only supported for table sources"; - return false; - } - return true; -} - -bool ISource::CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const { - Y_UNUSED(columns); - Y_UNUSED(hint); - ctx.Error() << "Source not support grouping hint"; - return false; -} - -TNodePtr ISource::BuildFilter(TContext& ctx, const TString& label) { - return Filters.empty() ? nullptr : Y(ctx.UseUnordered(*this) ? "OrderedFilter" : "Filter", label, BuildFilterLambda()); -} - -TNodePtr ISource::BuildFilterLambda() { - if (Filters.empty()) { - return BuildLambda(Pos, Y("row"), Y("Bool", Q("true"))); - } - YQL_ENSURE(Filters[0]->HasState(ENodeState::Initialized)); - TNodePtr filter(Filters[0]); - for (ui32 i = 1; i < Filters.size(); ++i) { - YQL_ENSURE(Filters[i]->HasState(ENodeState::Initialized)); - filter = Y("And", filter, Filters[i]); - } - filter = Y("Coalesce", filter, Y("Bool", Q("false"))); - return BuildLambda(Pos, Y("row"), filter); -} - -TNodePtr ISource::BuildFlattenByColumns(const TString& label) { - auto columnsList = Y("FlattenByColumns", Q(FlattenMode), label); - for (const auto& column: Expressions(EExprSeat::FlattenBy)) { - const auto columnNamePtr = column->GetColumnName(); - YQL_ENSURE(columnNamePtr); - if (column->GetLabel().empty()) { - columnsList = L(columnsList, Q(*columnNamePtr)); - } else { - columnsList = L(columnsList, Q(Y(Q(*columnNamePtr), Q(column->GetLabel())))); - } - } - return Y(Y("let", "res", columnsList)); -} - -TNodePtr ISource::BuildFlattenColumns(const TString& label) { - return Y(Y("let", "res", Y("Just", Y("FlattenStructs", label)))); -} - -namespace { - -TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& exprs) { - auto structObj = BuildAtom(pos, "row", TNodeFlags::Default); - for (const auto& exprNode: exprs) { - const auto name = exprNode->GetLabel(); - YQL_ENSURE(name); - structObj = structObj->Y("ForceRemoveMember", structObj, structObj->Q(name)); - if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) { - continue; - } - if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) { - continue; - } - structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode); - } - return structObj->Y("AsList", structObj); -} - -} - -TNodePtr ISource::BuildPreaggregatedMap(TContext& ctx) { - Y_UNUSED(ctx); - const auto& groupByExprs = Expressions(EExprSeat::GroupBy); - const auto& distinctAggrExprs = Expressions(EExprSeat::DistinctAggr); - YQL_ENSURE(groupByExprs || distinctAggrExprs); - - TNodePtr res; - if (groupByExprs) { - auto body = BuildLambdaBodyForExprAliases(Pos, groupByExprs); - res = Y("FlatMap", "core", BuildLambda(Pos, Y("row"), body)); - } - - if (distinctAggrExprs) { - auto body = BuildLambdaBodyForExprAliases(Pos, distinctAggrExprs); - auto lambda = BuildLambda(Pos, Y("row"), body); - res = res ? Y("FlatMap", res, lambda) : Y("FlatMap", "core", lambda); - } - return res; -} - -TNodePtr ISource::BuildPreFlattenMap(TContext& ctx) { - Y_UNUSED(ctx); - YQL_ENSURE(IsFlattenByExprs()); - return BuildLambdaBodyForExprAliases(Pos, Expressions(EExprSeat::FlattenByExpr)); -} - -TNodePtr ISource::BuildPrewindowMap(TContext& ctx) { - auto feed = BuildAtom(Pos, "row", TNodeFlags::Default); - for (const auto& exprNode: Expressions(EExprSeat::WindowPartitionBy)) { - const auto name = exprNode->GetLabel(); - if (name && !dynamic_cast<const TSessionWindow*>(exprNode.Get())) { - feed = Y("AddMember", feed, Q(name), exprNode); - } - } - return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed))); -} - -bool ISource::BuildSamplingLambda(TNodePtr& node) { - if (!SamplingRate) { - return true; - } - auto res = Y("Coalesce", Y("SafeCast", SamplingRate, Y("DataType", Q("Double"))), Y("Double", Q("0"))); - res = Y("/", res, Y("Double", Q("100"))); - res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row"))); - node = BuildLambda(GetPos(), Y("row"), res, "res"); - return !!node; -} - -bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) { - if (samplingRate) { - if (!samplingRate->Init(ctx, this)) { - return false; - } - SamplingRate = Y("Ensure", samplingRate, Y(">=", samplingRate, Y("Double", Q("0"))), Y("String", Q("\"Expected sampling rate to be nonnegative\""))); - SamplingRate = Y("Ensure", SamplingRate, Y("<=", SamplingRate, Y("Double", Q("100"))), Y("String", Q("\"Sampling rate is over 100%\""))); - } - return true; -} - -std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) { - if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) { - return { nullptr, true }; - } - - auto keysTuple = Y(); - YQL_ENSURE(GroupKeys.size() == OrderedGroupKeys.size()); - for (const auto& key: OrderedGroupKeys) { - YQL_ENSURE(GroupKeys.contains(key)); - keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key)); - } - - std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs; - for (const auto& aggr: Aggregations) { - if (const auto key = aggr->GetGenericKey()) { - genericAggrs[{aggr->IsDistinct(), *key}].emplace_back(aggr.Get()); - } - } - - for (const auto& aggr : genericAggrs) { - for (size_t i = 1U; i < aggr.second.size(); ++i) { - aggr.second.front()->Join(aggr.second[i]); - } - } - - const auto listType = Y("TypeOf", label); - auto aggrArgs = Y(); - const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" || - GroupBySuffix == "MergeFinalize" || GroupBySuffix == "MergeManyFinalize"; - const bool allowAggApply = !LegacyHoppingWindowSpec && !SessionWindow && !HoppingWindow; - for (const auto& aggr: Aggregations) { - auto res = aggr->AggregationTraits(listType, overState, GroupBySuffix == "MergeManyFinalize", allowAggApply, ctx); - if (!res.second) { - return { nullptr, false }; - } - - if (res.first) { - aggrArgs = L(aggrArgs, res.first); - } - } - - auto options = Y(); - if (CompactGroupBy || GroupBySuffix == "Finalize") { - options = L(options, Q(Y(Q("compact")))); - } - - if (LegacyHoppingWindowSpec) { - auto hoppingTraits = Y( - "HoppingTraits", - Y("ListItemType", listType), - BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor), - LegacyHoppingWindowSpec->Hop, - LegacyHoppingWindowSpec->Interval, - LegacyHoppingWindowSpec->Delay, - LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"), - Q("v1")); - - options = L(options, Q(Y(Q("hopping"), hoppingTraits))); - } - - if (SessionWindow) { - YQL_ENSURE(SessionWindow->GetLabel()); - auto sessionWindow = dynamic_cast<TSessionWindow*>(SessionWindow.Get()); - YQL_ENSURE(sessionWindow); - options = L(options, Q(Y(Q("session"), - Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label)))))); - } - - if (HoppingWindow) { - YQL_ENSURE(HoppingWindow->GetLabel()); - auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get()); - YQL_ENSURE(hoppingWindow); - options = L(options, Q(Y(Q("hopping"), - Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label)))))); - } - - return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true }; -} - -TMaybe<TString> ISource::FindColumnMistype(const TString& name) const { - auto result = FindMistypeIn(GroupKeys, name); - return result ? result : FindMistypeIn(ExprAliases, name); -} - -void ISource::AddDependentSource(ISource* usedSource) { - UsedSources.push_back(usedSource); -} - -class TYqlFrameBound final: public TCallNode { -public: - TYqlFrameBound(TPosition pos, TNodePtr bound) - : TCallNode(pos, "EvaluateExpr", 1, 1, { bound }) - , FakeSource(BuildFakeSource(pos)) - { - } - - bool DoInit(TContext& ctx, ISource* src) override { - if (!ValidateArguments(ctx)) { - return false; - } - - if (!Args[0]->Init(ctx, FakeSource.Get())) { - return false; - } - - return TCallNode::DoInit(ctx, src); - } - - TNodePtr DoClone() const final { - return new TYqlFrameBound(Pos, Args[0]->Clone()); - } -private: - TSourcePtr FakeSource; -}; - -TNodePtr BuildFrameNode(const TFrameBound& frame, EFrameType frameType) { - TString settingStr; - switch (frame.Settings) { - case FramePreceding: settingStr = "preceding"; break; - case FrameCurrentRow: settingStr = "currentRow"; break; - case FrameFollowing: settingStr = "following"; break; - default: YQL_ENSURE(false, "Unexpected frame setting"); - } - - TNodePtr node = frame.Bound; - TPosition pos = frame.Pos; - if (frameType != EFrameType::FrameByRows) { - TVector<TNodePtr> settings; - settings.push_back(BuildQuotedAtom(pos, settingStr, TNodeFlags::Default)); - if (frame.Settings != FrameCurrentRow) { - if (!node) { - node = BuildQuotedAtom(pos, "unbounded", TNodeFlags::Default); - } else if (!node->IsLiteral()) { - node = new TYqlFrameBound(pos, node); - } - settings.push_back(std::move(node)); - } - return BuildTuple(pos, std::move(settings)); - } - - // TODO: switch FrameByRows to common format above - YQL_ENSURE(frame.Settings != FrameCurrentRow, "Should be already replaced by 0 preceding/following"); - if (!node) { - node = BuildLiteralVoid(pos); - } else if (node->IsLiteral()) { - YQL_ENSURE(node->GetLiteralType() == "Int32"); - i32 value = FromString<i32>(node->GetLiteralValue()); - YQL_ENSURE(value >= 0); - if (frame.Settings == FramePreceding) { - value = -value; - } - node = new TCallNodeImpl(pos, "Int32", { BuildQuotedAtom(pos, ToString(value), TNodeFlags::Default) }); - } else { - if (frame.Settings == FramePreceding) { - node = new TCallNodeImpl(pos, "Minus", { node->Clone() }); - } - node = new TYqlFrameBound(pos, node); - } - return node; -} - -TNodePtr ISource::BuildWindowFrame(const TFrameSpecification& spec, bool isCompact) { - YQL_ENSURE(spec.FrameExclusion == FrameExclNone); - YQL_ENSURE(spec.FrameBegin); - YQL_ENSURE(spec.FrameEnd); - - auto frameBeginNode = BuildFrameNode(*spec.FrameBegin, spec.FrameType); - auto frameEndNode = BuildFrameNode(*spec.FrameEnd, spec.FrameType); - - auto begin = Q(Y(Q("begin"), frameBeginNode)); - auto end = Q(Y(Q("end"), frameEndNode)); - - return isCompact ? Q(Y(begin, end, Q(Y(Q("compact"))))) : Q(Y(begin, end)); -} - -class TSessionWindowTraits final: public TCallNode { -public: - TSessionWindowTraits(TPosition pos, const TVector<TNodePtr>& args) - : TCallNode(pos, "SessionWindowTraits", args) - , FakeSource(BuildFakeSource(pos)) - { - YQL_ENSURE(args.size() == 4); - } - - bool DoInit(TContext& ctx, ISource* src) override { - if (!ValidateArguments(ctx)) { - return false; - } - - if (!Args.back()->Init(ctx, FakeSource.Get())) { - return false; - } - - return TCallNode::DoInit(ctx, src); - } - - TNodePtr DoClone() const final { - return new TSessionWindowTraits(Pos, CloneContainer(Args)); - } -private: - TSourcePtr FakeSource; -}; - -TNodePtr ISource::BuildCalcOverWindow(TContext& ctx, const TString& label) { - YQL_ENSURE(IsCalcOverWindow()); - - TSet<TString> usedWindows; - for (auto& it : AggregationOverWindow) { - usedWindows.insert(it.first); - } - for (auto& it : FuncOverWindow) { - usedWindows.insert(it.first); - } - for (auto& it : WinSpecs) { - if (it.second->Session) { - usedWindows.insert(it.first); - } - } - - YQL_ENSURE(!usedWindows.empty()); - - const bool onePartition = usedWindows.size() == 1; - const auto useLabel = onePartition ? label : "partitioning"; - const auto listType = Y("TypeOf", useLabel); - auto framesProcess = Y(); - auto resultNode = onePartition ? Y() : Y(Y("let", "partitioning", label)); - - for (const auto& name : usedWindows) { - auto spec = FindWindowSpecification(ctx, name); - YQL_ENSURE(spec); - - auto aggsIter = AggregationOverWindow.find(name); - auto funcsIter = FuncOverWindow.find(name); - - const auto& aggs = (aggsIter == AggregationOverWindow.end()) ? TVector<TAggregationPtr>() : aggsIter->second; - const auto& funcs = (funcsIter == FuncOverWindow.end()) ? TVector<TNodePtr>() : funcsIter->second; - - auto frames = Y(); - TString frameType; - switch (spec->Frame->FrameType) { - case EFrameType::FrameByRows: frameType = "WinOnRows"; break; - case EFrameType::FrameByRange: frameType = "WinOnRange"; break; - case EFrameType::FrameByGroups: frameType = "WinOnGroups"; break; - } - YQL_ENSURE(frameType); - auto callOnFrame = Y(frameType, BuildWindowFrame(*spec->Frame, spec->IsCompact)); - for (auto& agg : aggs) { - auto winTraits = agg->WindowTraits(listType, ctx); - callOnFrame = L(callOnFrame, winTraits); - } - for (auto& func : funcs) { - auto winSpec = func->WindowSpecFunc(listType); - callOnFrame = L(callOnFrame, winSpec); - } - frames = L(frames, callOnFrame); - - auto keysTuple = Y(); - for (const auto& key: spec->Partitions) { - if (!dynamic_cast<TSessionWindow*>(key.Get())) { - keysTuple = L(keysTuple, AliasOrColumn(key, GetJoin())); - } - } - - auto sortSpec = spec->OrderBy.empty() ? Y("Void") : BuildSortSpec(spec->OrderBy, useLabel, true, false); - if (spec->Session) { - TString label = spec->Session->GetLabel(); - YQL_ENSURE(label); - auto sessionWindow = dynamic_cast<TSessionWindow*>(spec->Session.Get()); - YQL_ENSURE(sessionWindow); - auto labelNode = BuildQuotedAtom(sessionWindow->GetPos(), label); - - auto sessionTraits = sessionWindow->BuildTraits(useLabel); - framesProcess = Y("CalcOverSessionWindow", useLabel, Q(keysTuple), sortSpec, Q(frames), sessionTraits, Q(Y(labelNode))); - } else { - YQL_ENSURE(aggs || funcs); - framesProcess = Y("CalcOverWindow", useLabel, Q(keysTuple), sortSpec, Q(frames)); - } - - if (!onePartition) { - resultNode = L(resultNode, Y("let", "partitioning", framesProcess)); - } - } - if (onePartition) { - return framesProcess; - } else { - return Y("block", Q(L(resultNode, Y("return", "partitioning")))); - } -} - -TNodePtr ISource::BuildSort(TContext& ctx, const TString& label) { - Y_UNUSED(ctx); - Y_UNUSED(label); - return nullptr; -} - -TNodePtr ISource::BuildCleanupColumns(TContext& ctx, const TString& label) { - Y_UNUSED(ctx); - Y_UNUSED(label); - return nullptr; -} - -IJoin* ISource::GetJoin() { - return nullptr; -} - -ISource* ISource::GetCompositeSource() { - return nullptr; -} - -bool ISource::IsSelect() const { - return true; -} - -bool ISource::IsTableSource() const { - return false; -} - -bool ISource::ShouldUseSourceAsColumn(const TString& source) const { - Y_UNUSED(source); - return false; -} - -bool ISource::IsJoinKeysInitializing() const { - return false; -} - -bool ISource::DoInit(TContext& ctx, ISource* src) { - for (auto& column: Expressions(EExprSeat::FlattenBy)) { - if (!column->Init(ctx, this)) { - return false; - } - } - - if (IsFlattenColumns() && src) { - src->AllColumns(); - } - - return true; -} - -bool ISource::InitFilters(TContext& ctx) { - for (auto& filter: Filters) { - if (!filter->Init(ctx, this)) { - return false; - } - if (filter->IsAggregated() && !filter->IsConstant() && !filter->HasState(ENodeState::AggregationKey)) { - ctx.Error(filter->GetPos()) << "Can not use aggregated values in filtering"; - return false; - } - } - return true; -} - -TAstNode* ISource::Translate(TContext& ctx) const { - Y_VERIFY_DEBUG(false); - Y_UNUSED(ctx); - return nullptr; -} - -void ISource::FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector) { - TNodePtr expr; - if (orderBy.empty()) { - YQL_ENSURE(!sortKeySelector); - sortDirection = sortKeySelector = Y("Void"); - return; - } else if (orderBy.size() == 1) { - auto& sortSpec = orderBy.front(); - expr = Y("PersistableRepr", sortSpec->OrderExpr); - sortDirection = Y("Bool", Q(sortSpec->Ascending ? "true" : "false")); - } else { - auto exprList = Y(); - sortDirection = Y(); - for (const auto& sortSpec: orderBy) { - const auto asc = sortSpec->Ascending; - sortDirection = L(sortDirection, Y("Bool", Q(asc ? "true" : "false"))); - exprList = L(exprList, Y("PersistableRepr", sortSpec->OrderExpr)); - } - sortDirection = Q(sortDirection); - expr = Q(exprList); - } - sortKeySelector = BuildLambda(Pos, Y("row"), expr); -} - -TNodePtr ISource::BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume) { - YQL_ENSURE(!orderBy.empty()); - TNodePtr dirsNode; - TNodePtr keySelectorNode; - FillSortParts(orderBy, dirsNode, keySelectorNode); - if (traits) { - return Y("SortTraits", Y("TypeOf", label), dirsNode, keySelectorNode); - } else if (assume) { - return Y("AssumeSorted", label, dirsNode, keySelectorNode); - } else { - return Y("Sort", label, dirsNode, keySelectorNode); - } -} - -IJoin::IJoin(TPosition pos) - : ISource(pos) -{ -} - -IJoin::~IJoin() -{ -} - -IJoin* IJoin::GetJoin() { - return this; -} - namespace { bool UnescapeQuoted(const TString& str, TPosition& pos, char quoteChar, TString& result, TString& error) { result = error = {}; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index d49eee34e7..15d28dc9c7 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -510,24 +510,6 @@ namespace NSQLTranslationV1 { TString Repr; }; - typedef TIntrusivePtr<ISource> TSourcePtr; - - struct TTableRef { - TString RefName; - TString Service; - TDeferredAtom Cluster; - TNodePtr Keys; - TNodePtr Options; - TSourcePtr Source; - - TTableRef() = default; - TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys); - TTableRef(const TTableRef&) = default; - TTableRef& operator=(const TTableRef&) = default; - - TString ShortName() const; - }; - struct TTopicRef { TString RefName; TDeferredAtom Cluster; @@ -653,8 +635,6 @@ namespace NSQLTranslationV1 { typedef TIntrusivePtr<TWindowSpecification> TWindowSpecificationPtr; typedef TMap<TString, TWindowSpecificationPtr> TWinSpecs; - typedef TVector<TTableRef> TTableList; - void WarnIfAliasFromSelectIsUsedInGroupBy(TContext& ctx, const TVector<TNodePtr>& selectTerms, const TVector<TNodePtr>& groupByTerms, const TVector<TNodePtr>& groupByExprTerms); bool ValidateAllNodesForAggregation(TContext& ctx, const TVector<TNodePtr>& nodes); @@ -851,153 +831,6 @@ namespace NSQLTranslationV1 { Passthrough }; - class IJoin; - class ISource: public INode { - public: - virtual ~ISource(); - - virtual bool IsFake() const; - virtual void AllColumns(); - virtual const TColumns* GetColumns() const; - virtual void GetInputTables(TTableList& tableList) const; - /// in case of error unfilled, flag show if ensure column name - virtual TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column); - virtual void FinishColumns(); - virtual bool AddExpressions(TContext& ctx, const TVector<TNodePtr>& columns, EExprSeat exprSeat); - virtual void SetFlattenByMode(const TString& mode); - virtual void MarkFlattenColumns(); - virtual bool IsFlattenColumns() const; - virtual bool AddFilter(TContext& ctx, TNodePtr filter); - virtual bool AddGroupKey(TContext& ctx, const TString& column); - virtual void SetCompactGroupBy(bool compactGroupBy); - virtual void SetGroupBySuffix(const TString& suffix); - virtual TString MakeLocalName(const TString& name); - virtual bool AddAggregation(TContext& ctx, TAggregationPtr aggr); - virtual bool AddFuncOverWindow(TContext& ctx, TNodePtr expr); - virtual void AddTmpWindowColumn(const TString& column); - virtual const TVector<TString>& GetTmpWindowColumns() const; - virtual bool HasAggregations() const; - virtual void AddWindowSpecs(TWinSpecs winSpecs); - virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func); - virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func); - virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec); - virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const; - virtual TNodePtr GetSessionWindowSpec() const; - virtual TNodePtr GetHoppingWindowSpec() const; - virtual bool IsCompositeSource() const; - virtual bool IsGroupByColumn(const TString& column) const; - virtual bool IsFlattenByColumns() const; - virtual bool IsFlattenByExprs() const; - virtual bool IsCalcOverWindow() const; - virtual bool IsOverWindowSource() const; - virtual bool IsStream() const; - virtual EOrderKind GetOrderKind() const; - virtual TWriteSettings GetWriteSettings() const; - virtual bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed); - virtual bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints); - virtual bool CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const; - virtual TNodePtr BuildFilter(TContext& ctx, const TString& label); - virtual TNodePtr BuildFilterLambda(); - virtual TNodePtr BuildFlattenByColumns(const TString& label); - virtual TNodePtr BuildFlattenColumns(const TString& label); - virtual TNodePtr BuildPreaggregatedMap(TContext& ctx); - virtual TNodePtr BuildPreFlattenMap(TContext& ctx); - virtual TNodePtr BuildPrewindowMap(TContext& ctx); - virtual std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx); - virtual TNodePtr BuildCalcOverWindow(TContext& ctx, const TString& label); - virtual TNodePtr BuildSort(TContext& ctx, const TString& label); - virtual TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label); - virtual bool BuildSamplingLambda(TNodePtr& node); - virtual bool SetSamplingRate(TContext& ctx, TNodePtr samplingRate); - virtual IJoin* GetJoin(); - virtual ISource* GetCompositeSource(); - virtual bool IsSelect() const; - virtual bool IsTableSource() const; - virtual bool ShouldUseSourceAsColumn(const TString& source) const; - virtual bool IsJoinKeysInitializing() const; - virtual const TString* GetWindowName() const; - - virtual bool DoInit(TContext& ctx, ISource* src); - virtual TNodePtr Build(TContext& ctx) = 0; - - virtual TMaybe<TString> FindColumnMistype(const TString& name) const; - - virtual bool InitFilters(TContext& ctx); - void AddDependentSource(ISource* usedSource); - bool IsAlias(EExprSeat exprSeat, const TString& label) const; - bool IsExprAlias(const TString& label) const; - bool IsExprSeat(EExprSeat exprSeat, EExprType type = EExprType::WithExpression) const; - TString GetGroupByColumnAlias(const TString& column) const; - const TVector<TNodePtr>& Expressions(EExprSeat exprSeat) const; - - virtual TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const; - - TIntrusivePtr<ISource> CloneSource() const; - - protected: - ISource(TPosition pos); - virtual TAstNode* Translate(TContext& ctx) const; - - void FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortKeySelector, TNodePtr& sortDirection); - TNodePtr BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume); - - TVector<TNodePtr>& Expressions(EExprSeat exprSeat); - TNodePtr AliasOrColumn(const TNodePtr& node, bool withSource); - - TNodePtr BuildWindowFrame(const TFrameSpecification& spec, bool isCompact); - - THashSet<TString> ExprAliases; - THashSet<TString> FlattenByAliases; - THashMap<TString, TString> GroupByColumnAliases; - TVector<TNodePtr> Filters; - bool CompactGroupBy = false; - TString GroupBySuffix; - TSet<TString> GroupKeys; - TVector<TString> OrderedGroupKeys; - std::array<TVector<TNodePtr>, static_cast<unsigned>(EExprSeat::Max)> NamedExprs; - TVector<TAggregationPtr> Aggregations; - TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow; - TMap<TString, TVector<TNodePtr>> FuncOverWindow; - TWinSpecs WinSpecs; - TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; - TNodePtr SessionWindow; - TNodePtr HoppingWindow; - TVector<ISource*> UsedSources; - TString FlattenMode; - bool FlattenColumns = false; - THashMap<TString, ui32> GenIndexes; - TVector<TString> TmpWindowColumns; - TNodePtr SamplingRate; - }; - - template<> - inline TVector<TSourcePtr> CloneContainer<TSourcePtr>(const TVector<TSourcePtr>& args) { - TVector<TSourcePtr> cloneArgs; - cloneArgs.reserve(args.size()); - for (const auto& arg: args) { - cloneArgs.emplace_back(arg ? arg->CloneSource() : nullptr); - } - return cloneArgs; - } - - struct TJoinLinkSettings { - bool ForceSortedMerge = false; - }; - - class IJoin: public ISource { - public: - virtual ~IJoin(); - - virtual IJoin* GetJoin(); - virtual TNodePtr BuildJoinKeys(TContext& ctx, const TVector<TDeferredAtom>& names) = 0; - virtual void SetupJoin(const TString& joinOp, TNodePtr joinExpr, const TJoinLinkSettings& linkSettings) = 0; - virtual const THashMap<TString, THashSet<TString>>& GetSameKeysMap() const = 0; - virtual TVector<TString> GetJoinLabels() const = 0; - - protected: - IJoin(TPosition pos); - }; - class TListOfNamedNodes final: public INode { public: TListOfNamedNodes(TPosition pos, TVector<TNodePtr>&& exprs); @@ -1075,46 +908,6 @@ namespace NSQLTranslationV1 { TNodePtr Node; }; - class TSessionWindow final : public INode { - public: - TSessionWindow(TPosition pos, const TVector<TNodePtr>& args); - void MarkValid(); - TNodePtr BuildTraits(const TString& label) const; - private: - bool DoInit(TContext& ctx, ISource* src) override; - TAstNode* Translate(TContext&) const override; - void DoUpdateState() const override; - TNodePtr DoClone() const override; - TString GetOpName() const override; - - TVector<TNodePtr> Args; - TSourcePtr FakeSource; - TNodePtr Node; - bool Valid; - }; - - class THoppingWindow final : public INode { - public: - THoppingWindow(TPosition pos, const TVector<TNodePtr>& args); - void MarkValid(); - TNodePtr BuildTraits(const TString& label) const; - public: - TNodePtr Hop; - TNodePtr Interval; - private: - bool DoInit(TContext& ctx, ISource* src) override; - TAstNode* Translate(TContext&) const override; - void DoUpdateState() const override; - TNodePtr DoClone() const override; - TString GetOpName() const override; - TNodePtr ProcessIntervalParam(const TNodePtr& val) const; - - TVector<TNodePtr> Args; - TSourcePtr FakeSource; - TNodePtr Node; - bool Valid; - }; - struct TStringContent { TString Content; NYql::NUdf::EDataSlot Type = NYql::NUdf::EDataSlot::String; @@ -1399,83 +1192,7 @@ namespace NSQLTranslationV1 { bool warnOnYqlNameSpace = true ); - // Implemented in join.cpp - TString NormalizeJoinOp(const TString& joinOp); - TSourcePtr BuildEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes); - - // Implemented in select.cpp - TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped); - TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex = -1); - TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos); - TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist = false); - TSourcePtr BuildMuxSource(TPosition pos, TVector<TSourcePtr>&& sources); - TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom = false, bool inSubquery = false); - TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList = false); - TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label = TString()); - TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label = TString()); - TSourcePtr BuildRefColumnSource(TPosition pos, const TString& partExpression); - TSourcePtr BuildUnionAll(TPosition pos, TVector<TSourcePtr>&& sources, const TWriteSettings& settings); - TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource); - - TNodePtr BuildOrderBy(TPosition pos, const TVector<TNodePtr>& keys, const TVector<bool>& order); - TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take); - - - TSourcePtr BuildSelectCore( - TContext& ctx, - TPosition pos, - TSourcePtr source, - const TVector<TNodePtr>& groupByExpr, - const TVector<TNodePtr>& groupBy, - bool compactGroupBy, - const TString& groupBySuffix, - bool assumeSorted, - const TVector<TSortSpecificationPtr>& orderBy, - TNodePtr having, - TWinSpecs&& windowSpec, - TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, - TVector<TNodePtr>&& terms, - bool distinct, - TVector<TNodePtr>&& without, - bool selectStream, - const TWriteSettings& settings - ); - TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake); - - - enum class ReduceMode { - ByPartition, - ByAll, - }; - TSourcePtr BuildReduce(TPosition pos, ReduceMode mode, TSourcePtr source, TVector<TSortSpecificationPtr>&& orderBy, - TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings, - const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall); - TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, - bool prcessStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy); - - TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped); - - // Implemented in insert.cpp - TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values); - TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, TSourcePtr source); - TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values); - - EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode); - TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns); - TNodePtr BuildIntoTableOptions(TPosition pos, const TVector<TString>& eraseColumns, const TTableHints& hints); - TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options = nullptr); - TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source); - TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source); - // Implemented in query.cpp - TNodePtr BuildTableKey(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TString& view); - TNodePtr BuildTableKeys(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TString& func, const TVector<TTableArg>& args); - TNodePtr BuildTopicKey(TPosition pos, const TDeferredAtom& cluster, const TDeferredAtom& name); - TNodePtr BuildInputOptions(TPosition pos, const TTableHints& hints); - TNodePtr BuildInputTables(TPosition pos, const TTableList& tables, bool inSubquery, TScopedStatePtr scoped); - TNodePtr BuildCreateTable(TPosition pos, const TTableRef& tr, const TCreateTableParameters& params, TScopedStatePtr scoped); - TNodePtr BuildAlterTable(TPosition pos, const TTableRef& tr, const TAlterTableParameters& params, TScopedStatePtr scoped); - TNodePtr BuildDropTable(TPosition pos, const TTableRef& table, ETableType tableType, TScopedStatePtr scoped); TNodePtr BuildCreateUser(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TMaybe<TRoleParameters>& params, TScopedStatePtr scoped); TNodePtr BuildCreateGroup(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, TScopedStatePtr scoped); TNodePtr BuildAlterUser(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TRoleParameters& params, TScopedStatePtr scoped); @@ -1499,8 +1216,6 @@ namespace NSQLTranslationV1 { std::map<TString, TNodePtr>&& settings, const TObjectOperatorContext& context); TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context); - TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options, - TScopedStatePtr scoped); TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings); TNodePtr BuildCommitClusters(TPosition pos); TNodePtr BuildRollbackClusters(TPosition pos); @@ -1528,9 +1243,5 @@ namespace NSQLTranslationV1 { bool Parseui32(TNodePtr from, ui32& to); TNodePtr GroundWithExpr(const TNodePtr& ground, const TNodePtr& expr); - TSourcePtr TryMakeSourceFromExpression(TContext& ctx, const TString& currService, const TDeferredAtom& currCluster, - TNodePtr node, const TString& view = {}); - void MakeTableFromExpression(TContext& ctx, TNodePtr node, TDeferredAtom& table, const TString& prefix = {}); - TDeferredAtom MakeAtomFromExpression(TContext& ctx, TNodePtr node, const TString& prefix = {}); - TString NormalizeTypeString(const TString& str); + const TString* DeriveCommonSourceName(const TVector<TNodePtr> &nodes); } // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp index 65d7a3624c..28228731d3 100644 --- a/ydb/library/yql/sql/v1/select.cpp +++ b/ydb/library/yql/sql/v1/select.cpp @@ -1,5 +1,5 @@ #include "sql.h" -#include "node.h" +#include "source.h" #include "context.h" @@ -1660,7 +1660,6 @@ public: return false; } } - return true; } diff --git a/ydb/library/yql/sql/v1/source.cpp b/ydb/library/yql/sql/v1/source.cpp new file mode 100644 index 0000000000..8c696ce22b --- /dev/null +++ b/ydb/library/yql/sql/v1/source.cpp @@ -0,0 +1,949 @@ +#include "source.h" +#include "context.h" + +#include <ydb/library/yql/ast/yql_ast_escaping.h> +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/core/sql_types/simple_types.h> +#include <ydb/library/yql/minikql/mkql_type_ops.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <library/cpp/containers/stack_vector/stack_vec.h> +#include <library/cpp/charset/ci_string.h> +#include <util/generic/hash_set.h> +#include <util/stream/str.h> +#include <util/string/cast.h> +#include <util/string/escape.h> +#include <util/string/subst.h> + +using namespace NYql; + +namespace NSQLTranslationV1 { + + +TTableRef::TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys) + : RefName(refName) + , Service(to_lower(service)) + , Cluster(cluster) + , Keys(keys) +{ +} + +TString TTableRef::ShortName() const { + Y_VERIFY_DEBUG(Keys); + if (Keys->GetTableKeys()->GetTableName()) { + return *Keys->GetTableKeys()->GetTableName(); + } + return TString(); +} + +ISource::ISource(TPosition pos) + : INode(pos) +{ +} + +ISource::~ISource() +{ +} + +TSourcePtr ISource::CloneSource() const { + Y_VERIFY_DEBUG(dynamic_cast<ISource*>(Clone().Get()), "Cloned node is no source"); + TSourcePtr result = static_cast<ISource*>(Clone().Get()); + for (auto curFilter: Filters) { + result->Filters.emplace_back(curFilter->Clone()); + } + for (int i = 0; i < static_cast<int>(EExprSeat::Max); ++i) { + result->NamedExprs[i] = CloneContainer(NamedExprs[i]); + } + result->FlattenColumns = FlattenColumns; + result->FlattenMode = FlattenMode; + return result; +} + +bool ISource::IsFake() const { + return false; +} + +void ISource::AllColumns() { + return; +} + +const TColumns* ISource::GetColumns() const { + return nullptr; +} + +void ISource::GetInputTables(TTableList& tableList) const { + for (auto srcPtr: UsedSources) { + srcPtr->GetInputTables(tableList); + } + return; +} + +TMaybe<bool> ISource::AddColumn(TContext& ctx, TColumnNode& column) { + if (column.IsReliable()) { + ctx.Error(Pos) << "Source does not allow column references"; + ctx.Error(column.GetPos()) << "Column reference " << + (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)"); + } + return {}; +} + +void ISource::FinishColumns() { +} + + +bool ISource::AddFilter(TContext& ctx, TNodePtr filter) { + Y_UNUSED(ctx); + Filters.push_back(filter); + return true; +} + +bool ISource::AddGroupKey(TContext& ctx, const TString& column) { + if (!GroupKeys.insert(column).second) { + ctx.Error() << "Duplicate grouping column: " << column; + return false; + } + OrderedGroupKeys.push_back(column); + return true; +} + +void ISource::SetCompactGroupBy(bool compactGroupBy) { + CompactGroupBy = compactGroupBy; +} + +void ISource::SetGroupBySuffix(const TString& suffix) { + GroupBySuffix = suffix; +} + +bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions, EExprSeat exprSeat) { + YQL_ENSURE(exprSeat < EExprSeat::Max); + THashSet<TString> names; + THashSet<TString> aliasSet; + // TODO: merge FlattenBy with FlattenByExpr + const bool isFlatten = (exprSeat == EExprSeat::FlattenBy || exprSeat == EExprSeat::FlattenByExpr); + THashSet<TString>& aliases = isFlatten ? FlattenByAliases : aliasSet; + for (const auto& expr: expressions) { + const auto& alias = expr->GetLabel(); + const auto& columnNamePtr = expr->GetColumnName(); + if (alias) { + ExprAliases.insert(alias); + if (!aliases.emplace(alias).second) { + ctx.Error(expr->GetPos()) << "Duplicate alias found: " << alias << " in " << exprSeat << " section"; + return false; + } + if (names.contains(alias)) { + ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << alias << " in " << exprSeat << " section"; + return false; + } + } + if (columnNamePtr) { + const auto& sourceName = *expr->GetSourceName(); + auto columnName = *columnNamePtr; + if (sourceName) { + columnName = DotJoin(sourceName, columnName); + } + if (!names.emplace(columnName).second) { + ctx.Error(expr->GetPos()) << "Duplicate column name found: " << columnName << " in " << exprSeat << " section"; + return false; + } + if (!alias && aliases.contains(columnName)) { + ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << columnName << " in " << exprSeat << " section"; + return false; + } + if (alias && exprSeat == EExprSeat::GroupBy) { + auto columnAlias = GroupByColumnAliases.emplace(columnName, alias); + auto oldAlias = columnAlias.first->second; + if (columnAlias.second && oldAlias != alias) { + ctx.Error(expr->GetPos()) << "Alias for column not same, column: " << columnName << + ", exist alias: " << oldAlias << ", another alias: " << alias; + return false; + } + } + } + + if (exprSeat == EExprSeat::GroupBy) { + if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) { + if (SessionWindow) { + ctx.Error(expr->GetPos()) << "Duplicate session window specification:"; + ctx.Error(SessionWindow->GetPos()) << "Previous session window is declared here"; + return false; + } + SessionWindow = expr; + } + if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) { + if (HoppingWindow) { + ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:"; + ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here"; + return false; + } + HoppingWindow = expr; + } + } + Expressions(exprSeat).emplace_back(expr); + } + return true; +} + +void ISource::SetFlattenByMode(const TString& mode) { + FlattenMode = mode; +} + +void ISource::MarkFlattenColumns() { + FlattenColumns = true; +} + +bool ISource::IsFlattenColumns() const { + return FlattenColumns; +} + +TString ISource::MakeLocalName(const TString& name) { + auto iter = GenIndexes.find(name); + if (iter == GenIndexes.end()) { + iter = GenIndexes.emplace(name, 0).first; + } + TStringBuilder str; + str << name << iter->second; + ++iter->second; + return std::move(str); +} + +bool ISource::AddAggregation(TContext& ctx, TAggregationPtr aggr) { + Y_UNUSED(ctx); + YQL_ENSURE(aggr); + Aggregations.push_back(aggr); + return true; +} + +bool ISource::HasAggregations() const { + return !Aggregations.empty() || !GroupKeys.empty(); +} + +void ISource::AddWindowSpecs(TWinSpecs winSpecs) { + WinSpecs = winSpecs; +} + +bool ISource::AddFuncOverWindow(TContext& ctx, TNodePtr expr) { + Y_UNUSED(ctx); + Y_UNUSED(expr); + return false; +} + +void ISource::AddTmpWindowColumn(const TString& column) { + TmpWindowColumns.push_back(column); +} + +const TVector<TString>& ISource::GetTmpWindowColumns() const { + return TmpWindowColumns; +} + +void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) { + LegacyHoppingWindowSpec = spec; +} + +TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const { + return LegacyHoppingWindowSpec; +} + +TNodePtr ISource::GetSessionWindowSpec() const { + return SessionWindow; +} + +TNodePtr ISource::GetHoppingWindowSpec() const { + return HoppingWindow; +} + +TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const { + auto winIter = WinSpecs.find(windowName); + if (winIter == WinSpecs.end()) { + ctx.Error(Pos) << "Unable to find window specification for window '" << windowName << "'"; + return {}; + } + YQL_ENSURE(winIter->second); + return winIter->second; +} + +inline TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) { + return NamedExprs[static_cast<size_t>(exprSeat)]; +} + +const TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) const { + return NamedExprs[static_cast<size_t>(exprSeat)]; +} + +inline TNodePtr ISource::AliasOrColumn(const TNodePtr& node, bool withSource) { + auto result = node->GetLabel(); + if (!result) { + const auto columnNamePtr = node->GetColumnName(); + YQL_ENSURE(columnNamePtr); + result = *columnNamePtr; + if (withSource) { + const auto sourceNamePtr = node->GetSourceName(); + if (sourceNamePtr) { + result = DotJoin(*sourceNamePtr, result); + } + } + } + return BuildQuotedAtom(node->GetPos(), result); +} + +bool ISource::AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) { + YQL_ENSURE(func->IsOverWindow()); + if (func->IsDistinct()) { + ctx.Error(func->GetPos()) << "Aggregation with distinct is not allowed over window: " << windowName; + return false; + } + if (!FindWindowSpecification(ctx, windowName)) { + return false; + } + AggregationOverWindow[windowName].emplace_back(std::move(func)); + return true; +} + +bool ISource::AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) { + if (!FindWindowSpecification(ctx, windowName)) { + return false; + } + FuncOverWindow[windowName].emplace_back(std::move(func)); + return true; +} + +bool ISource::IsCompositeSource() const { + return false; +} + +bool ISource::IsGroupByColumn(const TString& column) const { + return GroupKeys.contains(column); +} + +bool ISource::IsFlattenByColumns() const { + return !Expressions(EExprSeat::FlattenBy).empty(); +} + +bool ISource::IsFlattenByExprs() const { + return !Expressions(EExprSeat::FlattenByExpr).empty(); +} + +bool ISource::IsAlias(EExprSeat exprSeat, const TString& column) const { + for (const auto& exprNode: Expressions(exprSeat)) { + const auto& labelName = exprNode->GetLabel(); + if (labelName && labelName == column) { + return true; + } + } + return false; +} + +bool ISource::IsExprAlias(const TString& column) const { + std::array<EExprSeat, 5> exprSeats = {{EExprSeat::FlattenBy, EExprSeat::FlattenByExpr, EExprSeat::GroupBy, + EExprSeat::WindowPartitionBy, EExprSeat::DistinctAggr}}; + for (auto seat: exprSeats) { + if (IsAlias(seat, column)) { + return true; + } + } + return false; +} + +bool ISource::IsExprSeat(EExprSeat exprSeat, EExprType type) const { + auto expressions = Expressions(exprSeat); + if (!expressions) { + return false; + } + for (const auto& exprNode: expressions) { + if (exprNode->GetLabel()) { + return type == EExprType::WithExpression; + } + } + return type == EExprType::ColumnOnly; +} + +TString ISource::GetGroupByColumnAlias(const TString& column) const { + auto iter = GroupByColumnAliases.find(column); + if (iter == GroupByColumnAliases.end()) { + return {}; + } + return iter->second; +} + +const TString* ISource::GetWindowName() const { + return {}; +} + +bool ISource::IsCalcOverWindow() const { + return !AggregationOverWindow.empty() || !FuncOverWindow.empty() || + AnyOf(WinSpecs, [](const auto& item) { return item.second->Session; }); +} + +bool ISource::IsOverWindowSource() const { + return !WinSpecs.empty(); +} + +bool ISource::IsStream() const { + return false; +} + +EOrderKind ISource::GetOrderKind() const { + return EOrderKind::None; +} + +TWriteSettings ISource::GetWriteSettings() const { + return {}; +} + +bool ISource::SetSamplingOptions(TContext& ctx, + TPosition pos, + ESampleMode mode, + TNodePtr samplingRate, + TNodePtr samplingSeed) { + Y_UNUSED(pos); + Y_UNUSED(mode); + Y_UNUSED(samplingRate); + Y_UNUSED(samplingSeed); + ctx.Error() << "Sampling is only supported for table sources"; + return false; +} + +bool ISource::SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) { + Y_UNUSED(pos); + Y_UNUSED(contextHints); + if (hints) { + ctx.Error() << "Explicit hints are only supported for table sources"; + return false; + } + return true; +} + +bool ISource::CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const { + Y_UNUSED(columns); + Y_UNUSED(hint); + ctx.Error() << "Source not support grouping hint"; + return false; +} + +TNodePtr ISource::BuildFilter(TContext& ctx, const TString& label) { + return Filters.empty() ? nullptr : Y(ctx.UseUnordered(*this) ? "OrderedFilter" : "Filter", label, BuildFilterLambda()); +} + +TNodePtr ISource::BuildFilterLambda() { + if (Filters.empty()) { + return BuildLambda(Pos, Y("row"), Y("Bool", Q("true"))); + } + YQL_ENSURE(Filters[0]->HasState(ENodeState::Initialized)); + TNodePtr filter(Filters[0]); + for (ui32 i = 1; i < Filters.size(); ++i) { + YQL_ENSURE(Filters[i]->HasState(ENodeState::Initialized)); + filter = Y("And", filter, Filters[i]); + } + filter = Y("Coalesce", filter, Y("Bool", Q("false"))); + return BuildLambda(Pos, Y("row"), filter); +} + +TNodePtr ISource::BuildFlattenByColumns(const TString& label) { + auto columnsList = Y("FlattenByColumns", Q(FlattenMode), label); + for (const auto& column: Expressions(EExprSeat::FlattenBy)) { + const auto columnNamePtr = column->GetColumnName(); + YQL_ENSURE(columnNamePtr); + if (column->GetLabel().empty()) { + columnsList = L(columnsList, Q(*columnNamePtr)); + } else { + columnsList = L(columnsList, Q(Y(Q(*columnNamePtr), Q(column->GetLabel())))); + } + } + return Y(Y("let", "res", columnsList)); +} + +TNodePtr ISource::BuildFlattenColumns(const TString& label) { + return Y(Y("let", "res", Y("Just", Y("FlattenStructs", label)))); +} + +namespace { + +TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& exprs) { + auto structObj = BuildAtom(pos, "row", TNodeFlags::Default); + for (const auto& exprNode: exprs) { + const auto name = exprNode->GetLabel(); + YQL_ENSURE(name); + structObj = structObj->Y("ForceRemoveMember", structObj, structObj->Q(name)); + if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) { + continue; + } + if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) { + continue; + } + structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode); + } + return structObj->Y("AsList", structObj); +} + +} + +TNodePtr ISource::BuildPreaggregatedMap(TContext& ctx) { + Y_UNUSED(ctx); + const auto& groupByExprs = Expressions(EExprSeat::GroupBy); + const auto& distinctAggrExprs = Expressions(EExprSeat::DistinctAggr); + YQL_ENSURE(groupByExprs || distinctAggrExprs); + + TNodePtr res; + if (groupByExprs) { + auto body = BuildLambdaBodyForExprAliases(Pos, groupByExprs); + res = Y("FlatMap", "core", BuildLambda(Pos, Y("row"), body)); + } + + if (distinctAggrExprs) { + auto body = BuildLambdaBodyForExprAliases(Pos, distinctAggrExprs); + auto lambda = BuildLambda(Pos, Y("row"), body); + res = res ? Y("FlatMap", res, lambda) : Y("FlatMap", "core", lambda); + } + return res; +} + +TNodePtr ISource::BuildPreFlattenMap(TContext& ctx) { + Y_UNUSED(ctx); + YQL_ENSURE(IsFlattenByExprs()); + return BuildLambdaBodyForExprAliases(Pos, Expressions(EExprSeat::FlattenByExpr)); +} + +TNodePtr ISource::BuildPrewindowMap(TContext& ctx) { + auto feed = BuildAtom(Pos, "row", TNodeFlags::Default); + for (const auto& exprNode: Expressions(EExprSeat::WindowPartitionBy)) { + const auto name = exprNode->GetLabel(); + if (name && !dynamic_cast<const TSessionWindow*>(exprNode.Get())) { + feed = Y("AddMember", feed, Q(name), exprNode); + } + } + return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed))); +} + +bool ISource::BuildSamplingLambda(TNodePtr& node) { + if (!SamplingRate) { + return true; + } + auto res = Y("Coalesce", Y("SafeCast", SamplingRate, Y("DataType", Q("Double"))), Y("Double", Q("0"))); + res = Y("/", res, Y("Double", Q("100"))); + res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row"))); + node = BuildLambda(GetPos(), Y("row"), res, "res"); + return !!node; +} + +bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) { + if (samplingRate) { + if (!samplingRate->Init(ctx, this)) { + return false; + } + SamplingRate = Y("Ensure", samplingRate, Y(">=", samplingRate, Y("Double", Q("0"))), Y("String", Q("\"Expected sampling rate to be nonnegative\""))); + SamplingRate = Y("Ensure", SamplingRate, Y("<=", SamplingRate, Y("Double", Q("100"))), Y("String", Q("\"Sampling rate is over 100%\""))); + } + return true; +} + +std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) { + if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) { + return { nullptr, true }; + } + + auto keysTuple = Y(); + YQL_ENSURE(GroupKeys.size() == OrderedGroupKeys.size()); + for (const auto& key: OrderedGroupKeys) { + YQL_ENSURE(GroupKeys.contains(key)); + keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key)); + } + + std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs; + for (const auto& aggr: Aggregations) { + if (const auto key = aggr->GetGenericKey()) { + genericAggrs[{aggr->IsDistinct(), *key}].emplace_back(aggr.Get()); + } + } + + for (const auto& aggr : genericAggrs) { + for (size_t i = 1U; i < aggr.second.size(); ++i) { + aggr.second.front()->Join(aggr.second[i]); + } + } + + const auto listType = Y("TypeOf", label); + auto aggrArgs = Y(); + const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" || + GroupBySuffix == "MergeFinalize" || GroupBySuffix == "MergeManyFinalize"; + const bool allowAggApply = !LegacyHoppingWindowSpec && !SessionWindow && !HoppingWindow; + for (const auto& aggr: Aggregations) { + auto res = aggr->AggregationTraits(listType, overState, GroupBySuffix == "MergeManyFinalize", allowAggApply, ctx); + if (!res.second) { + return { nullptr, false }; + } + + if (res.first) { + aggrArgs = L(aggrArgs, res.first); + } + } + + auto options = Y(); + if (CompactGroupBy || GroupBySuffix == "Finalize") { + options = L(options, Q(Y(Q("compact")))); + } + + if (LegacyHoppingWindowSpec) { + auto hoppingTraits = Y( + "HoppingTraits", + Y("ListItemType", listType), + BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor), + LegacyHoppingWindowSpec->Hop, + LegacyHoppingWindowSpec->Interval, + LegacyHoppingWindowSpec->Delay, + LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"), + Q("v1")); + + options = L(options, Q(Y(Q("hopping"), hoppingTraits))); + } + + if (SessionWindow) { + YQL_ENSURE(SessionWindow->GetLabel()); + auto sessionWindow = dynamic_cast<TSessionWindow*>(SessionWindow.Get()); + YQL_ENSURE(sessionWindow); + options = L(options, Q(Y(Q("session"), + Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label)))))); + } + + if (HoppingWindow) { + YQL_ENSURE(HoppingWindow->GetLabel()); + auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get()); + YQL_ENSURE(hoppingWindow); + options = L(options, Q(Y(Q("hopping"), + Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label)))))); + } + + return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true }; +} + +TMaybe<TString> ISource::FindColumnMistype(const TString& name) const { + auto result = FindMistypeIn(GroupKeys, name); + return result ? result : FindMistypeIn(ExprAliases, name); +} + +void ISource::AddDependentSource(ISource* usedSource) { + UsedSources.push_back(usedSource); +} + +class TYqlFrameBound final: public TCallNode { +public: + TYqlFrameBound(TPosition pos, TNodePtr bound) + : TCallNode(pos, "EvaluateExpr", 1, 1, { bound }) + , FakeSource(BuildFakeSource(pos)) + { + } + + bool DoInit(TContext& ctx, ISource* src) override { + if (!ValidateArguments(ctx)) { + return false; + } + + if (!Args[0]->Init(ctx, FakeSource.Get())) { + return false; + } + + return TCallNode::DoInit(ctx, src); + } + + TNodePtr DoClone() const final { + return new TYqlFrameBound(Pos, Args[0]->Clone()); + } +private: + TSourcePtr FakeSource; +}; + +TNodePtr BuildFrameNode(const TFrameBound& frame, EFrameType frameType) { + TString settingStr; + switch (frame.Settings) { + case FramePreceding: settingStr = "preceding"; break; + case FrameCurrentRow: settingStr = "currentRow"; break; + case FrameFollowing: settingStr = "following"; break; + default: YQL_ENSURE(false, "Unexpected frame setting"); + } + + TNodePtr node = frame.Bound; + TPosition pos = frame.Pos; + if (frameType != EFrameType::FrameByRows) { + TVector<TNodePtr> settings; + settings.push_back(BuildQuotedAtom(pos, settingStr, TNodeFlags::Default)); + if (frame.Settings != FrameCurrentRow) { + if (!node) { + node = BuildQuotedAtom(pos, "unbounded", TNodeFlags::Default); + } else if (!node->IsLiteral()) { + node = new TYqlFrameBound(pos, node); + } + settings.push_back(std::move(node)); + } + return BuildTuple(pos, std::move(settings)); + } + + // TODO: switch FrameByRows to common format above + YQL_ENSURE(frame.Settings != FrameCurrentRow, "Should be already replaced by 0 preceding/following"); + if (!node) { + node = BuildLiteralVoid(pos); + } else if (node->IsLiteral()) { + YQL_ENSURE(node->GetLiteralType() == "Int32"); + i32 value = FromString<i32>(node->GetLiteralValue()); + YQL_ENSURE(value >= 0); + if (frame.Settings == FramePreceding) { + value = -value; + } + node = new TCallNodeImpl(pos, "Int32", { BuildQuotedAtom(pos, ToString(value), TNodeFlags::Default) }); + } else { + if (frame.Settings == FramePreceding) { + node = new TCallNodeImpl(pos, "Minus", { node->Clone() }); + } + node = new TYqlFrameBound(pos, node); + } + return node; +} + +TNodePtr ISource::BuildWindowFrame(const TFrameSpecification& spec, bool isCompact) { + YQL_ENSURE(spec.FrameExclusion == FrameExclNone); + YQL_ENSURE(spec.FrameBegin); + YQL_ENSURE(spec.FrameEnd); + + auto frameBeginNode = BuildFrameNode(*spec.FrameBegin, spec.FrameType); + auto frameEndNode = BuildFrameNode(*spec.FrameEnd, spec.FrameType); + + auto begin = Q(Y(Q("begin"), frameBeginNode)); + auto end = Q(Y(Q("end"), frameEndNode)); + + return isCompact ? Q(Y(begin, end, Q(Y(Q("compact"))))) : Q(Y(begin, end)); +} + +class TSessionWindowTraits final: public TCallNode { +public: + TSessionWindowTraits(TPosition pos, const TVector<TNodePtr>& args) + : TCallNode(pos, "SessionWindowTraits", args) + , FakeSource(BuildFakeSource(pos)) + { + YQL_ENSURE(args.size() == 4); + } + + bool DoInit(TContext& ctx, ISource* src) override { + if (!ValidateArguments(ctx)) { + return false; + } + + if (!Args.back()->Init(ctx, FakeSource.Get())) { + return false; + } + + return TCallNode::DoInit(ctx, src); + } + + TNodePtr DoClone() const final { + return new TSessionWindowTraits(Pos, CloneContainer(Args)); + } +private: + TSourcePtr FakeSource; +}; + +TNodePtr ISource::BuildCalcOverWindow(TContext& ctx, const TString& label) { + YQL_ENSURE(IsCalcOverWindow()); + + TSet<TString> usedWindows; + for (auto& it : AggregationOverWindow) { + usedWindows.insert(it.first); + } + for (auto& it : FuncOverWindow) { + usedWindows.insert(it.first); + } + for (auto& it : WinSpecs) { + if (it.second->Session) { + usedWindows.insert(it.first); + } + } + + YQL_ENSURE(!usedWindows.empty()); + + const bool onePartition = usedWindows.size() == 1; + const auto useLabel = onePartition ? label : "partitioning"; + const auto listType = Y("TypeOf", useLabel); + auto framesProcess = Y(); + auto resultNode = onePartition ? Y() : Y(Y("let", "partitioning", label)); + + for (const auto& name : usedWindows) { + auto spec = FindWindowSpecification(ctx, name); + YQL_ENSURE(spec); + + auto aggsIter = AggregationOverWindow.find(name); + auto funcsIter = FuncOverWindow.find(name); + + const auto& aggs = (aggsIter == AggregationOverWindow.end()) ? TVector<TAggregationPtr>() : aggsIter->second; + const auto& funcs = (funcsIter == FuncOverWindow.end()) ? TVector<TNodePtr>() : funcsIter->second; + + auto frames = Y(); + TString frameType; + switch (spec->Frame->FrameType) { + case EFrameType::FrameByRows: frameType = "WinOnRows"; break; + case EFrameType::FrameByRange: frameType = "WinOnRange"; break; + case EFrameType::FrameByGroups: frameType = "WinOnGroups"; break; + } + YQL_ENSURE(frameType); + auto callOnFrame = Y(frameType, BuildWindowFrame(*spec->Frame, spec->IsCompact)); + for (auto& agg : aggs) { + auto winTraits = agg->WindowTraits(listType, ctx); + callOnFrame = L(callOnFrame, winTraits); + } + for (auto& func : funcs) { + auto winSpec = func->WindowSpecFunc(listType); + callOnFrame = L(callOnFrame, winSpec); + } + frames = L(frames, callOnFrame); + + auto keysTuple = Y(); + for (const auto& key: spec->Partitions) { + if (!dynamic_cast<TSessionWindow*>(key.Get())) { + keysTuple = L(keysTuple, AliasOrColumn(key, GetJoin())); + } + } + + auto sortSpec = spec->OrderBy.empty() ? Y("Void") : BuildSortSpec(spec->OrderBy, useLabel, true, false); + if (spec->Session) { + TString label = spec->Session->GetLabel(); + YQL_ENSURE(label); + auto sessionWindow = dynamic_cast<TSessionWindow*>(spec->Session.Get()); + YQL_ENSURE(sessionWindow); + auto labelNode = BuildQuotedAtom(sessionWindow->GetPos(), label); + + auto sessionTraits = sessionWindow->BuildTraits(useLabel); + framesProcess = Y("CalcOverSessionWindow", useLabel, Q(keysTuple), sortSpec, Q(frames), sessionTraits, Q(Y(labelNode))); + } else { + YQL_ENSURE(aggs || funcs); + framesProcess = Y("CalcOverWindow", useLabel, Q(keysTuple), sortSpec, Q(frames)); + } + + if (!onePartition) { + resultNode = L(resultNode, Y("let", "partitioning", framesProcess)); + } + } + if (onePartition) { + return framesProcess; + } else { + return Y("block", Q(L(resultNode, Y("return", "partitioning")))); + } +} + +TNodePtr ISource::BuildSort(TContext& ctx, const TString& label) { + Y_UNUSED(ctx); + Y_UNUSED(label); + return nullptr; +} + +TNodePtr ISource::BuildCleanupColumns(TContext& ctx, const TString& label) { + Y_UNUSED(ctx); + Y_UNUSED(label); + return nullptr; +} + +IJoin* ISource::GetJoin() { + return nullptr; +} + +ISource* ISource::GetCompositeSource() { + return nullptr; +} + +bool ISource::IsSelect() const { + return true; +} + +bool ISource::IsTableSource() const { + return false; +} + +bool ISource::ShouldUseSourceAsColumn(const TString& source) const { + Y_UNUSED(source); + return false; +} + +bool ISource::IsJoinKeysInitializing() const { + return false; +} + +bool ISource::DoInit(TContext& ctx, ISource* src) { + for (auto& column: Expressions(EExprSeat::FlattenBy)) { + if (!column->Init(ctx, this)) { + return false; + } + } + + if (IsFlattenColumns() && src) { + src->AllColumns(); + } + + return true; +} + +bool ISource::InitFilters(TContext& ctx) { + for (auto& filter: Filters) { + if (!filter->Init(ctx, this)) { + return false; + } + if (filter->IsAggregated() && !filter->IsConstant() && !filter->HasState(ENodeState::AggregationKey)) { + ctx.Error(filter->GetPos()) << "Can not use aggregated values in filtering"; + return false; + } + } + return true; +} + +TAstNode* ISource::Translate(TContext& ctx) const { + Y_VERIFY_DEBUG(false); + Y_UNUSED(ctx); + return nullptr; +} + +void ISource::FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector) { + TNodePtr expr; + if (orderBy.empty()) { + YQL_ENSURE(!sortKeySelector); + sortDirection = sortKeySelector = Y("Void"); + return; + } else if (orderBy.size() == 1) { + auto& sortSpec = orderBy.front(); + expr = Y("PersistableRepr", sortSpec->OrderExpr); + sortDirection = Y("Bool", Q(sortSpec->Ascending ? "true" : "false")); + } else { + auto exprList = Y(); + sortDirection = Y(); + for (const auto& sortSpec: orderBy) { + const auto asc = sortSpec->Ascending; + sortDirection = L(sortDirection, Y("Bool", Q(asc ? "true" : "false"))); + exprList = L(exprList, Y("PersistableRepr", sortSpec->OrderExpr)); + } + sortDirection = Q(sortDirection); + expr = Q(exprList); + } + sortKeySelector = BuildLambda(Pos, Y("row"), expr); +} + +TNodePtr ISource::BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume) { + YQL_ENSURE(!orderBy.empty()); + TNodePtr dirsNode; + TNodePtr keySelectorNode; + FillSortParts(orderBy, dirsNode, keySelectorNode); + if (traits) { + return Y("SortTraits", Y("TypeOf", label), dirsNode, keySelectorNode); + } else if (assume) { + return Y("AssumeSorted", label, dirsNode, keySelectorNode); + } else { + return Y("Sort", label, dirsNode, keySelectorNode); + } +} + +IJoin::IJoin(TPosition pos) + : ISource(pos) +{ +} + +IJoin::~IJoin() +{ +} + +IJoin* IJoin::GetJoin() { + return this; +} + +} // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/source.h b/ydb/library/yql/sql/v1/source.h new file mode 100644 index 0000000000..a62b53efb9 --- /dev/null +++ b/ydb/library/yql/sql/v1/source.h @@ -0,0 +1,300 @@ +#pragma once +#include "node.h" + +namespace NSQLTranslationV1 { + + class ISource; + typedef TIntrusivePtr<ISource> TSourcePtr; + + struct TTableRef { + TString RefName; + TString Service; + TDeferredAtom Cluster; + TNodePtr Keys; + TNodePtr Options; + TSourcePtr Source; + + TTableRef() = default; + TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys); + TTableRef(const TTableRef&) = default; + TTableRef& operator=(const TTableRef&) = default; + + TString ShortName() const; + }; + + typedef TVector<TTableRef> TTableList; + + + class IJoin; + class ISource: public INode { + public: + virtual ~ISource(); + + virtual bool IsFake() const; + virtual void AllColumns(); + virtual const TColumns* GetColumns() const; + virtual void GetInputTables(TTableList& tableList) const; + /// in case of error unfilled, flag show if ensure column name + virtual TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column); + virtual void FinishColumns(); + virtual bool AddExpressions(TContext& ctx, const TVector<TNodePtr>& columns, EExprSeat exprSeat); + virtual void SetFlattenByMode(const TString& mode); + virtual void MarkFlattenColumns(); + virtual bool IsFlattenColumns() const; + virtual bool AddFilter(TContext& ctx, TNodePtr filter); + virtual bool AddGroupKey(TContext& ctx, const TString& column); + virtual void SetCompactGroupBy(bool compactGroupBy); + virtual void SetGroupBySuffix(const TString& suffix); + virtual TString MakeLocalName(const TString& name); + virtual bool AddAggregation(TContext& ctx, TAggregationPtr aggr); + virtual bool AddFuncOverWindow(TContext& ctx, TNodePtr expr); + virtual void AddTmpWindowColumn(const TString& column); + virtual const TVector<TString>& GetTmpWindowColumns() const; + virtual bool HasAggregations() const; + virtual void AddWindowSpecs(TWinSpecs winSpecs); + virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func); + virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func); + virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec); + virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const; + virtual TNodePtr GetSessionWindowSpec() const; + virtual TNodePtr GetHoppingWindowSpec() const; + virtual bool IsCompositeSource() const; + virtual bool IsGroupByColumn(const TString& column) const; + virtual bool IsFlattenByColumns() const; + virtual bool IsFlattenByExprs() const; + virtual bool IsCalcOverWindow() const; + virtual bool IsOverWindowSource() const; + virtual bool IsStream() const; + virtual EOrderKind GetOrderKind() const; + virtual TWriteSettings GetWriteSettings() const; + virtual bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed); + virtual bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints); + virtual bool CalculateGroupingHint(TContext& ctx, const TVector<TString>& columns, ui64& hint) const; + virtual TNodePtr BuildFilter(TContext& ctx, const TString& label); + virtual TNodePtr BuildFilterLambda(); + virtual TNodePtr BuildFlattenByColumns(const TString& label); + virtual TNodePtr BuildFlattenColumns(const TString& label); + virtual TNodePtr BuildPreaggregatedMap(TContext& ctx); + virtual TNodePtr BuildPreFlattenMap(TContext& ctx); + virtual TNodePtr BuildPrewindowMap(TContext& ctx); + virtual std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx); + virtual TNodePtr BuildCalcOverWindow(TContext& ctx, const TString& label); + virtual TNodePtr BuildSort(TContext& ctx, const TString& label); + virtual TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label); + virtual bool BuildSamplingLambda(TNodePtr& node); + virtual bool SetSamplingRate(TContext& ctx, TNodePtr samplingRate); + virtual IJoin* GetJoin(); + virtual ISource* GetCompositeSource(); + virtual bool IsSelect() const; + virtual bool IsTableSource() const; + virtual bool ShouldUseSourceAsColumn(const TString& source) const; + virtual bool IsJoinKeysInitializing() const; + virtual const TString* GetWindowName() const; + + virtual bool DoInit(TContext& ctx, ISource* src); + virtual TNodePtr Build(TContext& ctx) = 0; + + virtual TMaybe<TString> FindColumnMistype(const TString& name) const; + + virtual bool InitFilters(TContext& ctx); + void AddDependentSource(ISource* usedSource); + bool IsAlias(EExprSeat exprSeat, const TString& label) const; + bool IsExprAlias(const TString& label) const; + bool IsExprSeat(EExprSeat exprSeat, EExprType type = EExprType::WithExpression) const; + TString GetGroupByColumnAlias(const TString& column) const; + const TVector<TNodePtr>& Expressions(EExprSeat exprSeat) const; + + virtual TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const; + + TIntrusivePtr<ISource> CloneSource() const; + + protected: + ISource(TPosition pos); + virtual TAstNode* Translate(TContext& ctx) const; + + void FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortKeySelector, TNodePtr& sortDirection); + TNodePtr BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume); + + TVector<TNodePtr>& Expressions(EExprSeat exprSeat); + TNodePtr AliasOrColumn(const TNodePtr& node, bool withSource); + + TNodePtr BuildWindowFrame(const TFrameSpecification& spec, bool isCompact); + + THashSet<TString> ExprAliases; + THashSet<TString> FlattenByAliases; + THashMap<TString, TString> GroupByColumnAliases; + TVector<TNodePtr> Filters; + bool CompactGroupBy = false; + TString GroupBySuffix; + TSet<TString> GroupKeys; + TVector<TString> OrderedGroupKeys; + std::array<TVector<TNodePtr>, static_cast<unsigned>(EExprSeat::Max)> NamedExprs; + TVector<TAggregationPtr> Aggregations; + TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow; + TMap<TString, TVector<TNodePtr>> FuncOverWindow; + TWinSpecs WinSpecs; + TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; + TNodePtr SessionWindow; + TNodePtr HoppingWindow; + TVector<ISource*> UsedSources; + TString FlattenMode; + bool FlattenColumns = false; + THashMap<TString, ui32> GenIndexes; + TVector<TString> TmpWindowColumns; + TNodePtr SamplingRate; + }; + + template<> + inline TVector<TSourcePtr> CloneContainer<TSourcePtr>(const TVector<TSourcePtr>& args) { + TVector<TSourcePtr> cloneArgs; + cloneArgs.reserve(args.size()); + for (const auto& arg: args) { + cloneArgs.emplace_back(arg ? arg->CloneSource() : nullptr); + } + return cloneArgs; + } + + struct TJoinLinkSettings { + bool ForceSortedMerge = false; + }; + + class IJoin: public ISource { + public: + virtual ~IJoin(); + + virtual IJoin* GetJoin(); + virtual TNodePtr BuildJoinKeys(TContext& ctx, const TVector<TDeferredAtom>& names) = 0; + virtual void SetupJoin(const TString& joinOp, TNodePtr joinExpr, const TJoinLinkSettings& linkSettings) = 0; + virtual const THashMap<TString, THashSet<TString>>& GetSameKeysMap() const = 0; + virtual TVector<TString> GetJoinLabels() const = 0; + + protected: + IJoin(TPosition pos); + }; + + class TSessionWindow final : public INode { + public: + TSessionWindow(TPosition pos, const TVector<TNodePtr>& args); + void MarkValid(); + TNodePtr BuildTraits(const TString& label) const; + private: + bool DoInit(TContext& ctx, ISource* src) override; + TAstNode* Translate(TContext&) const override; + void DoUpdateState() const override; + TNodePtr DoClone() const override; + TString GetOpName() const override; + + TVector<TNodePtr> Args; + TSourcePtr FakeSource; + TNodePtr Node; + bool Valid; + }; + + class THoppingWindow final : public INode { + public: + THoppingWindow(TPosition pos, const TVector<TNodePtr>& args); + void MarkValid(); + TNodePtr BuildTraits(const TString& label) const; + public: + TNodePtr Hop; + TNodePtr Interval; + private: + bool DoInit(TContext& ctx, ISource* src) override; + TAstNode* Translate(TContext&) const override; + void DoUpdateState() const override; + TNodePtr DoClone() const override; + TString GetOpName() const override; + TNodePtr ProcessIntervalParam(const TNodePtr& val) const; + + TVector<TNodePtr> Args; + TSourcePtr FakeSource; + TNodePtr Node; + bool Valid; + }; + + + // Implemented in join.cpp + TString NormalizeJoinOp(const TString& joinOp); + TSourcePtr BuildEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes); + + // Implemented in select.cpp + TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped); + TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex = -1); + TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos); + TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist = false); + TSourcePtr BuildMuxSource(TPosition pos, TVector<TSourcePtr>&& sources); + TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom = false, bool inSubquery = false); + TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList = false); + TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label = TString()); + TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label = TString()); + TSourcePtr BuildRefColumnSource(TPosition pos, const TString& partExpression); + TSourcePtr BuildUnionAll(TPosition pos, TVector<TSourcePtr>&& sources, const TWriteSettings& settings); + TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource); + + TNodePtr BuildOrderBy(TPosition pos, const TVector<TNodePtr>& keys, const TVector<bool>& order); + TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take); + + + TSourcePtr BuildSelectCore( + TContext& ctx, + TPosition pos, + TSourcePtr source, + const TVector<TNodePtr>& groupByExpr, + const TVector<TNodePtr>& groupBy, + bool compactGroupBy, + const TString& groupBySuffix, + bool assumeSorted, + const TVector<TSortSpecificationPtr>& orderBy, + TNodePtr having, + TWinSpecs&& windowSpec, + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, + TVector<TNodePtr>&& terms, + bool distinct, + TVector<TNodePtr>&& without, + bool selectStream, + const TWriteSettings& settings + ); + TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake); + + + enum class ReduceMode { + ByPartition, + ByAll, + }; + TSourcePtr BuildReduce(TPosition pos, ReduceMode mode, TSourcePtr source, TVector<TSortSpecificationPtr>&& orderBy, + TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings, + const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall); + TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, + bool prcessStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy); + + TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped); + + // Implemented in insert.cpp + TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values); + TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, TSourcePtr source); + TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values); + + EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode); + TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns); + TNodePtr BuildIntoTableOptions(TPosition pos, const TVector<TString>& eraseColumns, const TTableHints& hints); + TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options = nullptr); + TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source); + TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source); + + // Implemented in query.cpp + TNodePtr BuildTableKey(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TDeferredAtom& name, const TString& view); + TNodePtr BuildTableKeys(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TString& func, const TVector<TTableArg>& args); + TNodePtr BuildTopicKey(TPosition pos, const TDeferredAtom& cluster, const TDeferredAtom& name); + TNodePtr BuildInputOptions(TPosition pos, const TTableHints& hints); + TNodePtr BuildInputTables(TPosition pos, const TTableList& tables, bool inSubquery, TScopedStatePtr scoped); + TNodePtr BuildCreateTable(TPosition pos, const TTableRef& tr, const TCreateTableParameters& params, TScopedStatePtr scoped); + TNodePtr BuildAlterTable(TPosition pos, const TTableRef& tr, const TAlterTableParameters& params, TScopedStatePtr scoped); + TNodePtr BuildDropTable(TPosition pos, const TTableRef& table, ETableType tableType, TScopedStatePtr scoped); + TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options, + TScopedStatePtr scoped); + TSourcePtr TryMakeSourceFromExpression(TContext& ctx, const TString& currService, const TDeferredAtom& currCluster, + TNodePtr node, const TString& view = {}); + void MakeTableFromExpression(TContext& ctx, TNodePtr node, TDeferredAtom& table, const TString& prefix = {}); + TDeferredAtom MakeAtomFromExpression(TContext& ctx, TNodePtr node, const TString& prefix = {}); + TString NormalizeTypeString(const TString& str); +} // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/sql_group_by.cpp b/ydb/library/yql/sql/v1/sql_group_by.cpp index 33a525b259..d1dd439045 100644 --- a/ydb/library/yql/sql/v1/sql_group_by.cpp +++ b/ydb/library/yql/sql/v1/sql_group_by.cpp @@ -1,5 +1,6 @@ #include "sql_group_by.h" #include "sql_expression.h" +#include "source.h" #include <ydb/library/yql/core/yql_expr_type_annotation.h> namespace NSQLTranslationV1 { diff --git a/ydb/library/yql/sql/v1/sql_translation.cpp b/ydb/library/yql/sql/v1/sql_translation.cpp index 67bb735a4e..edd313801d 100644 --- a/ydb/library/yql/sql/v1/sql_translation.cpp +++ b/ydb/library/yql/sql/v1/sql_translation.cpp @@ -4,6 +4,7 @@ #include "sql_query.h" #include "sql_values.h" #include "sql_select.h" +#include "source.h" #include <ydb/library/yql/parser/proto_ast/gen/v1/SQLv1Lexer.h> #include <ydb/library/yql/sql/settings/partitioning.h> #include <util/generic/scope.h> diff --git a/ydb/library/yql/sql/v1/sql_values.cpp b/ydb/library/yql/sql/v1/sql_values.cpp index 4071338282..4b31cb0884 100644 --- a/ydb/library/yql/sql/v1/sql_values.cpp +++ b/ydb/library/yql/sql/v1/sql_values.cpp @@ -3,6 +3,7 @@ #include "sql_query.h" #include "sql_select.h" #include "sql_expression.h" +#include "source.h" namespace NSQLTranslationV1 { diff --git a/ydb/library/yql/sql/v1/ya.make b/ydb/library/yql/sql/v1/ya.make index 153b9fd659..39aaf6c45a 100644 --- a/ydb/library/yql/sql/v1/ya.make +++ b/ydb/library/yql/sql/v1/ya.make @@ -32,6 +32,7 @@ SRCS( list_builtin.cpp node.cpp select.cpp + source.cpp sql.cpp sql_call_expr.cpp sql_expression.cpp |