diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-04-04 16:13:03 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-04-04 16:13:03 +0300 |
commit | 09551dcd27f59c2f79b95f9a5cef993dae3db587 (patch) | |
tree | e84f48eafb2b44bc360301553cad3a9639f2cf91 | |
parent | 2b8c0531462af1575ad27e634bc6c3febc7a73d1 (diff) | |
download | ydb-09551dcd27f59c2f79b95f9a5cef993dae3db587.tar.gz |
Support DQ graph building with external sources and its runtime
38 files changed, 209 insertions, 48 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt index 7ec4d4514bf..256892ad206 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC core-tx-datashard core-tx-scheme_cache dq-actors-compute + providers-common-http_gateway + providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt index ba8cdee953e..7a896a844d9 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt @@ -27,6 +27,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC core-tx-datashard core-tx-scheme_cache dq-actors-compute + providers-common-http_gateway + providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt index ba8cdee953e..7a896a844d9 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt @@ -27,6 +27,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC core-tx-datashard core-tx-scheme_cache dq-actors-compute + providers-common-http_gateway + providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt index 7ec4d4514bf..256892ad206 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt @@ -26,6 +26,8 @@ target_link_libraries(core-kqp-compute_actor PUBLIC core-tx-datashard core-tx-scheme_cache dq-actors-compute + providers-common-http_gateway + providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 776403a4990..e63c9fdb83f 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -5,6 +5,8 @@ #include <ydb/core/kqp/runtime/kqp_read_table.h> #include <ydb/core/kqp/runtime/kqp_read_actor.h> #include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h> namespace NKikimr { namespace NMiniKQL { @@ -50,10 +52,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters) { +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway) { auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); RegisterStreamLookupActorFactory(*factory, counters); RegisterKqpReadActor(*factory, counters); + RegisterS3ReadActorFactory(*factory, nullptr, httpGateway); + RegisterS3WriteActorFactory(*factory, nullptr, httpGateway); return factory; } @@ -93,4 +97,3 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK } } } // namespace NKikimr - diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 2e99e5bfe67..d1b5c60efea 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -3,6 +3,7 @@ #include <ydb/core/kqp/compute_actor/kqp_compute_events.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/core/scheme/scheme_tabledefs.h> namespace NKikimr { @@ -52,7 +53,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId); -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters); +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index eb2e2f4acb2..89a05ebf2c7 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -80,6 +80,14 @@ public: settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); settings.AllowGeneratorsInUnboxedValues = false; + for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { + settings.TaskParams[paramsName] = paramsValue; + } + + for (const auto& [paramsName, paramsValue] : GetTask().GetSecureParams()) { + settings.SecureParams[paramsName] = paramsValue; + } + auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger); SetTaskRunner(taskRunner); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 55f73aa5041..4fc1059d8e1 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -169,6 +169,14 @@ public: settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); settings.AllowGeneratorsInUnboxedValues = false; + for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { + settings.TaskParams[paramsName] = paramsValue; + } + + for (const auto& [paramsName, paramsValue] : GetTask().GetSecureParams()) { + settings.SecureParams[paramsName] = paramsValue; + } + NDq::TLogFunc logger; if (IsDebugLogEnabled(actorSystem, NKikimrServices::KQP_TASKS_RUNNER)) { logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()](const TString& message) { diff --git a/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt index 4de87f9588a..2775ed801cc 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.darwin-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC dq-actors-compute yql-dq-runtime yql-dq-tasks + providers-common-http_gateway ) target_sources(core-kqp-executer_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt index bcfd6eb80fa..1811fd78044 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-aarch64.txt @@ -34,6 +34,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC dq-actors-compute yql-dq-runtime yql-dq-tasks + providers-common-http_gateway ) target_sources(core-kqp-executer_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp diff --git a/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt index bcfd6eb80fa..1811fd78044 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.linux-x86_64.txt @@ -34,6 +34,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC dq-actors-compute yql-dq-runtime yql-dq-tasks + providers-common-http_gateway ) target_sources(core-kqp-executer_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp diff --git a/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt index 4de87f9588a..2775ed801cc 100644 --- a/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/executer_actor/CMakeLists.windows-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(core-kqp-executer_actor PUBLIC dq-actors-compute yql-dq-runtime yql-dq-tasks + providers-common-http_gateway ) target_sources(core-kqp-executer_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/executer_actor/kqp_data_executer.cpp diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 37a6c2a25f2..ca22a1bd79c 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -126,8 +126,10 @@ public: TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + NYql::IHTTPGateway::TPtr httpGateway) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, TWilsonKqp::DataExecuter, "DataExecuter") + , HttpGateway(std::move(httpGateway)) , StreamResult(streamResult) { YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -1631,7 +1633,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters), + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters, HttpGateway), AppData()->FunctionRegistry, settings, limits); auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor); @@ -1699,6 +1701,9 @@ private: case NKqpProto::TKqpSource::kReadRangesSource: readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId); break; + case NKqpProto::TKqpSource::kExternalSource: + BuildReadTasksFromSource(stageInfo); + break; default: YQL_ENSURE(false, "unknown source type"); } @@ -2505,6 +2510,7 @@ private: } private: + NYql::IHTTPGateway::TPtr HttpGateway; bool StreamResult = false; bool HasStreamLookup = false; @@ -2547,9 +2553,9 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(httpGateway)); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 5cc80581040..ffc106c28f6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -8,6 +8,7 @@ #include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/protos/config.pb.h> #include <ydb/core/protos/kqp.pb.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> namespace NKikimr { namespace NKqp { @@ -83,7 +84,8 @@ struct TEvKqpExecuter { IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + NYql::IHTTPGateway::TPtr httpGateway); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 466853600f8..1d597aa7584 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -133,12 +133,13 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + NYql::IHTTPGateway::TPtr httpGateway) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.EraseLocks); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway)); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -154,13 +155,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(httpGateway)); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(httpGateway)); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 23e43600b3b..e99e322462f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -30,6 +30,7 @@ #include <ydb/library/yql/dq/proto/dq_transport.pb.h> #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> #include <ydb/library/yql/dq/runtime/dq_transport.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -674,6 +675,26 @@ protected: } } + void BuildReadTasksFromSource(TStageInfo& stageInfo) { + const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); + + YQL_ENSURE(stage.GetSources(0).HasExternalSource()); + YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); + + const auto& stageSource = stage.GetSources(0); + const auto& externalSource = stageSource.GetExternalSource(); + for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) { + auto& task = TasksGraph.AddTask(stageInfo); + + auto& input = task.Inputs[stageSource.GetInputIndex()]; + input.ConnectionInfo = NYql::NDq::TSourceInput{}; + input.SourceSettings = externalSource.GetSettings(); + input.SourceType = externalSource.GetType(); + + task.Meta.DqTaskParams.emplace(externalSource.GetTaskParamKey(), partitionParam); + } + } + size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -1061,7 +1082,7 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::IHTTPGateway::TPtr httpGateway); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 011ea1350cb..43f0a29060a 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -616,6 +616,14 @@ NYql::NDqProto::TDqTask SerializeTaskToProto( result.SetId(task.Id); result.SetStageId(stageInfo.Id.StageId); + for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) { + (*result.MutableTaskParams())[paramName] = paramValue; + } + + for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) { + (*result.MutableSecureParams())[paramName] = paramValue; + } + for (const auto& input : task.Inputs) { FillInputDesc(tasksGraph, resultChannelProxies, *result.AddInputs(), input); } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 5af143faf32..6cfe34f4bf1 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -124,6 +124,8 @@ struct TTaskMeta { TActorId ExecuterId; TMap<TString, NYql::NDqProto::TData> Params; + THashMap<TString, TString> DqTaskParams; // Params for sources/sinks + THashMap<TString, TString> DqSecureParams; struct TColumn { ui32 Id = 0; diff --git a/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt index 3656f42ea20..0a67653156a 100644 --- a/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/node_service/CMakeLists.darwin-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-kqp-node_service PUBLIC ydb-core-protos ydb-core-tablet dq-actors-compute + providers-common-http_gateway ) target_sources(core-kqp-node_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp diff --git a/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt index 809b67173cf..712794bc6be 100644 --- a/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/node_service/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(core-kqp-node_service PUBLIC ydb-core-protos ydb-core-tablet dq-actors-compute + providers-common-http_gateway ) target_sources(core-kqp-node_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp diff --git a/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt index 809b67173cf..712794bc6be 100644 --- a/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/node_service/CMakeLists.linux-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(core-kqp-node_service PUBLIC ydb-core-protos ydb-core-tablet dq-actors-compute + providers-common-http_gateway ) target_sources(core-kqp-node_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp diff --git a/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt index 3656f42ea20..0a67653156a 100644 --- a/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/node_service/CMakeLists.windows-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-kqp-node_service PUBLIC ydb-core-protos ydb-core-tablet dq-actors-compute + providers-common-http_gateway ) target_sources(core-kqp-node_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/node_service/kqp_node_service.cpp diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index ef60b69503a..871df1546d5 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -88,10 +88,12 @@ public: } TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters, - IKqpNodeComputeActorFactory* caFactory) + IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway) : Config(config.GetResourceManager()) , Counters(counters) - , CaFactory(caFactory) {} + , CaFactory(caFactory) + , HttpGateway(std::move(httpGateway)) + {} void Bootstrap() { LOG_I("Starting KQP Node service"); @@ -324,12 +326,12 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateKqpAsyncIoFactory(Counters), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, + CreateKqpAsyncIoFactory(Counters, HttpGateway), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters), + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters, HttpGateway), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { @@ -512,6 +514,7 @@ private: TIntrusivePtr<TKqpCounters> Counters; IKqpNodeComputeActorFactory* CaFactory; NRm::IKqpResourceManager* ResourceManager_ = nullptr; + NYql::IHTTPGateway::TPtr HttpGateway; //state sharded by TxId std::array<NKqpNode::TState, BucketsCount> Buckets; @@ -521,9 +524,9 @@ private: } // anonymous namespace IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory) + TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory, NYql::IHTTPGateway::TPtr httpGateway) { - return new TKqpNodeService(tableServiceConfig, counters, caFactory); + return new TKqpNodeService(tableServiceConfig, counters, caFactory, std::move(httpGateway)); } } // namespace NKqp diff --git a/ydb/core/kqp/node_service/kqp_node_service.h b/ydb/core/kqp/node_service/kqp_node_service.h index f9f4c7f1562..56ea1361d24 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.h +++ b/ydb/core/kqp/node_service/kqp_node_service.h @@ -5,6 +5,7 @@ #include <ydb/core/protos/config.pb.h> #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/actor.h> @@ -65,7 +66,7 @@ struct IKqpNodeComputeActorFactory { }; NActors::IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr); + TIntrusivePtr<TKqpCounters> counters, IKqpNodeComputeActorFactory* caFactory = nullptr, NYql::IHTTPGateway::TPtr httpGateway = nullptr); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index 88a852aed7f..64719646e0f 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -170,7 +170,7 @@ public: Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get()); + auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), NYql::IHTTPGateway::Make()); KqpNodeActorId = Runtime->Register(kqpNode); Runtime->EnableScheduleForActor(KqpNodeActorId, true); WaitForBootstrap(); diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 50317b5da9b..33447c05f70 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -863,8 +863,13 @@ private: for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) { if (auto source = input.Maybe<TDqSource>()) { auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>(); - YQL_ENSURE(settings.IsValid(), "only readranges sources are supported"); - Visit(settings.Cast(), stagePlanNode); + if (settings.IsValid()) { + Visit(settings.Cast(), stagePlanNode); + } else { + TOperator op; + op.Properties["Name"] = TString(source.Cast().DataSource().Cast<TCoDataSource>().Category().Value()); + AddOperator(stagePlanNode, "Source", op); + } } else { auto inputCn = input.Cast<TDqConnection>(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 5c982d859b6..3a38836c398 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -207,7 +207,7 @@ public: TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); - KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters)); + KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, nullptr, HttpGateway)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService); diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt index c848e589bdd..ecf0c85728a 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt @@ -18,6 +18,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-kqp-common ydb-core-protos ydb-library-mkql_proto + yql-dq-integration + yql-dq-opt + yql-dq-tasks + library-yql-minikql + providers-common-mkql + providers-dq-common ) target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt index bd6045c8f04..cf0d3c30252 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt @@ -19,6 +19,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-kqp-common ydb-core-protos ydb-library-mkql_proto + yql-dq-integration + yql-dq-opt + yql-dq-tasks + library-yql-minikql + providers-common-mkql + providers-dq-common ) target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt index bd6045c8f04..cf0d3c30252 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt @@ -19,6 +19,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-kqp-common ydb-core-protos ydb-library-mkql_proto + yql-dq-integration + yql-dq-opt + yql-dq-tasks + library-yql-minikql + providers-common-mkql + providers-dq-common ) target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp diff --git a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt index c848e589bdd..ecf0c85728a 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt @@ -18,6 +18,12 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-kqp-common ydb-core-protos ydb-library-mkql_proto + yql-dq-integration + yql-dq-opt + yql-dq-tasks + library-yql-minikql + providers-common-mkql + providers-dq-common ) target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 51f4b03298e..1e16ec91c57 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -11,10 +11,13 @@ #include <ydb/core/tx/schemeshard/schemeshard_utils.h> #include <ydb/library/mkql_proto/mkql_proto.h> +#include <ydb/library/yql/dq/integration/yql_dq_integration.h> #include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/dq/tasks/dq_task_program.h> -#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> +#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> namespace NKikimr { namespace NKqp { @@ -439,6 +442,7 @@ public: , TypeEnv(Alloc) , KqlCtx(cluster, tablesData, TypeEnv, FuncRegistry) , KqlCompiler(CreateKqlCompiler(KqlCtx, typesCtx)) + , TypesCtx(typesCtx) { Alloc.Release(); } @@ -556,7 +560,7 @@ private: if (input.Maybe<TDqSource>()) { auto* protoSource = stageProto.AddSources(); - FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap); + FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap, ctx); protoSource->SetInputIndex(inputIndex); } else { YQL_ENSURE(input.Maybe<TDqConnection>()); @@ -805,7 +809,7 @@ private: } } - void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns, + void FillKqpSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) { if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) { @@ -864,7 +868,37 @@ private: } } } else { - YQL_ENSURE(false, "unsupported source type"); + YQL_ENSURE(false, "Unsupported source type"); + } + } + + void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns, + THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx) + { + const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category().Value(); + if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) { + FillKqpSource(source, protoSource, allowSystemColumns, tablesMap); + } else { + // Delegate source filling to dq integration of specific provider + const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory); + YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\""); + NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration(); + YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\""); + auto& externalSource = *protoSource->MutableExternalSource(); + google::protobuf::Any& settings = *externalSource.MutableSettings(); + TString& sourceType = *externalSource.MutableType(); + dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType); + YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node"); + YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node"); + + // Partitioning + TVector<TString> partitionParams; + TString clusterName; + dqIntegration->Partition(NYql::TDqSettings(), NYql::TDqSettings::TDefault::MaxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false); + externalSource.SetTaskParamKey(TString(dataSourceCategory)); + for (const TString& partitionParam : partitionParams) { + externalSource.AddPartitionedTaskParams(partitionParam); + } } } @@ -999,6 +1033,7 @@ private: NMiniKQL::TTypeEnvironment TypeEnv; TKqlCompileContext KqlCtx; TIntrusivePtr<NCommon::IMkqlCallableCompiler> KqlCompiler; + TTypeAnnotationContext& TypesCtx; }; } // namespace diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 3d82733b327..16bc1f3d6fb 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -977,7 +977,8 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), - RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig()); + RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig(), + HttpGateway); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 69471a059a9..b64aad7b937 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -42,6 +42,7 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const { Aws::S3::Model::CreateBucketRequest req; req.SetBucket(bucket); + req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write); const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req); UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage()); } @@ -83,22 +84,22 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { Sleep(TDuration::MilliSeconds(50)); TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScrptsResult.Metadata().ExecutionId); results.ConstructInPlace(future.ExtractValueSync()); - ////////////////////////////////////////////////////////////////////////// tmp return; // YQ-1636 - if (results->GetStatus() == NYdb::EStatus::INTERNAL_ERROR) { - UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "unsupported source type"); - return; - } - ////////////////////////////////////////////////////////////////////////// tmp return; // YQ-1636 if (!results->IsSuccess()) { UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus() << ": " << results->GetIssues().ToString()); UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready"); } } while (!results->HasResultSet()); TResultSetParser resultSet(results->ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); } } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 1a593cbf3b0..f1359d54abb 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -4,6 +4,7 @@ option cc_enable_arenas = true; package NKqpProto; option java_package = "ru.yandex.kikimr.proto"; +import "google/protobuf/any.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "ydb/public/api/protos/ydb_value.proto"; @@ -281,11 +282,21 @@ message TKqpReadRangesSource { repeated string SkipNullKeys = 8; } +message TKqpExternalSource { + string Type = 1; + google.protobuf.Any Settings = 2; + + // Partitioning + string TaskParamKey = 3; + repeated string PartitionedTaskParams = 4; +} + message TKqpSource { uint32 InputIndex = 1; oneof Type { TKqpReadRangesSource ReadRangesSource = 3; + TKqpExternalSource ExternalSource = 4; } }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 39fb80aead3..0b56f2b15b2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -575,17 +575,21 @@ protected: void FillExtraData(NDqProto::TEvComputeActorState& state) { auto* extraData = state.MutableExtraData(); for (auto& [index, input] : SourcesMap) { - if (auto data = input.AsyncInput->ExtraData()) { - auto* entry = extraData->AddSourcesExtraData(); - entry->SetIndex(index); - entry->MutableData()->CopyFrom(*data); + if (input.AsyncInput) { + if (auto data = input.AsyncInput->ExtraData()) { + auto* entry = extraData->AddSourcesExtraData(); + entry->SetIndex(index); + entry->MutableData()->CopyFrom(*data); + } } } for (auto& [index, input] : InputTransformsMap) { - if (auto data = input.AsyncInput->ExtraData()) { - auto* entry = extraData->AddInputTransformsData(); - entry->SetIndex(index); - entry->MutableData()->CopyFrom(*data); + if (input.AsyncInput) { + if (auto data = input.AsyncInput->ExtraData()) { + auto* entry = extraData->AddInputTransformsData(); + entry->SetIndex(index); + entry->MutableData()->CopyFrom(*data); + } } } } diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 9365589705c..d04939a2a7c 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -175,4 +175,6 @@ message TDqTask { string RateLimiter = 10; string RateLimiterResource = 11; uint64 InitialTaskMemoryLimit = 12; + map<string, bytes> TaskParams = 13; + map<string, string> SecureParams = 14; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 1d8da023f2b..420d42bbcc8 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -61,10 +61,8 @@ public: } ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override { - TString cluster; std::vector<std::vector<TPath>> parts; if (const TMaybeNode<TDqSource> source = &node) { - cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().Value(); const auto settings = source.Cast().Settings().Cast<TS3SourceSettingsBase>(); for (auto i = 0u; i < settings.Paths().Size(); ++i) { const auto& packed = settings.Paths().Item(i); |