about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author aozeritsky <aozeritsky@yandex-team.ru> 2022-04-26 15:41:17 +0300
committer aozeritsky <aozeritsky@yandex-team.ru> 2022-04-26 15:41:17 +0300
commit b9075f100dc7e5e913fb97c1ae0b6984b1ff5c88 (patch)
tree 0461cb046024ed17b6249a9d8b84b07ec0ccff79
parent 70eabc9449f88f18556c7839ba6cfbacc69de968 (diff)
download ydb-b9075f100dc7e5e913fb97c1ae0b6984b1ff5c88.tar.gz
YQL-10818 Make RuntimeJobNode for files with size > 1_MB
ref:c5a8c6b69406b67262ee1e902a4869221e0564c5
-rw-r--r-- CMakeLists.darwin.txt | 2
-rw-r--r-- CMakeLists.linux.txt | 2
-rw-r--r-- ydb/library/yql/dq/comp_nodes/CMakeLists.txt | 3
-rw-r--r-- ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp | 10
-rw-r--r-- ydb/library/yql/providers/dq/provider/CMakeLists.txt | 1
-rw-r--r-- ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 23
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp | 8
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_state.h | 3
8 files changed, 44 insertions, 8 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 6847794e71..1eb4ac1875 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -621,6 +621,7 @@ add_subdirectory(ydb/library/yql/providers/common/db_id_async_resolver)
add_subdirectory(ydb/library/yql/providers/dq/provider)
add_subdirectory(library/cpp/threading/task_scheduler)
add_subdirectory(ydb/public/lib/yson_value)
+add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/common/metrics)
add_subdirectory(ydb/library/yql/providers/common/transform)
add_subdirectory(ydb/library/yql/providers/dq/api/grpc)
@@ -807,7 +808,6 @@ add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
-add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/actors)
add_subdirectory(ydb/library/yql/providers/pq/async_io)
add_subdirectory(ydb/library/yql/providers/pq/gateway/native)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index a3ffccb62a..5059d076b1 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -701,6 +701,7 @@ add_subdirectory(ydb/library/yql/providers/common/db_id_async_resolver)
add_subdirectory(ydb/library/yql/providers/dq/provider)
add_subdirectory(library/cpp/threading/task_scheduler)
add_subdirectory(ydb/public/lib/yson_value)
+add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/common/metrics)
add_subdirectory(ydb/library/yql/providers/common/transform)
add_subdirectory(ydb/library/yql/providers/dq/api/grpc)
@@ -887,7 +888,6 @@ add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
-add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/actors)
add_subdirectory(ydb/library/yql/providers/pq/async_io)
add_subdirectory(ydb/library/yql/providers/pq/gateway/native)
diff --git a/ydb/library/yql/dq/comp_nodes/CMakeLists.txt b/ydb/library/yql/dq/comp_nodes/CMakeLists.txt
index 60664b75c5..257099d085 100644
--- a/ydb/library/yql/dq/comp_nodes/CMakeLists.txt
+++ b/ydb/library/yql/dq/comp_nodes/CMakeLists.txt
@@ -15,9 +15,10 @@ target_link_libraries(yql-dq-comp_nodes PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ dq-actors-compute
yql-minikql-computation
library-yql-minikql
- dq-actors-compute
+ library-yql-utils
)
target_sources(yql-dq-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.cpp
diff --git a/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp b/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp
index d4a4b07a6a..89940a2d40 100644
--- a/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp
+++ b/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp
@@ -3,8 +3,10 @@
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/utils/yql_panic.h>
#include <library/cpp/actors/core/actor.h>
+#include <util/stream/file.h>
namespace NYql {
@@ -26,6 +28,14 @@ public:
return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(selfId.ToString());
};
}
+ if (name == "FileContentJob") {
+ return [](TCallable& callable, const TTypeEnvironment& env) {
+ YQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 argument");
+ const TString path(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
+ auto content = TFileInput(path).ReadAll();
+ return TRuntimeNode(BuildDataLiteral(content, NUdf::TDataType<char*>::Id, env), true);
+ };
+ }
return TCallableVisitFunc();
}
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
index 990310699b..03bddd9f96 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(providers-dq-provider PUBLIC
public-lib-yson_value
cpp-client-ydb_driver
library-yql-core
+ yql-dq-comp_nodes
yql-dq-tasks
yql-dq-type_ann
providers-common-gateway
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 7fcc73768b..95857b7d4d 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -124,6 +124,7 @@ public:
executionContext.ComputationFactory = State->ComputationFactory;
executionContext.RandomProvider = randomProvider.Get();
executionContext.TimeProvider = timeProvider.Get();
+ executionContext.FuncProvider = State->TransformFactory({}, executionContext.FuncRegistry);
NDq::TDqTaskRunnerMemoryLimits limits;
limits.ChannelBufferSize = 10_MB;
@@ -379,13 +380,23 @@ private:
const TProgramBuilder pgmBuilder(typeEnv, *State->FunctionRegistry);
TRuntimeNode result;
+ bool doUpload = false;
switch (block->Type) {
case EUserDataType::URL:
case EUserDataType::PATH: {
- TString content = (name == TStringBuf("FilePath"))
- ? fullFileName
- : TFileInput(block->FrozenFile->GetPath()).ReadAll();
- result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content);
+ TString content = fullFileName;
+ if (name == TStringBuf("FileContent")) {
+ if (block->FrozenFile->GetSize() < MaxFileReadSize) {
+ content = TFileInput(block->FrozenFile->GetPath()).ReadAll();
+ } else {
+ TCallableBuilder builder(typeEnv, TStringBuf("FileContentJob"), callable.GetType()->GetReturnType(), false);
+ builder.Add(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(fullFileName));
+ result = TRuntimeNode(builder.Build(), false);
+ doUpload = true;
+ }
+ }
+ if (!result)
+ result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content);
break;
}
case EUserDataType::RAW_INLINE_DATA: {
@@ -402,7 +413,7 @@ private:
if (result.GetNode() != node) {
callable.SetResult(result, typeEnv);
}
- if (name == TStringBuf("FilePath")) {
+ if (name == TStringBuf("FilePath") || doUpload) {
// filePath, fileName, md5
auto f = IDqGateway::TFileResource();
f.SetLocalPath(filePath);
@@ -1257,6 +1268,8 @@ private:
THolder<IGraphTransformer> DqTypeAnnotationTransformer;
mutable THashMap<TString, TFileLinkPtr> FileLinks;
mutable THashMap<TString, TString> ModulesMapping;
+
+ const ui64 MaxFileReadSize = 1_MB;
};
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
index 69788c5c76..cc75204159 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
@@ -8,6 +8,9 @@
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/dq/interface/yql_dq_task_transform.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.h>
+
#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
@@ -32,11 +35,16 @@ TDataProviderInitializer GetDqDataProviderInitializer(
) {
Y_UNUSED(userName);
+ auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
+ NYql::CreateCommonDqTaskTransformFactory()
+ });
+
TDqStatePtr state = MakeIntrusive<TDqState>(
dqGateway, // nullptr for yqlrun
gatewaysConfig,
functionRegistry,
compFactory,
+ dqTaskTransformFactory,
randomProvider,
typeCtx.Get(),
progressWriter,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
index a3e54dcb11..a3fcaf1dbd 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
@@ -18,6 +18,7 @@ struct TDqState: public TThrRefBase {
const TGatewaysConfig* GatewaysConfig;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
+ TTaskTransformFactory TransformFactory;
TIntrusivePtr<IRandomProvider> RandomProvider;
TTypeAnnotationContext* TypeCtx;
const TOperationProgressWriter ProgressWriter;
@@ -40,6 +41,7 @@ struct TDqState: public TThrRefBase {
const TGatewaysConfig* gatewaysConfig,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory transformFactory,
const TIntrusivePtr<IRandomProvider>& randomProvider,
TTypeAnnotationContext* typeCtx,
const TOperationProgressWriter& progressWriter,
@@ -54,6 +56,7 @@ struct TDqState: public TThrRefBase {
, GatewaysConfig(gatewaysConfig)
, FunctionRegistry(functionRegistry)
, ComputationFactory(compFactory)
+ , TransformFactory(transformFactory)
, RandomProvider(randomProvider)
, TypeCtx(typeCtx)
, ProgressWriter(progressWriter)