diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-07-05 13:33:20 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-07-05 13:33:20 +0300 |
commit | 1c667dec81484d589e639467ef4eda15a2dc4413 (patch) | |
tree | d0b4cfebdfd15da1ff8778c2404080baa3abb5f4 | |
parent | 871d290e1321d5a6c7d93774ae1091d50f322d75 (diff) | |
download | ydb-1c667dec81484d589e639467ef4eda15a2dc4413.tar.gz |
YQL-14757: DqCnHashShuffle accepts complex types
ref:84742d44bb04ba1d766cb420e7d694cfd2b31d87
-rw-r--r-- | ydb/core/kqp/executer/kqp_partition_helper.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.h | 14 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_producer.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.cpp | 53 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 5 |
9 files changed, 46 insertions, 59 deletions
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp index 28c715e31b1..9a58274eb34 100644 --- a/ydb/core/kqp/executer/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp @@ -112,8 +112,8 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(const NDq:: break; } - keyFullType.push_back(columnInfo->TypeId); - keyPrefixType.push_back(columnInfo->TypeId); + keyFullType.push_back(columnInfo->GetTypeId()); + keyPrefixType.push_back(columnInfo->GetTypeId()); keyPrefixIndices.push_back(columnInfo->Index); } diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 70b1038e6cd..2d555fbc006 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -26,10 +26,16 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp { switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kRangePartition: { - TVector<NUdf::TDataTypeId> keyColumnTypes; + TVector<NUdf::TDataTypeId> keyColumnTypeIds; + keyColumnTypeIds.reserve(outputDesc.GetRangePartition().GetKeyColumns().size()); + TVector<TType*> keyColumnTypes; TVector<ui32> keyColumnIndices; GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices); YQL_ENSURE(!keyColumnTypes.empty()); + std::transform(keyColumnTypes.begin(), keyColumnTypes.end(), back_inserter(keyColumnTypeIds), [](const auto& tyPtr) { + YQL_ENSURE(tyPtr->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data); + return static_cast<NKikimr::NMiniKQL::TDataType&>(*tyPtr).GetSchemeType(); + }); TVector<TKqpRangePartition> partitions; partitions.reserve(outputDesc.GetRangePartition().PartitionsSize()); @@ -45,7 +51,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp } return CreateOutputRangePartitionConsumer(std::move(outputs), std::move(partitions), - std::move(keyColumnTypes), std::move(keyColumnIndices), typeEnv); + std::move(keyColumnTypeIds), std::move(keyColumnIndices), typeEnv); } case NDqProto::TTaskOutput::kEffects: { diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp index 850f6108d9e..a5b1b4babcd 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp @@ -21,10 +21,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString memberType = static_cast<TOptionalType&>(*memberType).GetItemType(); } - YQL_ENSURE(memberType->GetKind() == TType::EKind::Data); - auto columnType = static_cast<TDataType&>(*memberType).GetSchemeType(); - - return TColumnInfo{TString(columnName), *columnIndex, columnType}; + return TColumnInfo{TString(columnName), *columnIndex, memberType}; } TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) { diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h index b61d14f4778..de333872004 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/minikql/mkql_node.h> // #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> @@ -9,7 +10,14 @@ namespace NYql::NDq { struct TColumnInfo { TString Name; ui32 Index; - NUdf::TDataTypeId TypeId; + NKikimr::NMiniKQL::TType* Type; + + TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type) : Name(name), Index(index), Type(type) {}; + + NUdf::TDataTypeId GetTypeId() const { + YQL_ENSURE(Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data); + return static_cast<NKikimr::NMiniKQL::TDataType&>(*Type).GetSchemeType(); + } }; struct TSortColumnInfo : public TColumnInfo { @@ -26,7 +34,7 @@ TColumnInfo GetColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf colum template<typename TList> void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns, - TVector<NUdf::TDataTypeId>& columnTypes, TVector<ui32>& columnIndices) + TVector<NKikimr::NMiniKQL::TType*>& columnTypes, TVector<ui32>& columnIndices) { columnTypes.clear(); columnIndices.clear(); @@ -36,7 +44,7 @@ void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns, for (auto& column : columns) { auto columnInfo = GetColumnInfo(type, column); - columnTypes.push_back(columnInfo.TypeId); + columnTypes.push_back(columnInfo.Type); columnIndices.push_back(columnInfo.Index); } } diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 3fe4830a08a..594e2495e3f 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -71,8 +71,9 @@ public: CurrentItemIndexes.resize(InputsSize, 0); Finished.resize(InputsSize, false); for (auto& sortCol : SortCols) { - TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(sortCol.TypeId); - YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << sortCol.TypeId); + const auto typeId = sortCol.GetTypeId(); + TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId); + YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId); YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name << "' has unsupported type for Merge connection: " << *maybeDataSlot); SortColTypes[sortCol.Index] = *maybeDataSlot; diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index 4b612b4de46..7f182c3e426 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -82,28 +82,16 @@ private: class TDqOutputHashPartitionConsumer : public IDqOutputConsumer { public: - TDqOutputHashPartitionConsumer(const TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs, - TVector<TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices) - : TypeEnv(typeEnv) - , Outputs(std::move(outputs)) - , KeyColumnTypes(std::move(keyColumnTypes)) + TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs, + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices) + : Outputs(std::move(outputs)) , KeyColumnIndices(std::move(keyColumnIndices)) + , ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{}) { - MKQL_ENSURE_S(KeyColumnTypes.size() == KeyColumnIndices.size()); - - for (size_t keyId = 0; keyId < KeyColumnTypes.size(); keyId++) { - NMiniKQL::TType* type; - if (KeyColumnTypes[keyId] == NUdf::TDataType<NUdf::TDecimal>::Id) { - type = NMiniKQL::TDataDecimalType::Create(22, 9, TypeEnv); - } else { - type = NMiniKQL::TDataType::Create(KeyColumnTypes[keyId], TypeEnv); - } - bool isTuple; - bool encoded; - bool useIHash; - GetDictionaryKeyTypes(type, KeyTypes, isTuple, encoded, useIHash); - - ValueHashers.emplace_back(KeyTypes, isTuple, useIHash ? MakeHashImpl(type) : nullptr); + MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size()); + + for (auto i = 0U; i < keyColumnTypes.size(); i++) { + ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]); } } @@ -144,29 +132,13 @@ private: if (!value.HasValue()) { return 0; } - - #define APPLY_HASHER(type, layout) \ - case TDataType<type>::Id: return hasher(value); - - auto& hasher = ValueHashers[keyId]; - switch (KeyColumnTypes[keyId]) { - KNOWN_FIXED_VALUE_TYPES(APPLY_HASHER) - case TDataType<TDecimal>::Id: - return GetValueHash<EDataSlot::Decimal>(value); - } - - #undef APPLY_HASHER - - return GetValueHash<EDataSlot::String>(value); + return ValueHashers[keyId]->Hash(value); } private: - const TTypeEnvironment& TypeEnv; TVector<IDqOutput::TPtr> Outputs; - TVector<TDataTypeId> KeyColumnTypes; TVector<ui32> KeyColumnIndices; - TVector<TValueHasher> ValueHashers; - TKeyTypes KeyTypes; + TVector<NUdf::IHash::TPtr> ValueHashers; }; class TDqOutputBroadcastConsumer : public IDqOutputConsumer { @@ -213,10 +185,9 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) { IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NUdf::TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv) + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices) { - return MakeIntrusive<TDqOutputHashPartitionConsumer>(typeEnv, std::move(outputs), std::move(keyColumnTypes), + return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes), std::move(keyColumnIndices)); } diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h index 1438c15c23b..2b717bb4171 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.h +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h @@ -29,8 +29,7 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output); IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NUdf::TDataTypeId>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv); + TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices); IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 42de67823fe..9a1f02fbc18 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -147,7 +147,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con } IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, - const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) + const NMiniKQL::TTypeEnvironment& /*typeEnv*/, TVector<IDqOutput::TPtr>&& outputs) { switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kSink: @@ -159,7 +159,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } case NDqProto::TTaskOutput::kHashPartition: { - TVector<NUdf::TDataTypeId> keyColumnTypes; + TVector<TType*> keyColumnTypes; TVector<ui32> keyColumnIndices; GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices); YQL_ENSURE(!keyColumnTypes.empty()); @@ -171,7 +171,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes), - std::move(keyColumnIndices), typeEnv); + std::move(keyColumnIndices)); } case NDqProto::TTaskOutput::kBroadcast: { diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index aea96d65b2f..5f46d1bb537 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -587,6 +587,11 @@ TStatus AnnotateDqCnHashShuffle(const TExprNode::TPtr& input, TExprContext& ctx) TStringBuilder() << "Missing key column: " << column->Content())); return TStatus::Error; } + if (const auto ty = structType->FindItemType(column->Content()); !ty->IsHashable()) { + ctx.AddError(TIssue(ctx.GetPosition(column->Pos()), + TStringBuilder() << "Non-hashable key column: " << column->Content())); + return TStatus::Error; + } } input->SetTypeAnn(outputType); |