diff options
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_table_resolver.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 1 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/proto/dq_tasks.proto | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.cpp | 76 |
11 files changed, 74 insertions, 63 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 728aeed137..f630188de5 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -648,6 +648,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode(); kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit(); kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel(); + kqpConfig.DefaultEnableShuffleElimination = serviceConfig.GetDefaultEnableShuffleElimination(); kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding(); kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes()); kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW(); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 21badd4cf8..8d882887e0 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -311,6 +311,8 @@ private: ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel(); bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding(); + bool defaultEnableShuffleElimination = TableServiceConfig.GetDefaultEnableShuffleElimination(); + TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes(); bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW(); @@ -345,7 +347,9 @@ private: TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams || TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution || TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW || - TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts) { + TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts || + TableServiceConfig.GetDefaultEnableShuffleElimination() != defaultEnableShuffleElimination + ) { QueryCache->Clear(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index bba10044f5..743ddd6e78 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -764,7 +764,7 @@ protected: } void HandleAbortExecution( - NYql::NDqProto::StatusIds::StatusCode statusCode, + NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues, const bool sessionSender) { LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) @@ -1698,8 +1698,8 @@ protected: } else if (enableShuffleElimination /* save partitioning for shuffle elimination */) { std::size_t stageInternalTaskId = 0; - columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>(); - columnShardHashV1Params.TaskIdByHash->resize(columnShardHashV1Params.SourceShardCount); + columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>(); + columnShardHashV1Params.TaskIndexByHash->resize(columnShardHashV1Params.SourceShardCount); for (auto&& pair : nodeShards) { const auto nodeId = pair.first; @@ -1750,7 +1750,7 @@ protected: for (const auto& readInfo: *task.Meta.Reads) { Y_ENSURE(hashByShardId.contains(readInfo.ShardId)); - (*columnShardHashV1Params.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId; + (*columnShardHashV1Params.TaskIndexByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId; } } @@ -1759,8 +1759,8 @@ protected: LOG_DEBUG_S( *TlsActivationContext, NKikimrServices::KQP_EXECUTER, - "Stage with scan " << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" << " has keys: " - << columnShardHashV1Params.KeyTypesToString(); + "Stage with scan " << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" + << " has keys: " << columnShardHashV1Params.KeyTypesToString(); ); } else { ui32 metaId = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index e5fbb1fa59..3c6e6a10be 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -173,12 +173,15 @@ private: stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation); - if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) { + if (stageInfo.Meta.TableKind == ETableKind::Olap) { + if (TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) { + auto& entry = requestNavigate->ResultSet.emplace_back(); + entry.TableId = stageInfo.Meta.TableId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; + } + TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first); - auto& entry = requestNavigate->ResultSet.emplace_back(); - entry.TableId = stageInfo.Meta.TableId; - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; } auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey)); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 3ec3b2b3ee..b3bdf684fc 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -511,7 +511,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo, } if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) { // We want to enforce sourceShardCount from map connection, cause it can be at most one map connection - // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIdByHash mapping) + // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIndexByHash mapping) hasMap = true; break; } @@ -521,9 +521,9 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo, // if it is stage, where we don't inherit parallelism. if (enableShuffleElimination && !hasMap && !isFusedWithScanStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) { columnShardHashV1Params.SourceShardCount = stageInfo.Tasks.size(); - columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount); + columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount); for (std::size_t i = 0; i < columnShardHashV1Params.SourceShardCount; ++i) { - (*columnShardHashV1Params.TaskIdByHash)[i] = i; + (*columnShardHashV1Params.TaskIndexByHash)[i] = i; } for (auto& input : stage.GetInputs()) { @@ -1206,7 +1206,7 @@ void FillOutputDesc( << " for the columns: " << "[" << JoinSeq(",", output.KeyColumns) << "]" ); Y_ENSURE(columnShardHashV1Params.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0"); - Y_ENSURE(columnShardHashV1Params.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage"); + Y_ENSURE(columnShardHashV1Params.TaskIndexByHash != nullptr, "TaskIndexByHash for ColumnShardHashV1 wasn't propogated to this stage"); Y_ENSURE(columnShardHashV1Params.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage"); Y_ENSURE( @@ -1225,9 +1225,9 @@ void FillOutputDesc( columnTypes->Add(type.GetTypeId()); } - auto* taskIdByHash = columnShardHashV1.MutableTaskIdByHash(); - for (std::size_t taskID: *columnShardHashV1Params.TaskIdByHash) { - taskIdByHash->Add(taskID); + auto* taskIndexByHash = columnShardHashV1.MutableTaskIndexByHash(); + for (std::size_t taskID: *columnShardHashV1Params.TaskIndexByHash) { + taskIndexByHash->Add(taskID); } break; } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 8869a9ca3b..22a6f31931 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -32,7 +32,7 @@ struct TTransaction : private TMoveOnly { struct TColumnShardHashV1Params { ui64 SourceShardCount = 0; std::shared_ptr<TVector<NScheme::TTypeInfo>> SourceTableKeyColumnTypes = nullptr; - std::shared_ptr<TVector<ui64>> TaskIdByHash = nullptr; // hash belongs [0; ShardCount] + std::shared_ptr<TVector<ui64>> TaskIndexByHash = nullptr; // hash belongs [0; ShardCount] TColumnShardHashV1Params DeepCopy() const { TColumnShardHashV1Params copy; @@ -44,10 +44,10 @@ struct TColumnShardHashV1Params { copy.SourceTableKeyColumnTypes = nullptr; } - if (TaskIdByHash) { - copy.TaskIdByHash = std::make_shared<TVector<ui64>>(*TaskIdByHash); + if (TaskIndexByHash) { + copy.TaskIndexByHash = std::make_shared<TVector<ui64>>(*TaskIndexByHash); } else { - copy.TaskIdByHash = nullptr; + copy.TaskIndexByHash = nullptr; } return copy; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 4adbfa8616..4315c567ae 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -163,7 +163,7 @@ protected: TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) { auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize); auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel); - bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false);; + bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(Config->DefaultEnableShuffleElimination); auto providerCtx = TKqpProviderContext(KqpCtx, optLevel); auto opt = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(providerCtx, maxDPhypDPTableSize, ctx, enableShuffleElimination)); TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, optLevel, diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 1b6039be4b..74d0a34720 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -182,6 +182,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableAntlr4Parser = false; bool EnableSnapshotIsolationRW = false; bool AllowMultiBroadcasts = false; + bool DefaultEnableShuffleElimination = false; void SetDefaultEnabledSpillingNodes(const TString& node); ui64 GetEnabledSpillingNodes() const; diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index d98c3338ec..9fa7054841 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -363,4 +363,6 @@ message TTableServiceConfig { optional bool EnableBatchUpdates = 78 [default = false]; optional bool AllowMultiBroadcasts = 79 [default = false]; + + optional bool DefaultEnableShuffleElimination = 80 [default = false]; }; diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index eb80e01df6..f48398c2a6 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -137,7 +137,7 @@ message THashV1 { message TColumnShardHashV1 { uint64 ShardCount = 1; - repeated uint64 TaskIdByHash = 2; + repeated uint64 TaskIndexByHash = 2; repeated uint32 KeyColumnTypes = 3; } diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index 165d2bf153..ce828f4867 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -109,11 +109,11 @@ struct THashV1 { struct TColumnShardHashV1 { TColumnShardHashV1( std::size_t shardCount, - TVector<ui64> taskIdByHash, + TVector<ui64> taskIndexByHash, TVector<NYql::NProto::TypeIds> keyColumnTypes ) : ShardCount(shardCount) - , TaskIdByHash(std::move(taskIdByHash)) + , TaskIndexByHash(std::move(taskIndexByHash)) , KeyColumnTypes(std::move(keyColumnTypes)) , HashCalcer(0) {} @@ -131,7 +131,7 @@ struct TColumnShardHashV1 { break; } case NYql::NProto::Int8: { - auto value = uv.template Get<uint8_t>(); + auto value = uv.template Get<uint8_t>(); HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } @@ -140,63 +140,63 @@ struct TColumnShardHashV1 { HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } - case NYql::NProto::Int16: { - auto value = uv.template Get<int16_t>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + case NYql::NProto::Int16: { + auto value = uv.template Get<int16_t>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } case NYql::NProto::Date: case NYql::NProto::TzDate: case NYql::NProto::Uint16: { - auto value = uv.template Get<uint16_t>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + auto value = uv.template Get<uint16_t>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } - case NYql::NProto::Int32: + case NYql::NProto::Int32: case NYql::NProto::Date32: case NYql::NProto::TzDate32: { - auto value = uv.template Get<int32_t>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + auto value = uv.template Get<int32_t>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } case NYql::NProto::Uint32: case NYql::NProto::Datetime: case NYql::NProto::TzDatetime: { - auto value = uv.template Get<uint32_t>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); - break; + auto value = uv.template Get<uint32_t>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + break; } case NYql::NProto::Int64: case NYql::NProto::Interval: case NYql::NProto::Interval64: case NYql::NProto::Datetime64: case NYql::NProto::Timestamp64: { - auto value = uv.template Get<i64>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); - break; + auto value = uv.template Get<i64>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + break; } - case NYql::NProto::Uint64: + case NYql::NProto::Uint64: case NYql::NProto::Timestamp: case NYql::NProto::TzTimestamp: { - auto value = uv.template Get<ui64>(); + auto value = uv.template Get<ui64>(); HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); - break; + break; } case NYql::NProto::Double: { - auto value = uv.template Get<double>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + auto value = uv.template Get<double>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); break; } case NYql::NProto::Float: { - auto value = uv.template Get<float>(); - HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); - break; + auto value = uv.template Get<float>(); + HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value)); + break; } case NYql::NProto::String: case NYql::NProto::Utf8:{ auto value = uv.AsStringRef(); HashCalcer.Update(reinterpret_cast<const ui8*>(value.Data()), value.Size()); - break; + break; } default: { Y_ENSURE(false, TStringBuilder{} << "HashFunc for HashShuffle isn't supported with such type: " << static_cast<ui64>(KeyColumnTypes[keyIdx])); @@ -208,11 +208,11 @@ struct TColumnShardHashV1 { ui64 Finish() { ui64 hash = HashCalcer.Finish(); hash = std::min<ui32>(hash / (Max<ui64>() / ShardCount), ShardCount - 1); - return TaskIdByHash[hash]; + return TaskIndexByHash[hash]; } std::size_t ShardCount; - TVector<ui64> TaskIdByHash; + TVector<ui64> TaskIndexByHash; TVector<NYql::NProto::TypeIds> KeyColumnTypes; private: NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer; @@ -247,8 +247,8 @@ protected: } public: TDqOutputHashPartitionConsumer( - TVector<IDqOutput::TPtr>&& outputs, - TVector<TColumnInfo>&& keyColumns, + TVector<IDqOutput::TPtr>&& outputs, + TVector<TColumnInfo>&& keyColumns, TMaybe<ui32> outputWidth, THashFunc hashFunc ) @@ -380,8 +380,8 @@ template <typename THashFunc> class TDqOutputHashPartitionConsumerScalar : public IDqOutputConsumer { public: TDqOutputHashPartitionConsumerScalar( - TVector<IDqOutput::TPtr>&& outputs, - TVector<TColumnInfo>&& keyColumns, + TVector<IDqOutput::TPtr>&& outputs, + TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, THashFunc hashFunc ) @@ -689,7 +689,7 @@ private: template<typename T = THashFunc, typename std::enable_if<std::is_same<T, TColumnShardHashV1>::value, int>::type = 0> size_t GetHashPartitionIndex(const arrow::Datum* values[], ui64 blockIndex) { HashFunc.Start(); - + for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) { const ui32 columnIndex = KeyColumns_[keyId].Index; Y_DEBUG_ABORT_UNLESS(columnIndex < OutputWidth_); @@ -853,7 +853,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( auto& columnShardHashV1Proto = hashPartition.GetColumnShardHashV1(); std::size_t shardCount = columnShardHashV1Proto.GetShardCount(); - TVector<ui64> taskIdByHash{columnShardHashV1Proto.GetTaskIdByHash().begin(), columnShardHashV1Proto.GetTaskIdByHash().end()}; + TVector<ui64> taskIndexByHash{columnShardHashV1Proto.GetTaskIndexByHash().begin(), columnShardHashV1Proto.GetTaskIndexByHash().end()}; TVector<NYql::NProto::TypeIds> keyColumnTypes; keyColumnTypes.reserve(columnShardHashV1Proto.GetKeyColumnTypes().size()); @@ -869,7 +869,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( } return "[" + JoinSeq(",", stringNames) + "]"; }; - + const auto keyTypesToString = [](const TVector<NYql::NProto::TypeIds>& keyColumnTypes) -> TString { TVector<TString> stringNames; stringNames.reserve(keyColumnTypes.size()); @@ -880,12 +880,12 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( }; Y_ENSURE( - keyColumnTypes.size() == keyColumns.size(), - TStringBuilder{} << "Hashshuffle keycolumns and keytypes args count mismatch, types: " + keyColumnTypes.size() == keyColumns.size(), + TStringBuilder{} << "Hashshuffle keycolumns and keytypes args count mismatch, types: " << keyTypesToString(keyColumnTypes) << " for the columns: " << keyColumnsToString(keyColumns) ); - TColumnShardHashV1 columnShardHashV1 = TColumnShardHashV1(shardCount, std::move(taskIdByHash), std::move(keyColumnTypes)); + TColumnShardHashV1 columnShardHashV1 = TColumnShardHashV1(shardCount, std::move(taskIndexByHash), std::move(keyColumnTypes)); return CreateOutputPartitionConsumerImpl<TColumnShardHashV1>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage, std::move(columnShardHashV1), pgBuilder); } case NDqProto::TTaskOutputHashPartition::kHashV1: |