aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-07-05 13:33:20 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-07-05 13:33:20 +0300
commit1c667dec81484d589e639467ef4eda15a2dc4413 (patch)
treed0b4cfebdfd15da1ff8778c2404080baa3abb5f4
parent871d290e1321d5a6c7d93774ae1091d50f322d75 (diff)
downloadydb-1c667dec81484d589e639467ef4eda15a2dc4413.tar.gz
YQL-14757: DqCnHashShuffle accepts complex types
ref:84742d44bb04ba1d766cb420e7d694cfd2b31d87
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp10
-rw-r--r--ydb/library/yql/dq/runtime/dq_columns_resolve.cpp5
-rw-r--r--ydb/library/yql/dq/runtime/dq_columns_resolve.h14
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp5
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.cpp53
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp6
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp5
9 files changed, 46 insertions, 59 deletions
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp
index 28c715e31b1..9a58274eb34 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp
@@ -112,8 +112,8 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq::
break;
}
- keyFullType.push_back(columnInfo->TypeId);
- keyPrefixType.push_back(columnInfo->TypeId);
+ keyFullType.push_back(columnInfo->GetTypeId());
+ keyPrefixType.push_back(columnInfo->GetTypeId());
keyPrefixIndices.push_back(columnInfo->Index);
}
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 70b1038e6cd..2d555fbc006 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -26,10 +26,16 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
{
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kRangePartition: {
- TVector<NUdf::TDataTypeId> keyColumnTypes;
+ TVector<NUdf::TDataTypeId> keyColumnTypeIds;
+ keyColumnTypeIds.reserve(outputDesc.GetRangePartition().GetKeyColumns().size());
+ TVector<TType*> keyColumnTypes;
TVector<ui32> keyColumnIndices;
GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices);
YQL_ENSURE(!keyColumnTypes.empty());
+ std::transform(keyColumnTypes.begin(), keyColumnTypes.end(), back_inserter(keyColumnTypeIds), [](const auto& tyPtr) {
+ YQL_ENSURE(tyPtr->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data);
+ return static_cast<NKikimr::NMiniKQL::TDataType&>(*tyPtr).GetSchemeType();
+ });
TVector<TKqpRangePartition> partitions;
partitions.reserve(outputDesc.GetRangePartition().PartitionsSize());
@@ -45,7 +51,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
}
return CreateOutputRangePartitionConsumer(std::move(outputs), std::move(partitions),
- std::move(keyColumnTypes), std::move(keyColumnIndices), typeEnv);
+ std::move(keyColumnTypeIds), std::move(keyColumnIndices), typeEnv);
}
case NDqProto::TTaskOutput::kEffects: {
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
index 850f6108d9e..a5b1b4babcd 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
@@ -21,10 +21,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString
memberType = static_cast<TOptionalType&>(*memberType).GetItemType();
}
- YQL_ENSURE(memberType->GetKind() == TType::EKind::Data);
- auto columnType = static_cast<TDataType&>(*memberType).GetSchemeType();
-
- return TColumnInfo{TString(columnName), *columnIndex, columnType};
+ return TColumnInfo{TString(columnName), *columnIndex, memberType};
}
TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) {
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
index b61d14f4778..de333872004 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/minikql/mkql_node.h>
// #include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
@@ -9,7 +10,14 @@ namespace NYql::NDq {
struct TColumnInfo {
TString Name;
ui32 Index;
- NUdf::TDataTypeId TypeId;
+ NKikimr::NMiniKQL::TType* Type;
+
+ TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type) : Name(name), Index(index), Type(type) {};
+
+ NUdf::TDataTypeId GetTypeId() const {
+ YQL_ENSURE(Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data);
+ return static_cast<NKikimr::NMiniKQL::TDataType&>(*Type).GetSchemeType();
+ }
};
struct TSortColumnInfo : public TColumnInfo {
@@ -26,7 +34,7 @@ TColumnInfo GetColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf colum
template<typename TList>
void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns,
- TVector<NUdf::TDataTypeId>& columnTypes, TVector<ui32>& columnIndices)
+ TVector<NKikimr::NMiniKQL::TType*>& columnTypes, TVector<ui32>& columnIndices)
{
columnTypes.clear();
columnIndices.clear();
@@ -36,7 +44,7 @@ void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns,
for (auto& column : columns) {
auto columnInfo = GetColumnInfo(type, column);
- columnTypes.push_back(columnInfo.TypeId);
+ columnTypes.push_back(columnInfo.Type);
columnIndices.push_back(columnInfo.Index);
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 3fe4830a08a..594e2495e3f 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -71,8 +71,9 @@ public:
CurrentItemIndexes.resize(InputsSize, 0);
Finished.resize(InputsSize, false);
for (auto& sortCol : SortCols) {
- TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(sortCol.TypeId);
- YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << sortCol.TypeId);
+ const auto typeId = sortCol.GetTypeId();
+ TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId);
+ YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId);
YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name <<
"' has unsupported type for Merge connection: " << *maybeDataSlot);
SortColTypes[sortCol.Index] = *maybeDataSlot;
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 4b612b4de46..7f182c3e426 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -82,28 +82,16 @@ private:
class TDqOutputHashPartitionConsumer : public IDqOutputConsumer {
public:
- TDqOutputHashPartitionConsumer(const TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs,
- TVector<TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
- : TypeEnv(typeEnv)
- , Outputs(std::move(outputs))
- , KeyColumnTypes(std::move(keyColumnTypes))
+ TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs,
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
+ : Outputs(std::move(outputs))
, KeyColumnIndices(std::move(keyColumnIndices))
+ , ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{})
{
- MKQL_ENSURE_S(KeyColumnTypes.size() == KeyColumnIndices.size());
-
- for (size_t keyId = 0; keyId < KeyColumnTypes.size(); keyId++) {
- NMiniKQL::TType* type;
- if (KeyColumnTypes[keyId] == NUdf::TDataType<NUdf::TDecimal>::Id) {
- type = NMiniKQL::TDataDecimalType::Create(22, 9, TypeEnv);
- } else {
- type = NMiniKQL::TDataType::Create(KeyColumnTypes[keyId], TypeEnv);
- }
- bool isTuple;
- bool encoded;
- bool useIHash;
- GetDictionaryKeyTypes(type, KeyTypes, isTuple, encoded, useIHash);
-
- ValueHashers.emplace_back(KeyTypes, isTuple, useIHash ? MakeHashImpl(type) : nullptr);
+ MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size());
+
+ for (auto i = 0U; i < keyColumnTypes.size(); i++) {
+ ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]);
}
}
@@ -144,29 +132,13 @@ private:
if (!value.HasValue()) {
return 0;
}
-
- #define APPLY_HASHER(type, layout) \
- case TDataType<type>::Id: return hasher(value);
-
- auto& hasher = ValueHashers[keyId];
- switch (KeyColumnTypes[keyId]) {
- KNOWN_FIXED_VALUE_TYPES(APPLY_HASHER)
- case TDataType<TDecimal>::Id:
- return GetValueHash<EDataSlot::Decimal>(value);
- }
-
- #undef APPLY_HASHER
-
- return GetValueHash<EDataSlot::String>(value);
+ return ValueHashers[keyId]->Hash(value);
}
private:
- const TTypeEnvironment& TypeEnv;
TVector<IDqOutput::TPtr> Outputs;
- TVector<TDataTypeId> KeyColumnTypes;
TVector<ui32> KeyColumnIndices;
- TVector<TValueHasher> ValueHashers;
- TKeyTypes KeyTypes;
+ TVector<NUdf::IHash::TPtr> ValueHashers;
};
class TDqOutputBroadcastConsumer : public IDqOutputConsumer {
@@ -213,10 +185,9 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) {
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NUdf::TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv)
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
{
- return MakeIntrusive<TDqOutputHashPartitionConsumer>(typeEnv, std::move(outputs), std::move(keyColumnTypes),
+ return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes),
std::move(keyColumnIndices));
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index 1438c15c23b..2b717bb4171 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -29,8 +29,7 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output);
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NUdf::TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv);
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices);
IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 42de67823fe..9a1f02fbc18 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -147,7 +147,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
}
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
- const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs)
+ const NMiniKQL::TTypeEnvironment& /*typeEnv*/, TVector<IDqOutput::TPtr>&& outputs)
{
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kSink:
@@ -159,7 +159,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
case NDqProto::TTaskOutput::kHashPartition: {
- TVector<NUdf::TDataTypeId> keyColumnTypes;
+ TVector<TType*> keyColumnTypes;
TVector<ui32> keyColumnIndices;
GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices);
YQL_ENSURE(!keyColumnTypes.empty());
@@ -171,7 +171,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes),
- std::move(keyColumnIndices), typeEnv);
+ std::move(keyColumnIndices));
}
case NDqProto::TTaskOutput::kBroadcast: {
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index aea96d65b2f..5f46d1bb537 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -587,6 +587,11 @@ TStatus AnnotateDqCnHashShuffle(const TExprNode::TPtr& input, TExprContext& ctx)
TStringBuilder() << "Missing key column: " << column->Content()));
return TStatus::Error;
}
+ if (const auto ty = structType->FindItemType(column->Content()); !ty->IsHashable()) {
+ ctx.AddError(TIssue(ctx.GetPosition(column->Pos()),
+ TStringBuilder() << "Non-hashable key column: " << column->Content()));
+ return TStatus::Error;
+ }
}
input->SetTypeAnn(outputType);