aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-08-04 21:25:10 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-08-04 21:58:33 +0300
commit803265b1cbfeebb85765030cc317e24938d787e8 (patch)
treee6563c70f64eaf3a01868b741d6f6b026d232f0c
parent0f496fc8a55e9d2dfd4fe9da109ef9abc58c68f4 (diff)
downloadydb-803265b1cbfeebb85765030cc317e24938d787e8.tar.gz
Support write to external sources in KQP
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp8
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json25
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp2
-rw-r--r--ydb/core/kqp/host/kqp_host_impl.h2
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp10
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp35
-rw-r--r--ydb/core/kqp/opt/kqp_opt.cpp14
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp1
-rw-r--r--ydb/core/kqp/opt/kqp_opt_effects.cpp128
-rw-r--r--ydb/core/kqp/opt/kqp_opt_impl.h2
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp42
-rw-r--r--ydb/core/kqp/opt/kqp_opt_phy_check.cpp42
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp21
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/provider/ya.make1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp65
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp87
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp43
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp66
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h7
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp40
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp37
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp194
-rw-r--r--ydb/core/protos/kqp_physical.proto16
-rw-r--r--ydb/library/yql/core/services/yql_transform_pipeline.cpp8
-rw-r--r--ydb/library/yql/core/yql_graph_transformer.cpp2
-rw-r--r--ydb/library/yql/dq/integration/yql_dq_integration.h8
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.cpp7
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp6
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h3
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp8
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json10
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp54
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp15
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp6
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp270
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp2
47 files changed, 1127 insertions, 203 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index b247f4a77f..4bc8e8ba9e 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1338,6 +1338,7 @@ private:
void BuildDatashardTasks(TStageInfo& stageInfo) {
THashMap<ui64, ui64> shardTasks; // shardId -> taskId
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
auto getShardTask = [&](ui64 shardId) -> TTask& {
auto it = shardTasks.find(shardId);
@@ -1349,11 +1350,12 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);
+
+ BuildSinks(stage, task);
+
return task;
};
- auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
-
const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;;
@@ -1531,6 +1533,9 @@ private:
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = SelfId();
task.Meta.Type = TTaskMeta::TTaskType::Compute;
+
+ BuildSinks(stage, task);
+
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 8d9c9595b2..1014b280cc 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -732,15 +732,32 @@ protected:
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
task.Meta.Type = TTaskMeta::TTaskType::Compute;
+ BuildSinks(stage, task);
+
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
}
}
+ void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
+ if (stage.SinksSize() > 0) {
+ YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
+ const auto& sink = stage.GetSinks(0);
+ YQL_ENSURE(sink.HasExternalSink(), "only external sinks are supported");
+ const auto& extSink = sink.GetExternalSink();
+ YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
+ auto& output = task.Outputs[sink.GetOutputIndex()];
+ output.Type = TTaskOutputType::Sink;
+ output.SinkType = extSink.GetType();
+ output.SinkSettings = extSink.GetSettings();
+ }
+ }
+
void BuildReadTasksFromSource(TStageInfo& stageInfo, TMap<TString, TString> secureParams) {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
- YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
+ YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1,
+ "multiple sources or sources mixed with connections");
const auto& stageSource = stage.GetSources(0);
const auto& externalSource = stageSource.GetExternalSource();
@@ -762,6 +779,7 @@ protected:
task.Meta.Type = TTaskMeta::TTaskType::Compute;
+ BuildSinks(stage, task);
}
}
@@ -873,6 +891,8 @@ protected:
settings->SetLockTxId(*lockTxId);
settings->SetLockNodeId(self.NodeId());
}
+
+ BuildSinks(stage, task);
};
if (source.GetSequentialInFlightShards()) {
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index e0ba573bbf..c52c248672 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -945,6 +945,14 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutpu
break;
}
+ case TTaskOutputType::Sink: {
+ auto* sink = outputDesc.MutableSink();
+ sink->SetType(output.SinkType);
+ YQL_ENSURE(output.SinkSettings);
+ sink->MutableSettings()->CopyFrom(*output.SinkSettings);
+ break;
+ }
+
default: {
YQL_ENSURE(false, "Unexpected task output type " << output.Type);
}
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 622675b009..d208142f54 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -205,9 +205,15 @@
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
},
{
- "Name": "TKqlTableEffect",
+ "Name": "TKqlEffectBase",
"Base": "TExprBase",
"Match": {"Type": "CallableBase"},
+ "Builder": {"Generate": "None"}
+ },
+ {
+ "Name": "TKqlTableEffect",
+ "Base": "TKqlEffectBase",
+ "Match": {"Type": "CallableBase"},
"Builder": {"Generate": "None"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TKqpTable"}
@@ -289,6 +295,14 @@
"Match": {"Type": "Callable", "Name": "TKqlInsertRowsIndex"}
},
{
+ "Name": "TKqlExternalEffect",
+ "Base": "TKqlEffectBase",
+ "Match": {"Type": "Callable", "Name": "KqlExternalEffect"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"}
+ ]
+ },
+ {
"Name": "TKqpParamBinding",
"Base": "TExprBase",
"Match": {"Type": "Tuple"},
@@ -449,6 +463,15 @@
"Match": {"Type": "Callable", "Name": "KqpDeleteRows"}
},
{
+ "Name": "TKqpSinkEffect",
+ "Base": "TKqlEffectBase",
+ "Match": {"Type": "Callable", "Name": "KqpSinkEffect"},
+ "Children": [
+ {"Index": 0, "Name": "Stage", "Type": "TExprBase"},
+ {"Index": 1, "Name": "SinkIndex", "Type": "TCoAtom"}
+ ]
+ },
+ {
"Name": "TKqpOlapOperationBase",
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 6be649aa8a..1ce9e72f6a 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1527,7 +1527,7 @@ private:
auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner);
auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx,
ExternalSourceFactory, IsInternalCall);
- auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, queryExecutor);
+ auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, ExternalSourceFactory, queryExecutor);
FillSettings.AllResultsBytesLimit = Nothing();
FillSettings.RowsLimitPerWrite = SessionCtx->Config()._ResultRowsLimit.Get().GetRef();
diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h
index 4cc155552d..b2972b0176 100644
--- a/ydb/core/kqp/host/kqp_host_impl.h
+++ b/ydb/core/kqp/host/kqp_host_impl.h
@@ -243,7 +243,7 @@ public:
};
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<NYql::TTypeAnnotationContext> typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx,
+ const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider);
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index d9f7c0fe34..ecbffe376e 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -14,6 +14,7 @@
#include <ydb/library/yql/core/type_ann/type_ann_expr.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
+#include <ydb/library/yql/core/yql_opt_proposed_by_data.h>
#include <util/generic/is_in.h>
@@ -58,7 +59,7 @@ private:
class TKqpRunner : public IKqpRunner {
public:
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TTypeAnnotationContext> typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
: Gateway(gateway)
@@ -82,13 +83,16 @@ public:
PhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(TLogExprTransformer::Sync("PhysicalOptimizeTransformer", logComp, logLevel), "LogPhysicalOptimize")
+ .AddExpressionEvaluation(FuncRegistry)
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(),
*typesCtx, Config))
.Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery")
.AddPostTypeAnnotation(/* forSubgraph */ true)
.AddCommonOptimization()
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
- .Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "PhysicalOptimize")
+ .Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
+ .Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize")
+ .Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize")
.Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize")
.Add(CreateKqpQueryPhasesTransformer(), "QueryPhases")
.Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects")
@@ -347,7 +351,7 @@ private:
} // namespace
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TTypeAnnotationContext> typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
{
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 7211e6dbf3..d00d3e6da7 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1410,7 +1410,7 @@ TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& c
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
return TStatus::Ok;
-}
+}
TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns) {
@@ -1471,6 +1471,32 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
return TStatus::Ok;
}
+TStatus AnnotateExternalEffect(const TExprNode::TPtr& node, TExprContext& ctx) {
+ if (!EnsureArgsCount(*node, 1, ctx)) {
+ return TStatus::Error;
+ }
+
+ node->SetTypeAnn(node->Child(TKqlExternalEffect::idx_Input)->GetTypeAnn());
+ return TStatus::Ok;
+}
+
+TStatus AnnotateKqpSinkEffect(const TExprNode::TPtr& node, TExprContext& ctx) {
+ if (!EnsureArgsCount(*node, 2, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!TDqStageBase::Match(node->Child(TKqpSinkEffect::idx_Stage))) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*node->Child(TKqpSinkEffect::idx_SinkIndex), ctx)) {
+ return TStatus::Error;
+ }
+
+ node->SetTypeAnn(node->Child(TKqpSinkEffect::idx_Stage)->GetTypeAnn());
+ return TStatus::Ok;
+}
+
} // namespace
TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster,
@@ -1609,6 +1635,13 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
}
+ if (TKqlExternalEffect::Match(input.Get())) {
+ return AnnotateExternalEffect(input, ctx);
+ }
+
+ if (TKqpSinkEffect::Match(input.Get())) {
+ return AnnotateKqpSinkEffect(input, ctx);
+ }
return dqTransformer->Transform(input, output, ctx);
});
diff --git a/ydb/core/kqp/opt/kqp_opt.cpp b/ydb/core/kqp/opt/kqp_opt.cpp
index 8bea802beb..50ece581fd 100644
--- a/ydb/core/kqp/opt/kqp_opt.cpp
+++ b/ydb/core/kqp/opt/kqp_opt.cpp
@@ -104,4 +104,18 @@ TKqpTable BuildTableMeta(const TKikimrTableDescription& tableDesc, const TPositi
return BuildTableMeta(*tableDesc.Metadata, pos, ctx);
}
+bool IsBuiltEffect(const TExprBase& effect) {
+ // Stage with effect output
+ if (effect.Maybe<TDqOutput>()) {
+ return true;
+ }
+
+ // Stage with sink effect
+ if (effect.Maybe<TKqpSinkEffect>()) {
+ return true;
+ }
+
+ return false;
+}
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 4997f007f9..3054575846 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -390,6 +390,7 @@ private:
.Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap))
.Build()
.Settings(stage.Settings())
+ .Outputs(stage.Outputs())
.Done();
}
diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp
index 69db639676..74b880ac3d 100644
--- a/ydb/core/kqp/opt/kqp_opt_effects.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp
@@ -1,6 +1,7 @@
#include "kqp_opt_impl.h"
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
namespace NKikimr::NKqp::NOpt {
@@ -363,63 +364,102 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
return true;
}
-bool BuildEffects(TPositionHandle pos, const TVector<TKqlTableEffect>& effects,
- TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, TVector<TExprBase>& builtEffects)
+bool BuildEffects(TPositionHandle pos, const TVector<TExprBase>& effects,
+ TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
+ TVector<TExprBase>& builtEffects)
{
TVector<TCoArgument> inputArgs;
TVector<TExprBase> inputs;
TVector<TExprBase> newEffects;
+ TVector<TExprBase> newSinkEffects;
newEffects.reserve(effects.size());
+ newSinkEffects.reserve(effects.size());
for (const auto& effect : effects) {
- TCoArgument inputArg = Build<TCoArgument>(ctx, pos)
- .Name("inputArg")
- .Done();
-
- TMaybeNode<TExprBase> input;
TMaybeNode<TExprBase> newEffect;
+ bool sinkEffect = false;
+ YQL_ENSURE(effect.Maybe<TKqlEffectBase>());
+ if (effect.Maybe<TKqlTableEffect>()) {
+ TMaybeNode<TExprBase> input;
+ TCoArgument inputArg = Build<TCoArgument>(ctx, pos)
+ .Name("inputArg")
+ .Done();
+
+ if (auto maybeUpsertRows = effect.Maybe<TKqlUpsertRows>()) {
+ if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) {
+ return false;
+ }
+ }
- if (auto maybeUpsertRows = effect.Maybe<TKqlUpsertRows>()) {
- if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) {
- return false;
+ if (auto maybeDeleteRows = effect.Maybe<TKqlDeleteRows>()) {
+ if (!BuildDeleteRowsEffect(maybeDeleteRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) {
+ return false;
+ }
}
- }
- if (auto maybeDeleteRows = effect.Maybe<TKqlDeleteRows>()) {
- if (!BuildDeleteRowsEffect(maybeDeleteRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) {
+ if (input) {
+ inputArgs.push_back(inputArg);
+ inputs.push_back(input.Cast());
+ }
+ } else if (auto maybeExt = effect.Maybe<TKqlExternalEffect>()) {
+ sinkEffect = true;
+ TKqlExternalEffect externalEffect = maybeExt.Cast();
+ TExprBase input = externalEffect.Input();
+ auto maybeStage = input.Maybe<TDqStageBase>();
+ if (!maybeStage) {
return false;
}
+ auto stage = maybeStage.Cast();
+ const auto outputsList = stage.Outputs();
+ if (!outputsList) {
+ return false;
+ }
+ TDqStageOutputsList outputs = outputsList.Cast();
+ YQL_ENSURE(outputs.Size() == 1, "Multiple sinks are not supported yet");
+ TDqOutputAnnotationBase output = outputs.Item(0);
+ if (!output.Maybe<TDqSink>()) {
+ return false;
+ }
+ newEffect = Build<TKqpSinkEffect>(ctx, effect.Pos())
+ .Stage(maybeStage.Cast().Ptr())
+ .SinkIndex().Build("0")
+ .Done();
}
YQL_ENSURE(newEffect);
- newEffects.push_back(newEffect.Cast());
-
- if (input) {
- inputArgs.push_back(inputArg);
- inputs.push_back(input.Cast());
+ if (sinkEffect) {
+ newSinkEffects.push_back(newEffect.Cast());
+ } else {
+ newEffects.push_back(newEffect.Cast());
}
}
- auto stage = Build<TDqStage>(ctx, pos)
- .Inputs()
- .Add(inputs)
- .Build()
- .Program()
- .Args(inputArgs)
- .Body<TKqpEffects>()
- .Add(newEffects)
+ if (!newEffects.empty()) {
+ auto stage = Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(inputs)
.Build()
- .Build()
- .Settings().Build()
- .Done();
-
- for (ui32 i = 0; i < newEffects.size(); ++i) {
- auto effect = Build<TDqOutput>(ctx, pos)
- .Stage(stage)
- .Index().Build(ToString(0))
+ .Program()
+ .Args(inputArgs)
+ .Body<TKqpEffects>()
+ .Add(newEffects)
+ .Build()
+ .Build()
+ .Settings().Build()
.Done();
- builtEffects.push_back(effect);
+ for (ui32 i = 0; i < newEffects.size(); ++i) {
+ auto effect = Build<TDqOutput>(ctx, pos)
+ .Stage(stage)
+ .Index().Build(ToString(0))
+ .Done();
+
+ builtEffects.push_back(effect);
+ }
+ }
+
+ if (!newSinkEffects.empty()) {
+ builtEffects.insert(builtEffects.end(), newSinkEffects.begin(), newSinkEffects.end());
}
return true;
@@ -432,20 +472,20 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
TVector<TExprBase> builtEffects;
if constexpr (GroupEffectsByTable) {
- TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap;
+ TMap<TStringBuf, TVector<TExprBase>> tableEffectsMap;
for (const auto& maybeEffect : query.Effects()) {
if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
for (const auto effect : maybeList.Cast()) {
YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
auto tableEffect = effect.Cast<TKqlTableEffect>();
- tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect);
+ tableEffectsMap[tableEffect.Table().Path()].push_back(effect);
}
} else {
YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
- tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect);
+ tableEffectsMap[tableEffect.Table().Path()].push_back(maybeEffect);
}
}
@@ -460,18 +500,12 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
for (const auto& maybeEffect : query.Effects()) {
if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
for (const auto effect : maybeList.Cast()) {
- YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
- auto tableEffect = effect.Cast<TKqlTableEffect>();
-
- if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
+ if (!BuildEffects(query.Pos(), {effect}, ctx, kqpCtx, builtEffects)) {
return {};
}
}
} else {
- YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
- auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
-
- if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
+ if (!BuildEffects(query.Pos(), {maybeEffect}, ctx, kqpCtx, builtEffects)) {
return {};
}
}
@@ -502,7 +536,7 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryEffectsTransformer(const TIntrusivePtr
bool requireBuild = false;
bool hasBuilt = false;
for (const auto& effect : query.Effects()) {
- if (!effect.Maybe<TDqOutput>()) {
+ if (!IsBuiltEffect(effect)) {
requireBuild = true;
} else {
hasBuilt = true;
diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h
index 1b6c528e9d..8370b0994b 100644
--- a/ydb/core/kqp/opt/kqp_opt_impl.h
+++ b/ydb/core/kqp/opt/kqp_opt_impl.h
@@ -60,4 +60,6 @@ TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildS
const std::function<NYql::NNodes::TExprBase (const NYql::TKikimrTableMetadata&,
NYql::TPositionHandle, NYql::TExprContext&)>& tableBuilder);
+bool IsBuiltEffect(const NYql::NNodes::TExprBase& effect);
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index 2ac4bc8ae8..1c25cc2ad4 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -235,7 +235,7 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns,
for(const auto& item: inputColumns) {
result.push_back(item.Ptr());
}
-
+
for(const auto& item: autoincrement) {
result.push_back(item.Ptr());
}
@@ -259,7 +259,7 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co
}
if (isWriteReplace) {
- std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx);
+ std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx);
}
auto baseInput = Build<TKqpWriteConstraint>(ctx, pos)
@@ -816,6 +816,32 @@ TVector<TExprBase> HandleDeleteTable(const TKiDeleteTable& del, TExprContext& ct
}
}
+TExprNode::TPtr HandleExternalWrite(const TCallable& effect, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
+ if (effect.Ref().ChildrenSize() <= 1) {
+ return {};
+ }
+ // As a rule data sink is a second child for all write callables
+ TExprBase dataSinkArg(effect.Ref().Child(1));
+ if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) {
+ TStringBuf dataSinkCategory = maybeDataSink.Cast().Category();
+ auto dataSinkProviderIt = typesCtx.DataSinkMap.find(dataSinkCategory);
+ if (dataSinkProviderIt != typesCtx.DataSinkMap.end()) {
+ if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) {
+ if (auto canWrite = dqIntegration->CanWrite(*effect.Raw(), ctx)) {
+ YQL_ENSURE(*canWrite, "Erros handling write");
+ if (auto result = dqIntegration->WrapWrite(effect.Ptr(), ctx)) {
+ return Build<TKqlExternalEffect>(ctx, effect.Pos())
+ .Input(result)
+ .Done()
+ .Ptr();
+ }
+ }
+ }
+ }
+ }
+ return {};
+}
+
} // namespace
const TKikimrTableDescription& GetTableData(const TKikimrTablesData& tablesData,
@@ -836,12 +862,12 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r
}
TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TKikimrTablesData& tablesData,
- TExprContext& ctx, bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, NYql::TTypeAnnotationContext& typesCtx)
+ TExprContext& ctx, bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typesCtx)
{
TVector<TKqlQuery> queryBlocks;
queryBlocks.reserve(dataQueryBlocks.ArgCount());
for (const auto& block : dataQueryBlocks) {
- TVector <TExprBase> kqlEffects;
+ TVector<TExprBase> kqlEffects;
for (const auto& effect : block.Effects()) {
if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) {
auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData, kqpCtx);
@@ -861,9 +887,13 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx);
kqlEffects.insert(kqlEffects.end(), results.begin(), results.end());
}
+
+ if (TExprNode::TPtr result = HandleExternalWrite(effect, ctx, typesCtx)) {
+ kqlEffects.emplace_back(result);
+ }
}
- TVector <TKqlQueryResult> kqlResults;
+ TVector<TKqlQueryResult> kqlResults;
kqlResults.reserve(block.Results().Size());
for (const auto& kiResult : block.Results()) {
kqlResults.emplace_back(
@@ -888,7 +918,7 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
TOptimizeExprSettings optSettings(nullptr);
optSettings.VisitChanges = true;
auto status = OptimizeExpr(queryBlock.Ptr(), optResult,
- [&tablesData, withSystemColumns, &kqpCtx, &typesCtx](const TExprNode::TPtr& input, TExprContext &ctx) {
+ [&tablesData, withSystemColumns, &kqpCtx, &typesCtx](const TExprNode::TPtr& input, TExprContext& ctx) {
auto node = TExprBase(input);
TExprNode::TPtr effect;
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
index 90ab8da99a..6353167edc 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
@@ -35,7 +35,7 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() {
}
for (const auto& effect : query.Effects()) {
- if (!effect.Maybe<TDqOutput>()) {
+ if (!IsBuiltEffect(effect)) {
ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects."));
return TStatus::Error;
}
@@ -86,10 +86,12 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() {
YQL_ENSURE(stageType);
auto stageResultType = stageType->Cast<TTupleExprType>();
const auto& stageConsumers = GetConsumers(stage, parentsMap);
+ bool stageWithResult = false;
TDynBitMap usedOutputs;
for (auto consumer : stageConsumers) {
if (auto maybeOutput = TExprBase(consumer).Maybe<TDqOutput>()) {
+ stageWithResult = true;
auto output = maybeOutput.Cast();
auto outputIndex = FromString<ui32>(output.Index().Value());
if (usedOutputs.Test(outputIndex)) {
@@ -100,18 +102,38 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() {
}
usedOutputs.Set(outputIndex);
} else {
- YQL_ENSURE(false, "Stage #" << PrintKqpStageOnly(stage, ctx) << " has unexpected consumer: "
- << consumer->Content());
+ // There can be also an effect with stage that has dq sinks
+ // Check the following structure:
+ // TKqlQuery (tuple with 2 elems) - results and effects
+ auto stageParentsIt = parentsMap.find(stage.Raw());
+ YQL_ENSURE(stageParentsIt != parentsMap.end());
+ if (stageParentsIt->second.size() != 1) {
+ hasMultipleConsumers = true;
+ } else {
+ const TExprNode* effectNode = *stageParentsIt->second.begin();
+ auto effectParentIt = parentsMap.find(effectNode);
+ YQL_ENSURE(effectParentIt != parentsMap.end());
+ if (effectParentIt->second.size() != 1) {
+ hasMultipleConsumers = true;
+ } else {
+ const TExprNode* queryNode = *effectParentIt->second.begin();
+ YQL_ENSURE(queryNode->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Tuple,
+ "Stage #" << PrintKqpStageOnly(stage, ctx) << " has unexpected consumer: "
+ << consumer->Content());
+ }
+ }
}
}
- for (size_t i = 0; i < stageResultType->GetSize(); ++i) {
- if (!usedOutputs.Test(i)) {
- hasBrokenStage = true;
- YQL_CLOG(ERROR, ProviderKqp) << "Stage #" << PrintKqpStageOnly(stage, ctx)
- << ", output " << i << " (" << FormatType(stageResultType->GetItems()[i]) << ")"
- << " not used";
- return false;
+ if (stageWithResult) {
+ for (size_t i = 0; i < stageResultType->GetSize(); ++i) {
+ if (!usedOutputs.Test(i)) {
+ hasBrokenStage = true;
+ YQL_CLOG(ERROR, ProviderKqp) << "Stage #" << PrintKqpStageOnly(stage, ctx)
+ << ", output " << i << " (" << FormatType(stageResultType->GetItems()[i]) << ")"
+ << " not used";
+ return false;
+ }
}
}
}
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index b29c09596a..543047c883 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -190,10 +190,15 @@ public:
}
for (const auto& stage: Tx.Stages()) {
- if (stage.Cast<TDqStageBase>().Program().Body().Maybe<TKqpEffects>()) {
- auto &planNode = AddPlanNode(phaseNode);
+ TDqStageBase stageBase = stage.Cast<TDqStageBase>();
+ if (stageBase.Program().Body().Maybe<TKqpEffects>()) {
+ auto& planNode = AddPlanNode(phaseNode);
planNode.TypeName = "Effect";
Visit(TExprBase(stage), planNode);
+ } else if (stageBase.Outputs()) { // Sink
+ auto& planNode = AddPlanNode(phaseNode);
+ planNode.TypeName = "Sink";
+ Visit(TExprBase(stage), planNode);
}
}
@@ -884,7 +889,7 @@ private:
Visit(settings.Cast(), stagePlanNode);
} else {
TOperator op;
- op.Properties["Name"] = TString(source.Cast().DataSource().Cast<TCoDataSource>().Category().Value());
+ op.Properties["Name"] = source.Cast().DataSource().Cast<TCoDataSource>().Category().StringValue();
AddOperator(stagePlanNode, "Source", op);
}
} else {
@@ -896,6 +901,16 @@ private:
Visit(inputCn.Output().Stage(), inputPlanNode);
}
}
+
+ if (auto outputs = expr.Cast<TDqStageBase>().Outputs()) {
+ for (auto output : outputs.Cast()) {
+ if (output.Maybe<TDqSink>()) {
+ TOperator op;
+ op.Properties["Name"] = output.DataSink().Cast<TCoDataSink>().Category().StringValue();
+ AddOperator(stagePlanNode, "Sink", op);
+ }
+ }
+ }
} else {
Visit(expr.Ptr(), planNode);
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
index 1a1068757d..16b322b860 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
@@ -250,6 +250,7 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
.Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap))
.Program(ctx.DeepCopyLambda(TKqpProgram(newProgram).Lambda().Ref()))
.Settings(stage.Settings())
+ .Outputs(stage.Outputs())
.Done();
stages.emplace_back(newStage);
diff --git a/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt
index 4b843449f5..324ba1b41a 100644
--- a/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(core-kqp-provider PUBLIC
public-lib-scheme_types
cpp-client-ydb_topic
yql-core-expr_nodes
+ yql-core-peephole_opt
parser-pg_wrapper-interface
providers-common-codec
providers-common-config
diff --git a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
index 46d277a850..125cf43faa 100644
--- a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
@@ -37,6 +37,7 @@ target_link_libraries(core-kqp-provider PUBLIC
public-lib-scheme_types
cpp-client-ydb_topic
yql-core-expr_nodes
+ yql-core-peephole_opt
parser-pg_wrapper-interface
providers-common-codec
providers-common-config
diff --git a/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt
index 46d277a850..125cf43faa 100644
--- a/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt
@@ -37,6 +37,7 @@ target_link_libraries(core-kqp-provider PUBLIC
public-lib-scheme_types
cpp-client-ydb_topic
yql-core-expr_nodes
+ yql-core-peephole_opt
parser-pg_wrapper-interface
providers-common-codec
providers-common-config
diff --git a/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt
index 4b843449f5..324ba1b41a 100644
--- a/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(core-kqp-provider PUBLIC
public-lib-scheme_types
cpp-client-ydb_topic
yql-core-expr_nodes
+ yql-core-peephole_opt
parser-pg_wrapper-interface
providers-common-codec
providers-common-config
diff --git a/ydb/core/kqp/provider/ya.make b/ydb/core/kqp/provider/ya.make
index c25d4d5aa8..e49a6415fc 100644
--- a/ydb/core/kqp/provider/ya.make
+++ b/ydb/core/kqp/provider/ya.make
@@ -35,6 +35,7 @@ PEERDIR(
ydb/public/lib/scheme_types
ydb/public/sdk/cpp/client/ydb_topic
ydb/library/yql/core/expr_nodes
+ ydb/library/yql/core/peephole_opt
ydb/library/yql/parser/pg_wrapper/interface
ydb/library/yql/providers/common/codec
ydb/library/yql/providers/common/config
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index 8d64d840cd..3bf5acc332 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -5,6 +5,8 @@
#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/utils/log/log.h>
+
namespace NYql {
namespace {
@@ -340,13 +342,15 @@ public:
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
TIntrusivePtr<IKikimrQueryExecutor> queryExecutor)
: FunctionRegistry(functionRegistry)
, Types(types)
, Gateway(gateway)
, SessionCtx(sessionCtx)
+ , ExternalSourceFactory(externalSourceFactory)
, IntentDeterminationTransformer(CreateKiSinkIntentDeterminationTransformer(sessionCtx))
- , TypeAnnotationTransformer(CreateKiSinkTypeAnnotationTransformer(gateway, sessionCtx))
+ , TypeAnnotationTransformer(CreateKiSinkTypeAnnotationTransformer(gateway, sessionCtx, types))
, LogicalOptProposalTransformer(CreateKiLogicalOptProposalTransformer(sessionCtx, types))
, PhysicalOptProposalTransformer(CreateKiPhysicalOptProposalTransformer(sessionCtx))
, CallableExecutionTransformer(CreateKiSinkCallableExecutionTransformer(gateway, sessionCtx, queryExecutor))
@@ -496,6 +500,57 @@ public:
.Ptr();
}
+ TExprNode::TPtr RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, TExprContext& ctx) {
+ TKiDataSink dataSink(node->ChildPtr(1));
+ auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
+ if (!tableDesc.Metadata || tableDesc.Metadata->Kind != EKikimrTableKind::External) {
+ return nullptr;
+ }
+
+ if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) {
+ YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
+ return nullptr;
+ }
+
+ ctx.Step.Repeat(TExprStep::DiscoveryIO)
+ .Repeat(TExprStep::Epochs)
+ .Repeat(TExprStep::Intents)
+ .Repeat(TExprStep::LoadTablesMetadata)
+ .Repeat(TExprStep::RewriteIO);
+
+ const auto& externalSource = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
+ if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
+ auto writeArgs = node->ChildrenList();
+ writeArgs[1] = Build<TCoDataSink>(ctx, node->Pos())
+ .Category(ctx.NewAtom(node->Pos(), externalSource->GetName()))
+ .FreeArgs()
+ .Add(writeArgs[1]->ChildrenList()[1])
+ .Build()
+ .Done().Ptr();
+ return ctx.ChangeChildren(*node, std::move(writeArgs));
+ } else { // tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalTable
+ TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) });
+ auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path});
+ auto keyNode = ctx.NewCallable(node->Pos(), "Key", {table});
+ auto r = Build<TCoWrite>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink()
+ .Category(ctx.NewAtom(node->Pos(), externalSource->GetName()))
+ .FreeArgs()
+ .Add(ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.DataSourcePath))
+ .Build()
+ .Build()
+ .FreeArgs()
+ .Add(keyNode)
+ .Add(node->Child(3))
+ .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, externalSource, tableDesc.Metadata->ExternalSource.TableContent))
+ .Build()
+ .Done().Ptr();
+ return r;
+ }
+ return nullptr;
+ }
+
TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
YQL_ENSURE(node->IsCallable(WriteName), "Expected Write!, got: " << node->Content());
@@ -504,6 +559,10 @@ public:
switch (key.GetKeyType()) {
case TKikimrKey::Type::Table: {
+ if (TExprNode::TPtr resultNode = RewriteIOExternal(key, node, ctx)) {
+ return resultNode;
+ }
+
NCommon::TWriteTableSettings settings = NCommon::ParseWriteTableSettings(TExprList(node->Child(4)), ctx);
YQL_ENSURE(settings.Mode);
auto mode = settings.Mode.Cast();
@@ -837,6 +896,7 @@ private:
const TTypeAnnotationContext& Types;
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory;
TAutoPtr<IGraphTransformer> IntentDeterminationTransformer;
TAutoPtr<IGraphTransformer> TypeAnnotationTransformer;
@@ -970,9 +1030,10 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSink(
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory,
TIntrusivePtr<IKikimrQueryExecutor> queryExecutor)
{
- return new TKikimrDataSink(functionRegistry, types, gateway, sessionCtx, queryExecutor);
+ return new TKikimrDataSink(functionRegistry, types, gateway, sessionCtx, externalSourceFactory, queryExecutor);
}
TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index dbe344fce2..d18ac31c98 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -16,6 +16,48 @@
#include <util/generic/is_in.h>
namespace NYql {
+
+static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) {
+ Ydb::Type ydbType;
+ if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) {
+ auto* typeDesc = typeInfo.GetTypeDesc();
+ auto* pg = ydbType.mutable_pg_type();
+ pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc));
+ pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc));
+ } else {
+ auto& item = notNull
+ ? ydbType
+ : *ydbType.mutable_optional_type()->mutable_item();
+ item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
+ }
+ return ydbType;
+}
+
+TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) {
+ TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns;
+ typedColumns.reserve(columns.size());
+ for (const auto& [n, c] : columns) {
+ NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull)));
+ auto type = NFq::MakeType(parser, ctx);
+ typedColumns.emplace_back(n, type);
+ }
+
+ const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text);
+ TExprNode::TListType items;
+ auto schema = ctx.NewAtom(pos, ysonSchema);
+ auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema });
+ auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema });
+ auto userSchema = ctx.NewAtom(pos, "userschema"sv);
+ items.emplace_back(ctx.NewList(pos, {userSchema, type, order}));
+
+ for (const auto& [key, value]: source->GetParameters(content)) {
+ auto keyAtom = ctx.NewAtom(pos, NormalizeName(key));
+ auto valueAtom = ctx.NewAtom(pos, value);
+ items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom}));
+ }
+ return ctx.NewList(pos, std::move(items));
+}
+
namespace {
using namespace NKikimr;
@@ -368,7 +410,7 @@ public:
, IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx))
, LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall))
, TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types))
- , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx))
+ , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx, types))
{
Y_UNUSED(FunctionRegistry);
@@ -551,47 +593,6 @@ public:
return false;
}
- static Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) {
- Ydb::Type ydbType;
- if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
- auto* typeDesc = typeInfo.GetTypeDesc();
- auto* pg = ydbType.mutable_pg_type();
- pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc));
- pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc));
- } else {
- auto& item = notNull
- ? ydbType
- : *ydbType.mutable_optional_type()->mutable_item();
- item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
- }
- return ydbType;
- }
-
- TExprNode::TPtr BuildSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NExternalSource::IExternalSource::TPtr& source, const TString& content) {
- TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns;
- typedColumns.reserve(columns.size());
- for (const auto& [n, c] : columns) {
- NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull)));
- auto type = NFq::MakeType(parser, ctx);
- typedColumns.emplace_back(n, type);
- }
-
- const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text);
- TExprNode::TListType items;
- auto schema = ctx.NewAtom(pos, ysonSchema);
- auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema });
- auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema });
- auto userSchema = ctx.NewAtom(pos, "userschema"sv);
- items.emplace_back(ctx.NewList(pos, {userSchema, type, order}));
-
- for (const auto& [key, value]: source->GetParameters(content)) {
- auto keyAtom = ctx.NewAtom(pos, NormalizeName(key));
- auto valueAtom = ctx.NewAtom(pos, value);
- items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom}));
- }
- return ctx.NewList(pos, std::move(items));
- }
-
TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
auto read = node->Child(0);
if (!read->IsCallable(ReadName)) {
@@ -664,7 +665,7 @@ public:
.FreeArgs()
.Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key}))
.Add(ctx.NewCallable(node->Pos(), "Void", {}))
- .Add(BuildSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent))
+ .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent))
.Build()
.Done().Ptr();
auto retChildren = node->ChildrenList();
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index ec0792a718..1557ede0b7 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -499,19 +499,32 @@ private:
};
class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSourceCallableExecutionTransformer> {
-
private:
+ IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
+ TPeepholeSettings peepholeSettings;
+ bool hasNonDeterministicFunctions;
+ auto status = PeepHoleOptimizeNode(input, output, ctx, TypesCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings);
+ if (status.Level != TStatus::Ok) {
+ ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TString("Peephole optimization failed for Dq stage")));
+ return status;
+ }
+ return status;
+ }
- TString GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx) {
-
+ IGraphTransformer::TStatus GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx, TString& lambda) {
auto pos = resInput->Pos();
auto typeAnn = resInput->GetTypeAnn();
const auto kind = resInput->GetTypeAnn()->GetKind();
- const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional;
+ const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional;
auto node = ctx.WrapByCallableIf(kind != ETypeAnnotationKind::Stream, "ToStream",
ctx.WrapByCallableIf(data, "Just", std::move(resInput)));
+ auto peepHoleStatus = PeepHole(node, node, ctx);
+ if (peepHoleStatus.Level != IGraphTransformer::TStatus::Ok) {
+ return peepHoleStatus;
+ }
+
auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
auto input = Build<TDqPhyStage>(ctx, pos)
@@ -530,7 +543,7 @@ private:
TVector<TExprBase> fakeReads;
auto paramsType = NDq::CollectParameters(programLambda, ctx);
- auto lambda = NDq::BuildProgram(
+ lambda = NDq::BuildProgram(
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
ctx, fakeReads);
@@ -542,7 +555,7 @@ private:
auto type = NYql::NCommon::BuildType(*typeAnn, programBuilder, errorStream);
ExportTypeToProto(type, resultType);
- return lambda;
+ return IGraphTransformer::TStatus::Ok;
}
TString EncodeResultToYson(const NKikimrMiniKQL::TResult& result, bool& truncated) {
@@ -563,9 +576,11 @@ private:
public:
TKiSourceCallableExecutionTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types)
: Gateway(gateway)
- , SessionCtx(sessionCtx) {}
+ , SessionCtx(sessionCtx)
+ , TypesCtx(types) {}
std::pair<TStatus, TAsyncTransformCallbackFuture> CallbackTransform(const TExprNode::TPtr& input,
TExprNode::TPtr& output, TExprContext& ctx)
@@ -588,7 +603,11 @@ public:
if (input->Content() == "Result") {
auto result = TMaybeNode<TResult>(input).Cast();
NKikimrMiniKQL::TType resultType;
- auto program = GetLambdaBody(result.Input().Ptr(), resultType, ctx);
+ TString program;
+ TStatus status = GetLambdaBody(result.Input().Ptr(), resultType, ctx, program);
+ if (status.Level != TStatus::Ok) {
+ return SyncStatus(status);
+ }
auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
@@ -745,6 +764,7 @@ private:
private:
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ TTypeAnnotationContext& TypesCtx;
};
template <class TKiObject, class TSettings>
@@ -1975,9 +1995,10 @@ private:
TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer(
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types)
{
- return new TKiSourceCallableExecutionTransformer(gateway, sessionCtx);
+ return new TKiSourceCallableExecutionTransformer(gateway, sessionCtx, types);
}
TAutoPtr<IGraphTransformer> CreateKiSinkCallableExecutionTransformer(
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index c50913c001..c2f5a31f4d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -328,25 +328,33 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}
- for (const auto& dataSource : types.DataSources) {
- if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration
- && dqIntegration->CanRead(*node.Ptr(), ctx)
- && dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) {
- txRes.Ops.insert(node.Raw());
- for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
- if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
- bool isWorldChild = false;
- if (child->IsWorld()) {
- isWorldChild = true;
- } else if (auto* typeAnn = child->GetTypeAnn(); typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) {
- isWorldChild = true;
- }
- if (isWorldChild) {
- return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types);
+ if (node.Ref().ChildrenSize() > 1) {
+ TExprBase dataSourceArg(node.Ref().Child(1));
+ if (auto maybeDataSource = dataSourceArg.Maybe<TCoDataSource>()) {
+ TStringBuf dataSourceCategory = maybeDataSource.Cast().Category();
+ auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
+ if (dataSourceProviderIt != types.DataSourceMap.end()) {
+ if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
+ if (dqIntegration->CanRead(*node.Ptr(), ctx)
+ && dqIntegration->EstimateReadSize(
+ TDqSettings::TDefault::DataSizePerJob,
+ TDqSettings::TDefault::MaxTasksPerStage,
+ *node.Ptr(),
+ ctx))
+ {
+ txRes.Ops.insert(node.Raw());
+ for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
+ if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
+ auto* typeAnn = child->GetTypeAnn();
+ if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) {
+ return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types);
+ }
+ }
+ }
+ YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child");
}
}
}
- return true;
}
}
@@ -382,6 +390,32 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}
+ if (node.Ref().ChildrenSize() > 1) {
+ TExprBase dataSinkArg(node.Ref().Child(1));
+ if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) {
+ TStringBuf dataSinkCategory = maybeDataSink.Cast().Category();
+ auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory);
+ if (dataSinkProviderIt != types.DataSinkMap.end()) {
+ if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) {
+ if (auto canWrite = dqIntegration->CanWrite(node.Ref(), ctx)) {
+ YQL_ENSURE(*canWrite, "Errors handling write");
+ txRes.Ops.insert(node.Raw());
+ txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{});
+ for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
+ if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
+ auto* typeAnn = child->GetTypeAnn();
+ if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) {
+ return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types);
+ }
+ }
+ }
+ YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child");
+ }
+ }
+ }
+ }
+ }
+
if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) {
auto update = maybeUpdate.Cast();
if (!checkDataSink(update.DataSink())) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index a6345a57fd..a743c6ceb4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -484,6 +484,7 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSink(
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory,
TIntrusivePtr<IKikimrQueryExecutor> queryExecutor);
} // namespace NYql
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index 4adb83d704..414c128e65 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -162,7 +162,7 @@ struct TKiExecDataQuerySettings {
TAutoPtr<IGraphTransformer> CreateKiSourceTypeAnnotationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx,
TTypeAnnotationContext& types);
TAutoPtr<IGraphTransformer> CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx);
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types);
TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx,
TTypeAnnotationContext& types);
TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx);
@@ -175,7 +175,8 @@ TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusiv
TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer(
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx);
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types);
TAutoPtr<IGraphTransformer> CreateKiSinkCallableExecutionTransformer(
TIntrusivePtr<IKikimrGateway> gateway,
@@ -219,4 +220,6 @@ bool IsKikimrSystemColumn(const TStringBuf columnName);
bool ValidateTableHasIndex(TKikimrTableMetadataPtr metadata, TExprContext& ctx, const TPositionHandle& pos);
+TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content);
+
} // namespace NYql
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 10c58733c7..f07c2a564c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.h>
@@ -292,9 +293,10 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
{
public:
TKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types)
: Gateway(gateway)
- , SessionCtx(sessionCtx) {}
+ , SessionCtx(sessionCtx)
+ , Types(types) {}
private:
virtual TStatus HandleWriteTable(TKiWriteTable node, TExprContext& ctx) override {
@@ -382,7 +384,7 @@ private:
if (rowType->FindItem(keyColumnName)) {
continue;
}
-
+
if (!columnInfo.IsAutoIncrement()) {
ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
<< "Missing key column in input: " << keyColumnName
@@ -410,7 +412,7 @@ private:
//no type-level notnull check for pg types.
continue;
}
-
+
if (itemType && itemType->HasOptionalOrNull()) {
ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder()
<< "Can't set NULL or optional value to not null column: " << name
@@ -1501,9 +1503,28 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
}
if (!KikimrSupportedEffects().contains(effect.CallableName())) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
- << "Unsupported Kikimr data query effect: " << effect.CallableName()));
- return TStatus::Error;
+ bool supported = false;
+ if (effect.Ref().ChildrenSize() > 1) {
+ TExprBase dataSinkArg(effect.Ref().Child(1));
+ if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) {
+ TStringBuf dataSinkCategory = maybeDataSink.Cast().Category();
+ auto dataSinkProviderIt = Types.DataSinkMap.find(dataSinkCategory);
+ if (dataSinkProviderIt != Types.DataSinkMap.end()) {
+ if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) {
+ auto canWrite = dqIntegration->CanWrite(*effect.Raw(), ctx);
+ if (canWrite) {
+ supported = *canWrite; // if false, we will exit this function a few lines later
+ }
+ }
+ }
+ }
+ }
+
+ if (!supported) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
+ << "Unsupported Kikimr data query effect: " << effect.CallableName()));
+ return TStatus::Error;
+ }
}
}
@@ -1599,6 +1620,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
private:
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ TTypeAnnotationContext& Types;
};
} // namespace
@@ -1610,9 +1632,9 @@ TAutoPtr<IGraphTransformer> CreateKiSourceTypeAnnotationTransformer(TIntrusivePt
}
TAutoPtr<IGraphTransformer> CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types)
{
- return new TKiSinkTypeAnnotationTransformer(gateway, sessionCtx);
+ return new TKiSinkTypeAnnotationTransformer(gateway, sessionCtx, types);
}
const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData,
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 841d819b49..6f7122105f 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -708,6 +708,20 @@ private:
stageProto.SetOutputsCount(outputsCount);
+ // Dq sinks
+ if (auto maybeOutputsNode = stage.Outputs()) {
+ auto outputsNode = maybeOutputsNode.Cast();
+ for (size_t i = 0; i < outputsNode.Size(); ++i) {
+ auto outputNode = outputsNode.Item(i);
+ auto maybeSinkNode = outputNode.Maybe<TDqSink>();
+ YQL_ENSURE(maybeSinkNode);
+ auto sinkNode = maybeSinkNode.Cast();
+ auto* sinkProto = stageProto.AddSinks();
+ FillSink(sinkNode, sinkProto, ctx);
+ sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index())));
+ }
+ }
+
auto paramsType = CollectParameters(stage, ctx);
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
ctx, {});
@@ -751,10 +765,7 @@ private:
i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel()));
}
-
- YQL_ENSURE(hasEffectStage == txSettings.WithEffects);
-
- txProto.SetHasEffects(txSettings.WithEffects);
+ txProto.SetHasEffects(hasEffectStage);
for (const auto& paramBinding : tx.ParamBindings()) {
TString paramName(paramBinding.Name().Value());
@@ -911,7 +922,7 @@ private:
void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns,
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx)
{
- const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category().Value();
+ const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category();
if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) {
FillKqpSource(source, protoSource, allowSystemColumns, tablesMap);
} else {
@@ -949,6 +960,22 @@ private:
}
}
+ void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, TExprContext& ctx) {
+ Y_UNUSED(ctx);
+ const TStringBuf dataSinkCategory = sink.DataSink().Cast<TCoDataSink>().Category();
+ // Delegate sink filling to dq integration of specific provider
+ const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory);
+ YQL_ENSURE(provider != TypesCtx.DataSinkMap.end(), "Unsupported data sink category: \"" << dataSinkCategory << "\"");
+ NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
+ YQL_ENSURE(dqIntegration, "Unsupported dq sink for provider: \"" << dataSinkCategory << "\"");
+ auto& externalSink = *protoSink->MutableExternalSink();
+ google::protobuf::Any& settings = *externalSink.MutableSettings();
+ TString& sinkType = *externalSink.MutableType();
+ dqIntegration->FillSinkSettings(sink.Ref(), settings, sinkType);
+ YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node");
+ YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node");
+ }
+
void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx,
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 1d7a7400da..18e0b3766b 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -5,6 +5,8 @@
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/Aws.h>
#include <aws/s3/model/CreateBucketRequest.h>
+#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/S3Client.h>
@@ -39,21 +41,31 @@ bool EnsureAwsApiInited() {
return inited;
}
-void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
+Aws::S3::S3Client MakeS3Client() {
EnsureAwsApiInited();
Aws::Client::ClientConfiguration s3ClientConfig;
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
- Aws::S3::S3Client s3Client(std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3ClientConfig);
+ return Aws::S3::S3Client(std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3ClientConfig);
+}
- {
- Aws::S3::Model::CreateBucketRequest req;
- req.SetBucket(bucket);
- req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
- const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
- UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
- }
+void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) {
+ Aws::S3::Model::CreateBucketRequest req;
+ req.SetBucket(bucket);
+ req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
+ const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
+ UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
+}
+
+void CreateBucket(const TString& bucket) {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ CreateBucket(bucket, s3Client);
+}
+
+void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
+ CreateBucket(bucket, s3Client);
{
Aws::S3::Model::PutObjectRequest req;
@@ -67,11 +79,81 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const
}
}
+void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ CreateBucketWithObject(bucket, object, content, s3Client);
+}
+
+TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) {
+ Aws::S3::Model::GetObjectRequest req;
+ req.WithBucket(bucket).WithKey(object);
+
+ Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req);
+ UNIT_ASSERT(outcome.IsSuccess());
+ Aws::S3::Model::GetObjectResult& result = outcome.GetResult();
+ std::istreambuf_iterator<char> eos;
+ std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos);
+ Cerr << "Got object content from \"" << bucket << "." << object << "\"\n" << objContent << Endl;
+ return objContent;
+}
+
+TString GetObject(const TString& bucket, const TString& object) {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ return GetObject(bucket, object, s3Client);
+}
+
+std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) {
+ Aws::S3::Model::ListObjectsRequest listReq;
+ listReq.WithBucket(bucket);
+
+ Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq);
+ UNIT_ASSERT(outcome.IsSuccess());
+
+ std::vector<TString> keys;
+ for (auto& obj : outcome.GetResult().GetContents()) {
+ keys.push_back(TString(obj.GetKey()));
+ Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl;
+ }
+ return keys;
+}
+
+std::vector<TString> GetObjectKeys(const TString& bucket) {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ return GetObjectKeys(bucket, s3Client);
+}
+
+TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) {
+ std::vector<TString> keys = GetObjectKeys(bucket, s3Client);
+ TString result;
+ bool firstObject = true;
+ for (const TString& key : keys) {
+ result += GetObject(bucket, key, s3Client);
+ if (!firstObject) {
+ result += separator;
+ }
+ firstObject = false;
+ }
+ return result;
+}
+
+TString GetAllObjects(const TString& bucket, TStringBuf separator = "") {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+
+ return GetAllObjects(bucket, separator, s3Client);
+}
+
+TString GetBucketLocation(const TStringBuf bucket) {
+ return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
+}
+
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
NYdb::NOperation::TOperationClient client(ydbDriver);
NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
do {
- if (!op.Initialized()) {
+ if (op.Initialized()) {
Sleep(TDuration::MilliSeconds(10));
}
op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
@@ -111,7 +193,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);)",
"external_source"_a = externalDataSourceName,
"external_table"_a = externalTableName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "location"_a = GetBucketLocation(bucket),
"object"_a = object
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -175,7 +257,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);)",
"external_source"_a = externalDataSourceName,
"external_table"_a = externalTableName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "location"_a = GetBucketLocation(bucket),
"object"_a = object
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -235,7 +317,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
AUTH_METHOD="NONE"
);)",
"external_source"_a = externalDataSourceName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/"
+ "location"_a = GetBucketLocation(bucket)
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -294,7 +376,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);
)",
"external_source"_a = externalDataSourceName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "location"_a = GetBucketLocation(bucket),
"ydb_table"_a = ydbTable
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -374,7 +456,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);)",
"external_source"_a = externalDataSourceName,
"external_table"_a = externalTableName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "location"_a = GetBucketLocation(bucket),
"object"_a = object
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -434,7 +516,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);
)",
"external_source"_a = externalDataSourceName,
- "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "location"_a = GetBucketLocation(bucket),
"ydb_table"_a = ydbTable
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -653,6 +735,86 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
}
}
+ Y_UNIT_TEST(InsertIntoBucket) {
+ using namespace fmt::literals;
+ const TString readDataSourceName = "/Root/read_data_source";
+ const TString readTableName = "/Root/read_binding";
+ const TString readBucket = "test_bucket_read";
+ const TString readObject = "test_object_read";
+ const TString writeDataSourceName = "/Root/write_data_source";
+ const TString writeTableName = "/Root/write_binding";
+ const TString writeBucket = "test_bucket_write";
+ const TString writeObject = "test_object_write/";
+
+ {
+ Aws::S3::S3Client s3Client = MakeS3Client();
+ CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client);
+ CreateBucket(writeBucket, s3Client);
+ }
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{read_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{read_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{read_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{read_source}",
+ LOCATION="{read_object}",
+ FORMAT="json_each_row"
+ );
+
+ CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{write_location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{write_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{write_source}",
+ LOCATION="{write_object}",
+ FORMAT="tsv_with_names"
+ );
+ )",
+ "read_source"_a = readDataSourceName,
+ "read_table"_a = readTableName,
+ "read_location"_a = GetBucketLocation(readBucket),
+ "read_object"_a = readObject,
+ "write_source"_a = writeDataSourceName,
+ "write_table"_a = writeTableName,
+ "write_location"_a = GetBucketLocation(writeBucket),
+ "write_object"_a = writeObject
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const TString sql = fmt::format(R"(
+ INSERT INTO `{write_table}`
+ SELECT * FROM `{read_table}`
+ )",
+ "read_table"_a=readTableName,
+ "write_table"_a = writeTableName);
+
+ auto db = kikimr.GetQueryClient();
+ auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+ resultFuture.Wait();
+ UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
+
+ TString content = GetAllObjects(writeBucket);
+ UNIT_ASSERT_STRING_CONTAINS(content, "key\tvalue\n"); // tsv header
+ UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n");
+ UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n");
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 936680f952..2fe9c89f07 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -319,7 +319,20 @@ message TKqpSource {
TKqpReadRangesSource ReadRangesSource = 3;
TKqpExternalSource ExternalSource = 4;
}
-};
+}
+
+message TKqpExternalSink {
+ string Type = 1;
+ google.protobuf.Any Settings = 2;
+}
+
+message TKqpSink {
+ uint32 OutputIndex = 1;
+
+ oneof Type {
+ TKqpExternalSink ExternalSink = 2;
+ }
+}
message TKqpPhyStage {
NYql.NDqProto.TProgram Program = 1;
@@ -332,6 +345,7 @@ message TKqpPhyStage {
string StageGuid = 8;
repeated TKqpSource Sources = 9;
bool IsSinglePartition = 10;
+ repeated TKqpSink Sinks = 11;
}
message TKqpPhyResult {
diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
index b4b494de29..719d35a306 100644
--- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp
+++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
@@ -56,11 +56,11 @@ TTransformationPipeline& TTransformationPipeline::AddParametersEvaluation(const
TTransformationPipeline& TTransformationPipeline::AddExpressionEvaluation(const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
IGraphTransformer* calcTransfomer, EYqlIssueCode issueCode) {
- auto typeCtx = TypeAnnotationContext_;
- auto funcReg = &functionRegistry;
+ auto& typeCtx = *TypeAnnotationContext_;
+ auto& funcReg = functionRegistry;
Transformers_.push_back(TTransformStage(CreateFunctorTransformer(
- [=](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- return EvaluateExpression(input, output, *typeCtx, ctx, *funcReg, calcTransfomer);
+ [&typeCtx, &funcReg, calcTransfomer](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return EvaluateExpression(input, output, typeCtx, ctx, funcReg, calcTransfomer);
}), "EvaluateExpression", issueCode));
return *this;
diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp
index 886fddbfd1..dd3aa951df 100644
--- a/ydb/library/yql/core/yql_graph_transformer.cpp
+++ b/ydb/library/yql/core/yql_graph_transformer.cpp
@@ -142,7 +142,7 @@ protected:
void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) {
ctx.AddError(TIssue(ctx.GetPosition(pos),
- TStringBuilder() << "YQL: Internal core error! " << where << " take too much iteration: "
+ TStringBuilder() << "YQL: Internal core error! " << where << " takes too much iterations: "
<< ctx.RepeatTransformLimit
<< ". You may set RepeatTransformLimit as flags for config provider."));
}
diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h
index 860b69b4c3..b528faaa82 100644
--- a/ydb/library/yql/dq/integration/yql_dq_integration.h
+++ b/ydb/library/yql/dq/integration/yql_dq_integration.h
@@ -44,7 +44,13 @@ public:
virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0;
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
- virtual TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) = 0;
+
+ // Nothing if callable is not for writing,
+ // false if callable is for writing and there are some errors (they are added to ctx),
+ // true if callable is for writing and no issues occured.
+ virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0;
+
+ virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
virtual bool CanFallback() = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index 10abdb7bcf..9caff49017 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -116,7 +116,7 @@ void MakeConsumerReplaces(
TNodeOnNodeOwnedMap argsMap;
CollectArgsReplaces(dqStage, newArgs, argsMap, ctx);
auto newStage = Build<TDqStage>(ctx, dqStage.Pos())
- .InitFrom(dqStage)
+ .InitFrom(dqStage)
.Program()
.Args(newArgs)
.Body<TCoFlatMap>()
@@ -481,7 +481,7 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBl
.Callable(0, "NarrowMap")
.Callable(0, "ToFlow")
.Add(0, newArgNode)
- .Seal()
+ .Seal()
.Lambda(1)
.Params("fields", itemType->GetSize())
.Callable("AsStruct")
@@ -568,6 +568,7 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr
.Body(resultStream)
.Build()
.Settings(TDqStageSettings::New(stage).SetWideChannels(outputItemType).BuildNode(ctx, stage.Pos()))
+ .Outputs(stage.Outputs())
.Done();
}
@@ -630,7 +631,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TP
.Stage(newStage)
.Build()
.SortColumns(builder.Build().Value())
- .Done().Ptr();
+ .Done().Ptr();
} else {
auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos())
.InitFrom(conn.Output())
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 4f9da234bd..9ba72f7a86 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -19,10 +19,14 @@ TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode
return read;
}
-TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) {
+TMaybe<bool> TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) {
return Nothing();
}
+TExprNode::TPtr TDqIntegrationBase::WrapWrite(const TExprNode::TPtr& write, TExprContext&) {
+ return write;
+}
+
void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) {
}
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
index de26d81671..abbc412d08 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
@@ -12,7 +12,8 @@ public:
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override;
TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
- TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) override;
+ TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override;
+ TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
bool CanFallback() override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index e568efb416..0cc5f15570 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -180,7 +180,7 @@ private:
auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName);
YQL_ENSURE(dataSink);
if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) {
- if (auto canWrite = dqIntegration->CanWrite(*State_->Settings, node, ctx)) {
+ if (auto canWrite = dqIntegration->CanWrite(node, ctx)) {
if (!canWrite.GetRef()) {
good = false;
} else if (!State_->Settings->EnableInsert.Get().GetOrElse(false)) {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index a9cfab9cee..e47fd3b2b8 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -145,7 +145,7 @@ public:
return read;
}
- TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) override {
+ TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override {
YQL_ENSURE(false, "Unimplemented");
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 40a1d4f494..951aee6dd0 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -193,7 +193,7 @@ struct TEvPrivate {
};
struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> {
- TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode)
+ TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode)
: CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {}
const CURLcode CurlResponseCode;
const long HttpResponseCode;
@@ -1287,7 +1287,7 @@ struct TReadBufferCounter {
DecodedBytes += deltaDecodedBytes;
DecodedRows += deltaDecodedRows;
}
-
+
ui64 Value = 0;
const ui64 Limit;
ui64 CoroCount = 0;
@@ -2280,8 +2280,8 @@ public:
void Bootstrap() {
LOG_D("TS3StreamReadActor", "Bootstrap");
QueueBufferCounter = std::make_shared<TReadBufferCounter>(
- ReadActorFactoryCfg.DataInflight,
- TActivationContext::ActorSystem(),
+ ReadActorFactoryCfg.DataInflight,
+ TActivationContext::ActorSystem(),
QueueDataSize,
TaskQueueDataSize,
DownloadPaused,
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index b87bab16ce..91d585082e 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -140,6 +140,16 @@
]
},
{
+ "Name": "TS3Insert",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "S3Insert"},
+ "Children": [
+ {"Index": 0, "Name": "DataSink", "Type": "TS3DataSink"},
+ {"Index": 1, "Name": "Target", "Type": "TS3Target"},
+ {"Index": 2, "Name": "Input", "Type": "TExprBase"}
+ ]
+ },
+ {
"Name": "TS3SinkSettings",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "S3SinkSettings"},
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index cf94e8f102..9000aaef8e 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -29,6 +29,7 @@ public:
AddHandler({TS3Target::CallableName()}, Hndl(&TSelf::HandleTarget));
AddHandler({TS3SinkSettings::CallableName()}, Hndl(&TSelf::HandleSink));
AddHandler({TS3SinkOutput::CallableName()}, Hndl(&TSelf::HandleOutput));
+ AddHandler({TS3Insert::CallableName()}, Hndl(&TSelf::HandleInsert));
}
private:
TStatus HandleCommit(TExprBase input, TExprContext&) {
@@ -84,6 +85,59 @@ private:
return TStatus::Ok;
}
+ TStatus HandleInsert(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureArgsCount(*input, 3U, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureSpecificDataSink(*input->Child(TS3Insert::idx_DataSink), S3ProviderName, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto source = input->Child(TS3Insert::idx_Input);
+ if (!EnsureListType(*source, ctx)) {
+ return TStatus::Error;
+ }
+
+ const TTypeAnnotationNode* sourceType = source->GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ if (!EnsureStructType(source->Pos(), *sourceType, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto target = input->Child(TS3Insert::idx_Target);
+ if (!TS3Target::Match(target)) {
+ ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
+ return TStatus::Error;
+ }
+
+ TS3Target tgt(target);
+ if (auto settings = tgt.Settings()) {
+ if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
+ const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ if (!IsSameAnnotation(*targetType, *sourceType)) {
+ ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
+ TStringBuilder() << "Type mismatch between schema type: " << *targetType
+ << " and actual data type: " << *sourceType << ", diff is: "
+ << GetTypeDiff(*targetType, *sourceType)));
+ return TStatus::Error;
+ }
+ }
+ }
+
+ input->SetTypeAnn(
+ ctx.MakeType<TTupleExprType>(
+ TTypeAnnotationNode::TListType{
+ ctx.MakeType<TListExprType>(
+ ctx.MakeType<TOptionalExprType>(
+ ctx.MakeType<TDataExprType>(EDataSlot::String)
+ )
+ )
+ }
+ )
+ );
+ return TStatus::Ok;
+ }
+
TStatus HandleTarget(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
return TStatus::Error;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 31628ec902..d98ab61154 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -340,6 +340,21 @@ public:
}
}
+ TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ return TS3WriteObject::Match(&write);
+ }
+
+ TExprNode::TPtr WrapWrite(const TExprNode::TPtr& writeNode, TExprContext& ctx) override {
+ TExprBase writeExpr(writeNode);
+ const auto write = writeExpr.Cast<TS3WriteObject>();
+ return Build<TS3Insert>(ctx, write.Pos())
+ .DataSink(write.DataSink())
+ .Target(write.Target())
+ .Input(write.Input())
+ .Done().Ptr();
+ }
+
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override {
const TDqSink sink(&node);
if (const auto maybeSettings = sink.Settings().Maybe<TS3SinkSettings>()) {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 9feecb6722..61700c459a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -587,7 +587,7 @@ private:
if (!s3ParseSettingsBase.Paths().Empty()) {
resultSetLimitPerPath /= s3ParseSettingsBase.Paths().Size();
}
-
+
for (auto path : s3ParseSettingsBase.Paths()) {
NS3Details::TPathList directories;
NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories);
@@ -613,7 +613,7 @@ private:
State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse(
State_->Configuration->AllowConcurrentListings),
.MaxResultSet = resultSetLimitPerPath});
-
+
RequestsByNode_[source.Raw()].push_back(req);
PendingRequests_[req] = future;
@@ -755,7 +755,7 @@ private:
.S3Request{.Url = url, .Token = tokenStr},
.FilePattern = effectiveFilePattern,
.Options{
- .IsConcurrentListing = isConcurrentListingEnabled,
+ .IsConcurrentListing = isConcurrentListingEnabled,
.MaxResultSet = std::max(State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxDirectoriesAndFilesPerQuery)
}};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 7bfad7c4e6..2035ee5936 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -104,6 +104,7 @@ public:
#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name)
AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld));
AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject));
+ AddHandler(0, &TS3Insert::Match, HNDL(S3Insert));
#undef HNDL
}
@@ -118,7 +119,276 @@ public:
return TExprBase(maybeRead.Cast().World().Ptr());
}
+ TMaybe<TDqStage> BuildSinkStage(TPositionHandle writePos, TS3DataSink dataSink, TS3Target target, TExprBase input, TExprContext& ctx, const TGetParents& getParents) const {
+ const auto& cluster = dataSink.Cluster().StringValue();
+ const auto token = "cluster:default_" + cluster;
+ const auto& settings = target.Settings().Ref();
+ auto partBy = GetPartitionBy(settings);
+ auto keys = GetPartitionKeys(partBy);
+
+ auto sinkSettingsBuilder = Build<TExprList>(ctx, target.Pos());
+ if (partBy)
+ sinkSettingsBuilder.Add(std::move(partBy));
+
+ auto compression = GetCompression(settings);
+ const auto& extension = GetExtension(target.Format().Value(), compression ? compression->Tail().Content() : ""sv);
+ if (compression)
+ sinkSettingsBuilder.Add(std::move(compression));
+
+ auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, target.Pos());
+ if (auto csvDelimiter = GetCsvDelimiter(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(csvDelimiter));
+ }
+
+ bool hasDateTimeFormat = false;
+ bool hasDateTimeFormatName = false;
+ bool hasTimestampFormat = false;
+ bool hasTimestampFormatName = false;
+ if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName));
+ hasDateTimeFormatName = true;
+ }
+
+ if (auto dateTimeFormat = GetDateTimeFormat(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat));
+ hasDateTimeFormat = true;
+ }
+
+ if (auto timestampFormatName = GetTimestampFormatName(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(timestampFormatName));
+ hasTimestampFormatName = true;
+ }
+
+ if (auto timestampFormat = GetTimestampFormat(settings)) {
+ sinkOutputSettingsBuilder.Add(std::move(timestampFormat));
+ hasTimestampFormat = true;
+ }
+
+ if (!hasDateTimeFormat && !hasDateTimeFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(target.Pos(), "data.datetime.formatname"));
+ pair.push_back(ctx.NewAtom(target.Pos(), "POSIX"));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
+ }
+
+ if (!hasTimestampFormat && !hasTimestampFormatName) {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(target.Pos(), "data.timestamp.formatname"));
+ pair.push_back(ctx.NewAtom(target.Pos(), "POSIX"));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
+ }
+
+ const TStringBuf format = target.Format();
+ if (format != "raw" && format != "json_list") { // multipart
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(target.Pos(), "multipart"));
+ pair.push_back(ctx.NewAtom(target.Pos(), "true"));
+ sinkSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
+ }
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(target.Pos(), "file_size_limit"));
+ size_t fileSize = 50_MB;
+ if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) {
+ fileSize = *maxObjectSize;
+ }
+ pair.push_back(ctx.NewAtom(target.Pos(), ToString(fileSize)));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
+ }
+ }
+
+ if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
+ YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink.";
+ return keys.empty() ?
+ Build<TDqStage>(ctx, writePos)
+ .Inputs().Build()
+ .Program<TCoLambda>()
+ .Args({})
+ .Body<TS3SinkOutput>()
+ .Input<TCoToFlow>()
+ .Input(input)
+ .Build()
+ .Format(target.Format())
+ .KeyColumns().Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
+ .Build()
+ .Build()
+ .Outputs<TDqStageOutputsList>()
+ .Add<TDqSink>()
+ .DataSink(dataSink)
+ .Index().Value("0").Build()
+ .Settings<TS3SinkSettings>()
+ .Path(target.Path())
+ .Settings(sinkSettingsBuilder.Done())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Extension().Value(extension).Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done()
+ :
+ Build<TDqStage>(ctx, writePos)
+ .Inputs()
+ .Add<TDqCnHashShuffle>()
+ .Output<TDqOutput>()
+ .Stage<TDqStage>()
+ .Inputs().Build()
+ .Program<TCoLambda>()
+ .Args({})
+ .Body<TCoToFlow>()
+ .Input(input)
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Build()
+ .Index().Value("0", TNodeFlags::Default).Build()
+ .Build()
+ .KeyColumns().Add(keys).Build()
+ .Build()
+ .Build()
+ .Program<TCoLambda>()
+ .Args({"in"})
+ .Body<TS3SinkOutput>()
+ .Input("in")
+ .Format(target.Format())
+ .KeyColumns().Add(keys).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
+ .Build()
+ .Build()
+ .Outputs<TDqStageOutputsList>()
+ .Add<TDqSink>()
+ .DataSink(dataSink)
+ .Index().Value("0", TNodeFlags::Default).Build()
+ .Settings<TS3SinkSettings>()
+ .Path(target.Path())
+ .Settings(sinkSettingsBuilder.Done())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Extension().Value(extension).Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done();
+ }
+
+ if (!TDqCnUnionAll::Match(input.Raw())) {
+ return Nothing();
+ }
+
+ const TParentsMap* parentsMap = getParents();
+ const auto dqUnion = input.Cast<TDqCnUnionAll>();
+ if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) {
+ return Nothing();
+ }
+
+ YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as sink.";
+
+ const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
+
+ const auto sink = Build<TDqSink>(ctx, writePos)
+ .DataSink(dataSink)
+ .Index(dqUnion.Output().Index())
+ .Settings<TS3SinkSettings>()
+ .Path(target.Path())
+ .Settings(sinkSettingsBuilder.Done())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Extension().Value(extension).Build()
+ .Build()
+ .Done();
+
+ auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos());
+ if (inputStage.Outputs() && keys.empty()) {
+ outputsBuilder.InitFrom(inputStage.Outputs().Cast());
+ }
+ outputsBuilder.Add(sink);
+
+ if (keys.empty()) {
+ const auto outputBuilder = Build<TS3SinkOutput>(ctx, target.Pos())
+ .Input(inputStage.Program().Body().Ptr())
+ .Format(target.Format())
+ .KeyColumns().Add(std::move(keys)).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
+ .Done();
+
+ return Build<TDqStage>(ctx, writePos)
+ .InitFrom(inputStage)
+ .Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr()))
+ .Outputs(outputsBuilder.Done())
+ .Done();
+ } else {
+ return Build<TDqStage>(ctx, writePos)
+ .Inputs()
+ .Add<TDqCnHashShuffle>()
+ .Output<TDqOutput>()
+ .Stage(inputStage)
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .KeyColumns().Add(keys).Build()
+ .Build()
+ .Build()
+ .Program<TCoLambda>()
+ .Args({"in"})
+ .Body<TS3SinkOutput>()
+ .Input("in")
+ .Format(target.Format())
+ .KeyColumns().Add(std::move(keys)).Build()
+ .Settings(sinkOutputSettingsBuilder.Done())
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Outputs(outputsBuilder.Done())
+ .Done();
+ }
+ }
+
+ TMaybeNode<TExprBase> S3Insert(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ auto insert = node.Cast<TS3Insert>();
+ TMaybe<TDqStage> stage
+ = BuildSinkStage(node.Pos(),
+ insert.DataSink(),
+ insert.Target(),
+ insert.Input(),
+ ctx,
+ getParents);
+
+ if (stage) {
+ return *stage;
+ } else {
+ return node;
+ }
+ }
+
TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ auto write = node.Cast<TS3WriteObject>();
+ TMaybe<TDqStage> stage
+ = BuildSinkStage(node.Pos(),
+ write.DataSink(),
+ write.Target(),
+ write.Input(),
+ ctx,
+ getParents);
+
+ if (stage) {
+ return Build<TDqQuery>(ctx, write.Pos())
+ .World(write.World())
+ .SinkStages()
+ .Add(*stage)
+ .Build()
+ .Done();
+ } else {
+ return node;
+ }
+ }
+
+ TMaybeNode<TExprBase> S3WriteObject1(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
const auto& write = node.Cast<TS3WriteObject>();
const auto& targetNode = write.Target();
const auto& cluster = write.DataSink().Cluster().StringValue();
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index 33be2f8fdd..cb82aec4f5 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -87,7 +87,7 @@ public:
YQL_ENSURE(false, "Unimplemented");
}
- TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) override {
+ TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override {
YQL_ENSURE(false, "Unimplemented");
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 1c21299802..78247608b4 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -410,7 +410,7 @@ public:
return read;
}
- TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode& node, TExprContext& ctx) override {
+ TMaybe<bool> CanWrite(const TExprNode& node, TExprContext& ctx) override {
if (auto maybeWrite = TMaybeNode<TYtWriteTable>(&node)) {
auto cluster = TString{maybeWrite.Cast().DataSink().Cluster().Value()};
auto tableName = TString{TYtTableInfo::GetTableLabel(maybeWrite.Cast().Table())};