diff options
10 files changed, 23 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt index a1ecc96fcc6..0b8b32a892d 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-arm64.txt @@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-utils yql-utils-log yql-dq-actors + yql-dq-proto tools-enum_parser-enum_serialization_runtime ) target_sources(providers-dq-common PRIVATE diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt index a1ecc96fcc6..0b8b32a892d 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-utils yql-utils-log yql-dq-actors + yql-dq-proto tools-enum_parser-enum_serialization_runtime ) target_sources(providers-dq-common PRIVATE diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt index 0df62e46693..7072fef4c85 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-utils yql-utils-log yql-dq-actors + yql-dq-proto tools-enum_parser-enum_serialization_runtime ) target_sources(providers-dq-common PRIVATE diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt index 0df62e46693..7072fef4c85 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-utils yql-utils-log yql-dq-actors + yql-dq-proto tools-enum_parser-enum_serialization_runtime ) target_sources(providers-dq-common PRIVATE diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt index a1ecc96fcc6..0b8b32a892d 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-utils yql-utils-log yql-dq-actors + yql-dq-proto tools-enum_parser-enum_serialization_runtime ) target_sources(providers-dq-common PRIVATE diff --git a/ydb/library/yql/providers/dq/common/ya.make b/ydb/library/yql/providers/dq/common/ya.make index 8344d6cc91c..82704ed75da 100644 --- a/ydb/library/yql/providers/dq/common/ya.make +++ b/ydb/library/yql/providers/dq/common/ya.make @@ -7,6 +7,7 @@ PEERDIR( ydb/library/yql/utils ydb/library/yql/utils/log ydb/library/yql/dq/actors + ydb/library/yql/dq/proto ) GENERATE_ENUM_SERIALIZATION(yql_dq_settings.h) diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index bd70221220b..c1943584ffe 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -5,6 +5,7 @@ #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/dq/common/dq_common.h> +#include <ydb/library/yql/dq/proto/dq_transport.pb.h> #include <library/cpp/string_utils/parse_size/parse_size.h> @@ -189,6 +190,16 @@ struct TDqSettings { return copy; } + + NDqProto::EDataTransportVersion GetDataTransportVersion() const { + const bool fastPickle = UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport); + const bool oob = UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport); + if (oob) { + return fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0; + } else { + return fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; + } + } }; struct TDqConfiguration: public TDqSettings, public NCommon::TSettingDispatcher { diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index cbd3ec6e55d..9f6faaa50c9 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -647,15 +647,7 @@ void TDqsExecutionPlanner::BuildAllPrograms() { channelDesc.SetSrcTaskId(channel.SrcTask); channelDesc.SetDstTaskId(channel.DstTask); channelDesc.SetCheckpointingMode(channel.CheckpointingMode); - const bool fastPickle = Settings->UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport); - const bool oob = Settings->UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport); - if (oob) { - channelDesc.SetTransportVersion(fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : - NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0); - } else { - channelDesc.SetTransportVersion(fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : - NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0); - } + channelDesc.SetTransportVersion(Settings->GetDataTransportVersion()); if (channel.SrcTask) { NActors::ActorIdToProto(TasksGraph.GetTask(channel.SrcTask).ComputeActorId, diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 06ca141aa77..acccf5f5c3f 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -647,13 +647,7 @@ public: if (!DqConfiguration->CollectCoreDumps.Get().GetOrElse(false)) { DontCollectDumps(); } - const bool fastPickle = DqConfiguration->UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport); - const bool oob = DqConfiguration->UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport); - if (oob) { - DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0; - } else { - DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; - } + DataTransportVersion = DqConfiguration->GetDataTransportVersion(); // TODO: Maybe use taskParams from task.GetTask().GetParameters() THashMap<TString, TString> taskParams; for (const auto& x: taskMeta.GetTaskParams()) { diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 15651b1f4b8..0047308089c 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -260,10 +260,7 @@ private: YQL_ENSURE(!batch.IsWide()); auto source = TaskRunner->GetSource(index); - TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), - // NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0 - NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0 - ); + TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), DataTransportVersion); TDqSerializedBatch serialized = dataSerializer.Serialize(batch, source->GetInputType()); Invoker->Invoke([serialized=std::move(serialized),taskRunner=TaskRunner, actorSystem, selfId, cookie, parentId=ParentId, space, finish, index, settings=Settings, stageId=StageId]() mutable { @@ -370,7 +367,7 @@ private: auto guard = TaskRunner->BindAllocator(); NKikimr::NMiniKQL::TUnboxedValueBatch batch; auto sink = TaskRunner->GetSink(ev->Get()->Index); - TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); + TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), (NDqProto::EDataTransportVersion)ev->Get()->Batch.Proto.GetTransportVersion()); dataSerializer.Deserialize(std::move(ev->Get()->Batch), sink->GetOutputType(), batch); Parent->SinkSend( @@ -459,6 +456,7 @@ private: ev->Get()->Task.GetMeta().UnpackTo(&taskMeta); Settings->Dispatch(taskMeta.GetSettings()); Settings->FreezeDefaults(); + DataTransportVersion = Settings->GetDataTransportVersion(); StageId = taskMeta.GetStageId(); NDq::TDqTaskSettings settings(&ev->Get()->Task); @@ -578,6 +576,7 @@ private: THashSet<ui32> Inputs; THashSet<ui32> Sources; TIntrusivePtr<TDqConfiguration> Settings; + NDqProto::EDataTransportVersion DataTransportVersion; ui64 StageId; TWorkerRuntimeData* RuntimeData; TString ClusterName; |
