summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/common/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h11
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp10
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp8
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp9
10 files changed, 23 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt
index a1ecc96fcc6..0b8b32a892d 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt
@@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-utils
yql-utils-log
yql-dq-actors
+ yql-dq-proto
tools-enum_parser-enum_serialization_runtime
)
target_sources(providers-dq-common PRIVATE
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt
index a1ecc96fcc6..0b8b32a892d 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-utils
yql-utils-log
yql-dq-actors
+ yql-dq-proto
tools-enum_parser-enum_serialization_runtime
)
target_sources(providers-dq-common PRIVATE
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt
index 0df62e46693..7072fef4c85 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-utils
yql-utils-log
yql-dq-actors
+ yql-dq-proto
tools-enum_parser-enum_serialization_runtime
)
target_sources(providers-dq-common PRIVATE
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt
index 0df62e46693..7072fef4c85 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-utils
yql-utils-log
yql-dq-actors
+ yql-dq-proto
tools-enum_parser-enum_serialization_runtime
)
target_sources(providers-dq-common PRIVATE
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt
index a1ecc96fcc6..0b8b32a892d 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-utils
yql-utils-log
yql-dq-actors
+ yql-dq-proto
tools-enum_parser-enum_serialization_runtime
)
target_sources(providers-dq-common PRIVATE
diff --git a/ydb/library/yql/providers/dq/common/ya.make b/ydb/library/yql/providers/dq/common/ya.make
index 8344d6cc91c..82704ed75da 100644
--- a/ydb/library/yql/providers/dq/common/ya.make
+++ b/ydb/library/yql/providers/dq/common/ya.make
@@ -7,6 +7,7 @@ PEERDIR(
ydb/library/yql/utils
ydb/library/yql/utils/log
ydb/library/yql/dq/actors
+ ydb/library/yql/dq/proto
)
GENERATE_ENUM_SERIALIZATION(yql_dq_settings.h)
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index bd70221220b..c1943584ffe 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -5,6 +5,7 @@
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/dq/common/dq_common.h>
+#include <ydb/library/yql/dq/proto/dq_transport.pb.h>
#include <library/cpp/string_utils/parse_size/parse_size.h>
@@ -189,6 +190,16 @@ struct TDqSettings {
return copy;
}
+
+ NDqProto::EDataTransportVersion GetDataTransportVersion() const {
+ const bool fastPickle = UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport);
+ const bool oob = UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport);
+ if (oob) {
+ return fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0;
+ } else {
+ return fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
+ }
+ }
};
struct TDqConfiguration: public TDqSettings, public NCommon::TSettingDispatcher {
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index cbd3ec6e55d..9f6faaa50c9 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -647,15 +647,7 @@ void TDqsExecutionPlanner::BuildAllPrograms() {
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetCheckpointingMode(channel.CheckpointingMode);
- const bool fastPickle = Settings->UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport);
- const bool oob = Settings->UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport);
- if (oob) {
- channelDesc.SetTransportVersion(fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 :
- NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0);
- } else {
- channelDesc.SetTransportVersion(fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 :
- NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0);
- }
+ channelDesc.SetTransportVersion(Settings->GetDataTransportVersion());
if (channel.SrcTask) {
NActors::ActorIdToProto(TasksGraph.GetTask(channel.SrcTask).ComputeActorId,
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 06ca141aa77..acccf5f5c3f 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -647,13 +647,7 @@ public:
if (!DqConfiguration->CollectCoreDumps.Get().GetOrElse(false)) {
DontCollectDumps();
}
- const bool fastPickle = DqConfiguration->UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport);
- const bool oob = DqConfiguration->UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport);
- if (oob) {
- DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0;
- } else {
- DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
- }
+ DataTransportVersion = DqConfiguration->GetDataTransportVersion();
// TODO: Maybe use taskParams from task.GetTask().GetParameters()
THashMap<TString, TString> taskParams;
for (const auto& x: taskMeta.GetTaskParams()) {
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 15651b1f4b8..0047308089c 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -260,10 +260,7 @@ private:
YQL_ENSURE(!batch.IsWide());
auto source = TaskRunner->GetSource(index);
- TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(),
- // NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0
- NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0
- );
+ TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), DataTransportVersion);
TDqSerializedBatch serialized = dataSerializer.Serialize(batch, source->GetInputType());
Invoker->Invoke([serialized=std::move(serialized),taskRunner=TaskRunner, actorSystem, selfId, cookie, parentId=ParentId, space, finish, index, settings=Settings, stageId=StageId]() mutable {
@@ -370,7 +367,7 @@ private:
auto guard = TaskRunner->BindAllocator();
NKikimr::NMiniKQL::TUnboxedValueBatch batch;
auto sink = TaskRunner->GetSink(ev->Get()->Index);
- TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+ TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), (NDqProto::EDataTransportVersion)ev->Get()->Batch.Proto.GetTransportVersion());
dataSerializer.Deserialize(std::move(ev->Get()->Batch), sink->GetOutputType(), batch);
Parent->SinkSend(
@@ -459,6 +456,7 @@ private:
ev->Get()->Task.GetMeta().UnpackTo(&taskMeta);
Settings->Dispatch(taskMeta.GetSettings());
Settings->FreezeDefaults();
+ DataTransportVersion = Settings->GetDataTransportVersion();
StageId = taskMeta.GetStageId();
NDq::TDqTaskSettings settings(&ev->Get()->Task);
@@ -578,6 +576,7 @@ private:
THashSet<ui32> Inputs;
THashSet<ui32> Sources;
TIntrusivePtr<TDqConfiguration> Settings;
+ NDqProto::EDataTransportVersion DataTransportVersion;
ui64 StageId;
TWorkerRuntimeData* RuntimeData;
TString ClusterName;