aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-04-04 16:13:03 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-04-04 16:13:03 +0300
commit09551dcd27f59c2f79b95f9a5cef993dae3db587 (patch)
treee84f48eafb2b44bc360301553cad3a9639f2cf91
parent2b8c0531462af1575ad27e634bc6c3febc7a73d1 (diff)
downloadydb-09551dcd27f59c2f79b95f9a5cef993dae3db587.tar.gz
Support DQ graph building with external sources and its runtime
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp7
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp14
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h23
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h2
-rw-r--r--ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp15
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.h3
-rw-r--r--ydb/core/kqp/node_service/kqp_node_ut.cpp2
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp9
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt6
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt6
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt6
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt6
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp43
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp3
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp19
-rw-r--r--ydb/core/protos/kqp_physical.proto11
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h20
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp2
38 files changed, 209 insertions, 48 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
index 7ec4d4514bf..256892ad206 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
+ providers-common-http_gateway
+ providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
index ba8cdee953e..7a896a844d9 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
+ providers-common-http_gateway
+ providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
index ba8cdee953e..7a896a844d9 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
+ providers-common-http_gateway
+ providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
index 7ec4d4514bf..256892ad206 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
+ providers-common-http_gateway
+ providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 776403a4990..e63c9fdb83f 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -5,6 +5,8 @@
#include <ydb/core/kqp/runtime/kqp_read_table.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -50,10 +52,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters) {
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway) {
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
RegisterStreamLookupActorFactory(*factory, counters);
RegisterKqpReadActor(*factory, counters);
+ RegisterS3ReadActorFactory(*factory, nullptr, httpGateway);
+ RegisterS3WriteActorFactory(*factory, nullptr, httpGateway);
return factory;
}
@@ -93,4 +97,3 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK
}
}
} // namespace NKikimr
-
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 2e99e5bfe67..d1b5c60efea 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -3,6 +3,7 @@
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
namespace NKikimr {
@@ -52,7 +53,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters);
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index eb2e2f4acb2..89a05ebf2c7 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -80,6 +80,14 @@ public:
settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache();
settings.AllowGeneratorsInUnboxedValues = false;
+ for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) {
+ settings.TaskParams[paramsName] = paramsValue;
+ }
+
+ for (const auto& [paramsName, paramsValue] : GetTask().GetSecureParams()) {
+ settings.SecureParams[paramsName] = paramsValue;
+ }
+
auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
SetTaskRunner(taskRunner);
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 55f73aa5041..4fc1059d8e1 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -169,6 +169,14 @@ public:
settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache();
settings.AllowGeneratorsInUnboxedValues = false;
+ for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) {
+ settings.TaskParams[paramsName] = paramsValue;
+ }
+
+ for (const auto& [paramsName, paramsValue] : GetTask().GetSecureParams()) {
+ settings.SecureParams[paramsName] = paramsValue;
+ }
+
NDq::TLogFunc logger;
if (IsDebugLogEnabled(actorSystem, NKikimrServices::KQP_TASKS_RUNNER)) {
logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()](const TString& message) {
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt
index 4de87f9588a..2775ed801cc 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
dq-actors-compute
yql-dq-runtime
yql-dq-tasks
+ providers-common-http_gateway
)
target_sources(core-kqp-executer_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
index bcfd6eb80fa..1811fd78044 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt
@@ -34,6 +34,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
dq-actors-compute
yql-dq-runtime
yql-dq-tasks
+ providers-common-http_gateway
)
target_sources(core-kqp-executer_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt
index bcfd6eb80fa..1811fd78044 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
dq-actors-compute
yql-dq-runtime
yql-dq-tasks
+ providers-common-http_gateway
)
target_sources(core-kqp-executer_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
diff --git a/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt
index 4de87f9588a..2775ed801cc 100644
--- a/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC
dq-actors-compute
yql-dq-runtime
yql-dq-tasks
+ providers-common-http_gateway
)
target_sources(core-kqp-executer_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 37a6c2a25f2..ca22a1bd79c 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -126,8 +126,10 @@ public:
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig)
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ NYql::IHTTPGateway::TPtr httpGateway)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter")
+ , HttpGateway(std::move(httpGateway))
, StreamResult(streamResult)
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -1631,7 +1633,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters),
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters, HttpGateway),
AppData()->FunctionRegistry, settings, limits);
auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor);
@@ -1699,6 +1701,9 @@ private:
case NKqpProto::TKqpSource::kReadRangesSource:
readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId);
break;
+ case NKqpProto::TKqpSource::kExternalSource:
+ BuildReadTasksFromSource(stageInfo);
+ break;
default:
YQL_ENSURE(false, "unknown source type");
}
@@ -2505,6 +2510,7 @@ private:
}
private:
+ NYql::IHTTPGateway::TPtr HttpGateway;
bool StreamResult = false;
bool HasStreamLookup = false;
@@ -2547,9 +2553,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig)
+ TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig);
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(httpGateway));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 5cc80581040..ffc106c28f6 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -8,6 +8,7 @@
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
namespace NKikimr {
namespace NKqp {
@@ -83,7 +84,8 @@ struct TEvKqpExecuter {
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ NYql::IHTTPGateway::TPtr httpGateway);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 466853600f8..1d597aa7584 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -133,12 +133,13 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig)
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ NYql::IHTTPGateway::TPtr httpGateway)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway));
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -154,13 +155,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway));
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(httpGateway));
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 23e43600b3b..e99e322462f 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -30,6 +30,7 @@
#include <ydb/library/yql/dq/proto/dq_transport.pb.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -674,6 +675,26 @@ protected:
}
}
+ void BuildReadTasksFromSource(TStageInfo& stageInfo) {
+ const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+
+ YQL_ENSURE(stage.GetSources(0).HasExternalSource());
+ YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
+
+ const auto& stageSource = stage.GetSources(0);
+ const auto& externalSource = stageSource.GetExternalSource();
+ for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+
+ auto& input = task.Inputs[stageSource.GetInputIndex()];
+ input.ConnectionInfo = NYql::NDq::TSourceInput{};
+ input.SourceSettings = externalSource.GetSettings();
+ input.SourceType = externalSource.GetType();
+
+ task.Meta.DqTaskParams.emplace(externalSource.GetTaskParamKey(), partitionParam);
+ }
+ }
+
size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -1061,7 +1082,7 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 011ea1350cb..43f0a29060a 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -616,6 +616,14 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(
result.SetId(task.Id);
result.SetStageId(stageInfo.Id.StageId);
+ for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) {
+ (*result.MutableTaskParams())[paramName] = paramValue;
+ }
+
+ for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) {
+ (*result.MutableSecureParams())[paramName] = paramValue;
+ }
+
for (const auto& input : task.Inputs) {
FillInputDesc(tasksGraph, resultChannelProxies, *result.AddInputs(), input);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 5af143faf32..6cfe34f4bf1 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -124,6 +124,8 @@ struct TTaskMeta {
TActorId ExecuterId;
TMap<TString, NYql::NDqProto::TData> Params;
+ THashMap<TString, TString> DqTaskParams; // Params for sources/sinks
+ THashMap<TString, TString> DqSecureParams;
struct TColumn {
ui32 Id = 0;
diff --git a/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt
index 3656f42ea20..0a67653156a 100644
--- a/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-kqp-node_service PUBLIC
ydb-core-protos
ydb-core-tablet
dq-actors-compute
+ providers-common-http_gateway
)
target_sources(core-kqp-node_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp
diff --git a/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt
index 809b67173cf..712794bc6be 100644
--- a/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,7 @@ target_link_libraries(core-kqp-node_service PUBLIC
ydb-core-protos
ydb-core-tablet
dq-actors-compute
+ providers-common-http_gateway
)
target_sources(core-kqp-node_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp
diff --git a/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt
index 809b67173cf..712794bc6be 100644
--- a/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(core-kqp-node_service PUBLIC
ydb-core-protos
ydb-core-tablet
dq-actors-compute
+ providers-common-http_gateway
)
target_sources(core-kqp-node_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp
diff --git a/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt
index 3656f42ea20..0a67653156a 100644
--- a/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-kqp-node_service PUBLIC
ydb-core-protos
ydb-core-tablet
dq-actors-compute
+ providers-common-http_gateway
)
target_sources(core-kqp-node_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index ef60b69503a..871df1546d5 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -88,10 +88,12 @@ public:
}
TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters,
- IKqpNodeComputeActorFactory* caFactory)
+ IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway)
: Config(config.GetResourceManager())
, Counters(counters)
- , CaFactory(caFactory) {}
+ , CaFactory(caFactory)
+ , HttpGateway(std::move(httpGateway))
+ {}
void Bootstrap() {
LOG_I("Starting KQP Node service");
@@ -324,12 +326,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateKqpAsyncIoFactory(Counters), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
+ CreateKqpAsyncIoFactory(Counters, HttpGateway), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters),
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters, HttpGateway),
AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
@@ -512,6 +514,7 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
IKqpNodeComputeActorFactory* CaFactory;
NRm::IKqpResourceManager* ResourceManager_ = nullptr;
+ NYql::IHTTPGateway::TPtr HttpGateway;
//state sharded by TxId
std::array<NKqpNode::TState, BucketsCount> Buckets;
@@ -521,9 +524,9 @@ private:
} // anonymous namespace
IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory)
+ TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway)
{
- return new TKqpNodeService(tableServiceConfig, counters, caFactory);
+ return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(httpGateway));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/node_service/kqp_node_service.h b/ydb/core/kqp/node_service/kqp_node_service.h
index f9f4c7f1562..56ea1361d24 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.h
+++ b/ydb/core/kqp/node_service/kqp_node_service.h
@@ -5,6 +5,7 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/actor.h>
@@ -65,7 +66,7 @@ struct IKqpNodeComputeActorFactory {
};
NActors::IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr);
+ TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::IHTTPGateway::TPtr httpGateway = nullptr);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index 88a852aed7f..64719646e0f 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -170,7 +170,7 @@ public:
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();
- auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get());
+ auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), NYql::IHTTPGateway::Make());
KqpNodeActorId = Runtime->Register(kqpNode);
Runtime->EnableScheduleForActor(KqpNodeActorId, true);
WaitForBootstrap();
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 50317b5da9b..33447c05f70 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -863,8 +863,13 @@ private:
for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) {
if (auto source = input.Maybe<TDqSource>()) {
auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>();
- YQL_ENSURE(settings.IsValid(), "only readranges sources are supported");
- Visit(settings.Cast(), stagePlanNode);
+ if (settings.IsValid()) {
+ Visit(settings.Cast(), stagePlanNode);
+ } else {
+ TOperator op;
+ op.Properties["Name"] = TString(source.Cast().DataSource().Cast<TCoDataSource>().Category().Value());
+ AddOperator(stagePlanNode, "Source", op);
+ }
} else {
auto inputCn = input.Cast<TDqConnection>();
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 5c982d859b6..3a38836c398 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -207,7 +207,7 @@ public:
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
- KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters));
+ KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, HttpGateway));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService);
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt
index c848e589bdd..ecf0c85728a 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt
@@ -18,6 +18,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC
core-kqp-common
ydb-core-protos
ydb-library-mkql_proto
+ yql-dq-integration
+ yql-dq-opt
+ yql-dq-tasks
+ library-yql-minikql
+ providers-common-mkql
+ providers-dq-common
)
target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
index bd6045c8f04..cf0d3c30252 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC
core-kqp-common
ydb-core-protos
ydb-library-mkql_proto
+ yql-dq-integration
+ yql-dq-opt
+ yql-dq-tasks
+ library-yql-minikql
+ providers-common-mkql
+ providers-dq-common
)
target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt
index bd6045c8f04..cf0d3c30252 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt
@@ -19,6 +19,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC
core-kqp-common
ydb-core-protos
ydb-library-mkql_proto
+ yql-dq-integration
+ yql-dq-opt
+ yql-dq-tasks
+ library-yql-minikql
+ providers-common-mkql
+ providers-dq-common
)
target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt
index c848e589bdd..ecf0c85728a 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt
@@ -18,6 +18,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC
core-kqp-common
ydb-core-protos
ydb-library-mkql_proto
+ yql-dq-integration
+ yql-dq-opt
+ yql-dq-tasks
+ library-yql-minikql
+ providers-common-mkql
+ providers-dq-common
)
target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 51f4b03298e..1e16ec91c57 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -11,10 +11,13 @@
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>
#include <ydb/library/mkql_proto/mkql_proto.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/tasks/dq_task_program.h>
-#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
+#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
namespace NKikimr {
namespace NKqp {
@@ -439,6 +442,7 @@ public:
, TypeEnv(Alloc)
, KqlCtx(cluster, tablesData, TypeEnv, FuncRegistry)
, KqlCompiler(CreateKqlCompiler(KqlCtx, typesCtx))
+ , TypesCtx(typesCtx)
{
Alloc.Release();
}
@@ -556,7 +560,7 @@ private:
if (input.Maybe<TDqSource>()) {
auto* protoSource = stageProto.AddSources();
- FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap);
+ FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap, ctx);
protoSource->SetInputIndex(inputIndex);
} else {
YQL_ENSURE(input.Maybe<TDqConnection>());
@@ -805,7 +809,7 @@ private:
}
}
- void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns,
+ void FillKqpSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns,
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
{
if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) {
@@ -864,7 +868,37 @@ private:
}
}
} else {
- YQL_ENSURE(false, "unsupported source type");
+ YQL_ENSURE(false, "Unsupported source type");
+ }
+ }
+
+ void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns,
+ THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx)
+ {
+ const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category().Value();
+ if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) {
+ FillKqpSource(source, protoSource, allowSystemColumns, tablesMap);
+ } else {
+ // Delegate source filling to dq integration of specific provider
+ const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
+ YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
+ NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
+ YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
+ auto& externalSource = *protoSource->MutableExternalSource();
+ google::protobuf::Any& settings = *externalSource.MutableSettings();
+ TString& sourceType = *externalSource.MutableType();
+ dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType);
+ YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
+ YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
+
+ // Partitioning
+ TVector<TString> partitionParams;
+ TString clusterName;
+ dqIntegration->Partition(NYql::TDqSettings(), NYql::TDqSettings::TDefault::MaxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false);
+ externalSource.SetTaskParamKey(TString(dataSourceCategory));
+ for (const TString& partitionParam : partitionParams) {
+ externalSource.AddPartitionedTaskParams(partitionParam);
+ }
}
}
@@ -999,6 +1033,7 @@ private:
NMiniKQL::TTypeEnvironment TypeEnv;
TKqlCompileContext KqlCtx;
TIntrusivePtr<NCommon::IMkqlCallableCompiler> KqlCompiler;
+ TTypeAnnotationContext& TypesCtx;
};
} // namespace
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 3d82733b327..16bc1f3d6fb 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -977,7 +977,8 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
- RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig());
+ RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(),
+ HttpGateway);
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 69471a059a9..b64aad7b937 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -42,6 +42,7 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const
{
Aws::S3::Model::CreateBucketRequest req;
req.SetBucket(bucket);
+ req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
}
@@ -83,22 +84,22 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Sleep(TDuration::MilliSeconds(50));
TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScrptsResult.Metadata().ExecutionId);
results.ConstructInPlace(future.ExtractValueSync());
- ////////////////////////////////////////////////////////////////////////// tmp return; // YQ-1636
- if (results->GetStatus() == NYdb::EStatus::INTERNAL_ERROR) {
- UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "unsupported source type");
- return;
- }
- ////////////////////////////////////////////////////////////////////////// tmp return; // YQ-1636
if (!results->IsSuccess()) {
UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus() << ": " << results->GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready");
}
} while (!results->HasResultSet());
TResultSetParser resultSet(results->ExtractResultSet());
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+
UNIT_ASSERT(resultSet.TryNextRow());
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 1a593cbf3b0..f1359d54abb 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -4,6 +4,7 @@ option cc_enable_arenas = true;
package NKqpProto;
option java_package = "ru.yandex.kikimr.proto";
+import "google/protobuf/any.proto";
import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
import "ydb/public/api/protos/ydb_value.proto";
@@ -281,11 +282,21 @@ message TKqpReadRangesSource {
repeated string SkipNullKeys = 8;
}
+message TKqpExternalSource {
+ string Type = 1;
+ google.protobuf.Any Settings = 2;
+
+ // Partitioning
+ string TaskParamKey = 3;
+ repeated string PartitionedTaskParams = 4;
+}
+
message TKqpSource {
uint32 InputIndex = 1;
oneof Type {
TKqpReadRangesSource ReadRangesSource = 3;
+ TKqpExternalSource ExternalSource = 4;
}
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 39fb80aead3..0b56f2b15b2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -575,17 +575,21 @@ protected:
void FillExtraData(NDqProto::TEvComputeActorState& state) {
auto* extraData = state.MutableExtraData();
for (auto& [index, input] : SourcesMap) {
- if (auto data = input.AsyncInput->ExtraData()) {
- auto* entry = extraData->AddSourcesExtraData();
- entry->SetIndex(index);
- entry->MutableData()->CopyFrom(*data);
+ if (input.AsyncInput) {
+ if (auto data = input.AsyncInput->ExtraData()) {
+ auto* entry = extraData->AddSourcesExtraData();
+ entry->SetIndex(index);
+ entry->MutableData()->CopyFrom(*data);
+ }
}
}
for (auto& [index, input] : InputTransformsMap) {
- if (auto data = input.AsyncInput->ExtraData()) {
- auto* entry = extraData->AddInputTransformsData();
- entry->SetIndex(index);
- entry->MutableData()->CopyFrom(*data);
+ if (input.AsyncInput) {
+ if (auto data = input.AsyncInput->ExtraData()) {
+ auto* entry = extraData->AddInputTransformsData();
+ entry->SetIndex(index);
+ entry->MutableData()->CopyFrom(*data);
+ }
}
}
}
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 9365589705c..d04939a2a7c 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -175,4 +175,6 @@ message TDqTask {
string RateLimiter = 10;
string RateLimiterResource = 11;
uint64 InitialTaskMemoryLimit = 12;
+ map<string, bytes> TaskParams = 13;
+ map<string, string> SecureParams = 14;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 1d8da023f2b..420d42bbcc8 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -61,10 +61,8 @@ public:
}
ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
- TString cluster;
std::vector<std::vector<TPath>> parts;
if (const TMaybeNode<TDqSource> source = &node) {
- cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().Value();
const auto settings = source.Cast().Settings().Cast<TS3SourceSettingsBase>();
for (auto i = 0u; i < settings.Paths().Size(); ++i) {
const auto& packed = settings.Paths().Item(i);