aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h12
-rw-r--r--ydb/core/kqp/executer_actor/kqp_table_resolver.cpp13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp14
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h8
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h1
-rw-r--r--ydb/core/protos/table_service_config.proto2
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto2
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.cpp76
11 files changed, 74 insertions, 63 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 728aeed137..f630188de5 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -648,6 +648,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
+ kqpConfig.DefaultEnableShuffleElimination = serviceConfig.GetDefaultEnableShuffleElimination();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 21badd4cf8..8d882887e0 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -311,6 +311,8 @@ private:
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
+ bool defaultEnableShuffleElimination = TableServiceConfig.GetDefaultEnableShuffleElimination();
+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();
@@ -345,7 +347,9 @@ private:
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW ||
- TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts) {
+ TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts ||
+ TableServiceConfig.GetDefaultEnableShuffleElimination() != defaultEnableShuffleElimination
+ ) {
QueryCache->Clear();
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index bba10044f5..743ddd6e78 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -764,7 +764,7 @@ protected:
}
void HandleAbortExecution(
- NYql::NDqProto::StatusIds::StatusCode statusCode,
+ NYql::NDqProto::StatusIds::StatusCode statusCode,
const NYql::TIssues& issues,
const bool sessionSender) {
LOG_D("Got EvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode)
@@ -1698,8 +1698,8 @@ protected:
} else if (enableShuffleElimination /* save partitioning for shuffle elimination */) {
std::size_t stageInternalTaskId = 0;
- columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>();
- columnShardHashV1Params.TaskIdByHash->resize(columnShardHashV1Params.SourceShardCount);
+ columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>();
+ columnShardHashV1Params.TaskIndexByHash->resize(columnShardHashV1Params.SourceShardCount);
for (auto&& pair : nodeShards) {
const auto nodeId = pair.first;
@@ -1750,7 +1750,7 @@ protected:
for (const auto& readInfo: *task.Meta.Reads) {
Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
- (*columnShardHashV1Params.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
+ (*columnShardHashV1Params.TaskIndexByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
}
}
@@ -1759,8 +1759,8 @@ protected:
LOG_DEBUG_S(
*TlsActivationContext,
NKikimrServices::KQP_EXECUTER,
- "Stage with scan " << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" << " has keys: "
- << columnShardHashV1Params.KeyTypesToString();
+ "Stage with scan " << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]"
+ << " has keys: " << columnShardHashV1Params.KeyTypesToString();
);
} else {
ui32 metaId = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
index e5fbb1fa59..3c6e6a10be 100644
--- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
@@ -173,12 +173,15 @@ private:
stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation);
- if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) {
+ if (stageInfo.Meta.TableKind == ETableKind::Olap) {
+ if (TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) {
+ auto& entry = requestNavigate->ResultSet.emplace_back();
+ entry.TableId = stageInfo.Meta.TableId;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
+ }
+
TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first);
- auto& entry = requestNavigate->ResultSet.emplace_back();
- entry.TableId = stageInfo.Meta.TableId;
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
}
auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey));
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 3ec3b2b3ee..b3bdf684fc 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -511,7 +511,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
}
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) {
// We want to enforce sourceShardCount from map connection, cause it can be at most one map connection
- // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIdByHash mapping)
+ // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIndexByHash mapping)
hasMap = true;
break;
}
@@ -521,9 +521,9 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
// if it is stage, where we don't inherit parallelism.
if (enableShuffleElimination && !hasMap && !isFusedWithScanStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) {
columnShardHashV1Params.SourceShardCount = stageInfo.Tasks.size();
- columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount);
+ columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount);
for (std::size_t i = 0; i < columnShardHashV1Params.SourceShardCount; ++i) {
- (*columnShardHashV1Params.TaskIdByHash)[i] = i;
+ (*columnShardHashV1Params.TaskIndexByHash)[i] = i;
}
for (auto& input : stage.GetInputs()) {
@@ -1206,7 +1206,7 @@ void FillOutputDesc(
<< " for the columns: " << "[" << JoinSeq(",", output.KeyColumns) << "]"
);
Y_ENSURE(columnShardHashV1Params.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0");
- Y_ENSURE(columnShardHashV1Params.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage");
+ Y_ENSURE(columnShardHashV1Params.TaskIndexByHash != nullptr, "TaskIndexByHash for ColumnShardHashV1 wasn't propogated to this stage");
Y_ENSURE(columnShardHashV1Params.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage");
Y_ENSURE(
@@ -1225,9 +1225,9 @@ void FillOutputDesc(
columnTypes->Add(type.GetTypeId());
}
- auto* taskIdByHash = columnShardHashV1.MutableTaskIdByHash();
- for (std::size_t taskID: *columnShardHashV1Params.TaskIdByHash) {
- taskIdByHash->Add(taskID);
+ auto* taskIndexByHash = columnShardHashV1.MutableTaskIndexByHash();
+ for (std::size_t taskID: *columnShardHashV1Params.TaskIndexByHash) {
+ taskIndexByHash->Add(taskID);
}
break;
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 8869a9ca3b..22a6f31931 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -32,7 +32,7 @@ struct TTransaction : private TMoveOnly {
struct TColumnShardHashV1Params {
ui64 SourceShardCount = 0;
std::shared_ptr<TVector<NScheme::TTypeInfo>> SourceTableKeyColumnTypes = nullptr;
- std::shared_ptr<TVector<ui64>> TaskIdByHash = nullptr; // hash belongs [0; ShardCount]
+ std::shared_ptr<TVector<ui64>> TaskIndexByHash = nullptr; // hash belongs [0; ShardCount]
TColumnShardHashV1Params DeepCopy() const {
TColumnShardHashV1Params copy;
@@ -44,10 +44,10 @@ struct TColumnShardHashV1Params {
copy.SourceTableKeyColumnTypes = nullptr;
}
- if (TaskIdByHash) {
- copy.TaskIdByHash = std::make_shared<TVector<ui64>>(*TaskIdByHash);
+ if (TaskIndexByHash) {
+ copy.TaskIndexByHash = std::make_shared<TVector<ui64>>(*TaskIndexByHash);
} else {
- copy.TaskIdByHash = nullptr;
+ copy.TaskIndexByHash = nullptr;
}
return copy;
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 4adbfa8616..4315c567ae 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -163,7 +163,7 @@ protected:
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);
- bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false);;
+ bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(Config->DefaultEnableShuffleElimination);
auto providerCtx = TKqpProviderContext(KqpCtx, optLevel);
auto opt = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(providerCtx, maxDPhypDPTableSize, ctx, enableShuffleElimination));
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, optLevel,
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 1b6039be4b..74d0a34720 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -182,6 +182,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableAntlr4Parser = false;
bool EnableSnapshotIsolationRW = false;
bool AllowMultiBroadcasts = false;
+ bool DefaultEnableShuffleElimination = false;
void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index d98c3338ec..9fa7054841 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -363,4 +363,6 @@ message TTableServiceConfig {
optional bool EnableBatchUpdates = 78 [default = false];
optional bool AllowMultiBroadcasts = 79 [default = false];
+
+ optional bool DefaultEnableShuffleElimination = 80 [default = false];
};
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index eb80e01df6..f48398c2a6 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -137,7 +137,7 @@ message THashV1 {
message TColumnShardHashV1 {
uint64 ShardCount = 1;
- repeated uint64 TaskIdByHash = 2;
+ repeated uint64 TaskIndexByHash = 2;
repeated uint32 KeyColumnTypes = 3;
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 165d2bf153..ce828f4867 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -109,11 +109,11 @@ struct THashV1 {
struct TColumnShardHashV1 {
TColumnShardHashV1(
std::size_t shardCount,
- TVector<ui64> taskIdByHash,
+ TVector<ui64> taskIndexByHash,
TVector<NYql::NProto::TypeIds> keyColumnTypes
)
: ShardCount(shardCount)
- , TaskIdByHash(std::move(taskIdByHash))
+ , TaskIndexByHash(std::move(taskIndexByHash))
, KeyColumnTypes(std::move(keyColumnTypes))
, HashCalcer(0)
{}
@@ -131,7 +131,7 @@ struct TColumnShardHashV1 {
break;
}
case NYql::NProto::Int8: {
- auto value = uv.template Get<uint8_t>();
+ auto value = uv.template Get<uint8_t>();
HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
@@ -140,63 +140,63 @@ struct TColumnShardHashV1 {
HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
- case NYql::NProto::Int16: {
- auto value = uv.template Get<int16_t>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ case NYql::NProto::Int16: {
+ auto value = uv.template Get<int16_t>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
case NYql::NProto::Date:
case NYql::NProto::TzDate:
case NYql::NProto::Uint16: {
- auto value = uv.template Get<uint16_t>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ auto value = uv.template Get<uint16_t>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
- case NYql::NProto::Int32:
+ case NYql::NProto::Int32:
case NYql::NProto::Date32:
case NYql::NProto::TzDate32: {
- auto value = uv.template Get<int32_t>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ auto value = uv.template Get<int32_t>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
case NYql::NProto::Uint32:
case NYql::NProto::Datetime:
case NYql::NProto::TzDatetime: {
- auto value = uv.template Get<uint32_t>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
- break;
+ auto value = uv.template Get<uint32_t>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ break;
}
case NYql::NProto::Int64:
case NYql::NProto::Interval:
case NYql::NProto::Interval64:
case NYql::NProto::Datetime64:
case NYql::NProto::Timestamp64: {
- auto value = uv.template Get<i64>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
- break;
+ auto value = uv.template Get<i64>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ break;
}
- case NYql::NProto::Uint64:
+ case NYql::NProto::Uint64:
case NYql::NProto::Timestamp:
case NYql::NProto::TzTimestamp: {
- auto value = uv.template Get<ui64>();
+ auto value = uv.template Get<ui64>();
HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
- break;
+ break;
}
case NYql::NProto::Double: {
- auto value = uv.template Get<double>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ auto value = uv.template Get<double>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
break;
}
case NYql::NProto::Float: {
- auto value = uv.template Get<float>();
- HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
- break;
+ auto value = uv.template Get<float>();
+ HashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
+ break;
}
case NYql::NProto::String:
case NYql::NProto::Utf8:{
auto value = uv.AsStringRef();
HashCalcer.Update(reinterpret_cast<const ui8*>(value.Data()), value.Size());
- break;
+ break;
}
default: {
Y_ENSURE(false, TStringBuilder{} << "HashFunc for HashShuffle isn't supported with such type: " << static_cast<ui64>(KeyColumnTypes[keyIdx]));
@@ -208,11 +208,11 @@ struct TColumnShardHashV1 {
ui64 Finish() {
ui64 hash = HashCalcer.Finish();
hash = std::min<ui32>(hash / (Max<ui64>() / ShardCount), ShardCount - 1);
- return TaskIdByHash[hash];
+ return TaskIndexByHash[hash];
}
std::size_t ShardCount;
- TVector<ui64> TaskIdByHash;
+ TVector<ui64> TaskIndexByHash;
TVector<NYql::NProto::TypeIds> KeyColumnTypes;
private:
NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer;
@@ -247,8 +247,8 @@ protected:
}
public:
TDqOutputHashPartitionConsumer(
- TVector<IDqOutput::TPtr>&& outputs,
- TVector<TColumnInfo>&& keyColumns,
+ TVector<IDqOutput::TPtr>&& outputs,
+ TVector<TColumnInfo>&& keyColumns,
TMaybe<ui32> outputWidth,
THashFunc hashFunc
)
@@ -380,8 +380,8 @@ template <typename THashFunc>
class TDqOutputHashPartitionConsumerScalar : public IDqOutputConsumer {
public:
TDqOutputHashPartitionConsumerScalar(
- TVector<IDqOutput::TPtr>&& outputs,
- TVector<TColumnInfo>&& keyColumns,
+ TVector<IDqOutput::TPtr>&& outputs,
+ TVector<TColumnInfo>&& keyColumns,
const NKikimr::NMiniKQL::TType* outputType,
THashFunc hashFunc
)
@@ -689,7 +689,7 @@ private:
template<typename T = THashFunc, typename std::enable_if<std::is_same<T, TColumnShardHashV1>::value, int>::type = 0>
size_t GetHashPartitionIndex(const arrow::Datum* values[], ui64 blockIndex) {
HashFunc.Start();
-
+
for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) {
const ui32 columnIndex = KeyColumns_[keyId].Index;
Y_DEBUG_ABORT_UNLESS(columnIndex < OutputWidth_);
@@ -853,7 +853,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
auto& columnShardHashV1Proto = hashPartition.GetColumnShardHashV1();
std::size_t shardCount = columnShardHashV1Proto.GetShardCount();
- TVector<ui64> taskIdByHash{columnShardHashV1Proto.GetTaskIdByHash().begin(), columnShardHashV1Proto.GetTaskIdByHash().end()};
+ TVector<ui64> taskIndexByHash{columnShardHashV1Proto.GetTaskIndexByHash().begin(), columnShardHashV1Proto.GetTaskIndexByHash().end()};
TVector<NYql::NProto::TypeIds> keyColumnTypes;
keyColumnTypes.reserve(columnShardHashV1Proto.GetKeyColumnTypes().size());
@@ -869,7 +869,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
}
return "[" + JoinSeq(",", stringNames) + "]";
};
-
+
const auto keyTypesToString = [](const TVector<NYql::NProto::TypeIds>& keyColumnTypes) -> TString {
TVector<TString> stringNames;
stringNames.reserve(keyColumnTypes.size());
@@ -880,12 +880,12 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
};
Y_ENSURE(
- keyColumnTypes.size() == keyColumns.size(),
- TStringBuilder{} << "Hashshuffle keycolumns and keytypes args count mismatch, types: "
+ keyColumnTypes.size() == keyColumns.size(),
+ TStringBuilder{} << "Hashshuffle keycolumns and keytypes args count mismatch, types: "
<< keyTypesToString(keyColumnTypes) << " for the columns: " << keyColumnsToString(keyColumns)
);
- TColumnShardHashV1 columnShardHashV1 = TColumnShardHashV1(shardCount, std::move(taskIdByHash), std::move(keyColumnTypes));
+ TColumnShardHashV1 columnShardHashV1 = TColumnShardHashV1(shardCount, std::move(taskIndexByHash), std::move(keyColumnTypes));
return CreateOutputPartitionConsumerImpl<TColumnShardHashV1>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage, std::move(columnShardHashV1), pgBuilder);
}
case NDqProto::TTaskOutputHashPartition::kHashV1: