diff options
author | aneporada <aneporada@ydb.tech> | 2023-07-31 12:32:50 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-07-31 12:32:50 +0300 |
commit | 8c2a3158362287c39c3360a751aaafcb98d5e520 (patch) | |
tree | 56db941c130f99c5374f8ce7035332e13e524cea | |
parent | 7107a9b540de50846e4c8fc0908cca7dd0898dac (diff) | |
download | ydb-8c2a3158362287c39c3360a751aaafcb98d5e520.tar.gz |
Support blocks in DqCnHashShuffle connection. Fix block DqCnMerge with equal elements
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.h | 17 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_producer.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.cpp | 392 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.h | 11 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_type_builder.h | 12 |
12 files changed, 432 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index b32537d951..a555f7c286 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -23,9 +23,10 @@ public: IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const override { - return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, std::move(outputs)); + return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); } IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override { diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 08eda24aa0..b04d62030e 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -22,22 +22,24 @@ using namespace NYql; using namespace NDq; IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const TType* type, - NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) + NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector<IDqOutput::TPtr>&& outputs) { switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kRangePartition: { TVector<NScheme::TTypeInfo> keyColumnTypeInfos; keyColumnTypeInfos.reserve(outputDesc.GetRangePartition().GetKeyColumns().size()); - TVector<TType*> keyColumnTypes; TVector<ui32> keyColumnIndices; - GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices); - YQL_ENSURE(!keyColumnTypes.empty()); - std::transform(keyColumnTypes.begin(), keyColumnTypes.end(), back_inserter(keyColumnTypeInfos), [](const auto& tyPtr) { + TVector<TColumnInfo> keyColumns; + GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumns); + YQL_ENSURE(!keyColumns.empty()); + for (auto& info : keyColumns) { // TODO: support pg types - YQL_ENSURE(tyPtr->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data); - auto dataTypeId = static_cast<NKikimr::NMiniKQL::TDataType&>(*tyPtr).GetSchemeType(); - return NScheme::TTypeInfo((NScheme::TTypeId)dataTypeId); - }); + YQL_ENSURE(info.Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data); + auto dataTypeId = static_cast<NKikimr::NMiniKQL::TDataType&>(*info.Type).GetSchemeType(); + keyColumnTypeInfos.emplace_back(NScheme::TTypeInfo((NScheme::TTypeId)dataTypeId)); + keyColumnIndices.emplace_back(info.Index); + } TVector<TKqpRangePartition> partitions; partitions.reserve(outputDesc.GetRangePartition().PartitionsSize()); @@ -61,7 +63,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp } default: { - return DqBuildOutputConsumer(outputDesc, type, typeEnv, std::move(outputs)); + return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs)); } } } diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index edb510e939..d9be8de480 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -9,6 +9,7 @@ namespace NKqp { NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<NYql::NDq::IDqOutput::TPtr>&& outputs); TIntrusivePtr<NYql::NDq::IDqTaskRunner> CreateKqpTaskRunner(const NYql::NDq::TDqTaskRunnerContext& execCtx, diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 1315889234..14ac6047a4 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1098,9 +1098,10 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext public: NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<NDq::IDqOutput::TPtr>&& outputs) const override { - return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, std::move(outputs)); + return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); } NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override { diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp index 70ba88e358..6d6f248290 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp @@ -16,7 +16,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index"); memberType = multiType.GetElementType(idx); if (memberType->IsBlock()) { - auto blockType = static_cast<const TBlockType*>(type); + auto blockType = static_cast<const TBlockType*>(memberType); isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; memberType = blockType->GetItemType(); } diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h index f46e4e0eb0..f9e3e504a0 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h @@ -45,28 +45,21 @@ TColumnInfo GetColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf colum template<typename TList> void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns, - TVector<NKikimr::NMiniKQL::TType*>& columnTypes, TVector<ui32>& columnIndices) + TVector<TColumnInfo>& result) { - columnTypes.clear(); - columnIndices.clear(); - - columnTypes.reserve(columns.size()); - columnIndices.reserve(columns.size()); - + result.clear(); + result.reserve(columns.size()); for (auto& column : columns) { - auto columnInfo = GetColumnInfo(type, column); - columnTypes.push_back(columnInfo.Type); - columnIndices.push_back(columnInfo.Index); + result.emplace_back(GetColumnInfo(type, column)); } } template<typename TList> -void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& protoSortCols, +void GetSortColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& protoSortCols, TVector<TSortColumnInfo>& sortCols) { sortCols.clear(); sortCols.reserve(protoSortCols.size()); - for (const auto& protoSortCol : protoSortCols) { TSortColumnInfo colInfo = static_cast<TSortColumnInfo>(GetColumnInfo(type, protoSortCol.GetColumn())); colInfo.Ascending = protoSortCol.GetAscending(); diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 1707ba41a3..dee5aeb508 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -343,17 +343,6 @@ TVector<NKikimr::NMiniKQL::TType*> ExtractBlockItemTypes(const NKikimr::NMiniKQL return result; } -ui64 CalcMaxBlockSize(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) { - TTypeInfoHelper helper; - ui64 maxBlockLen = Max<ui64>(); - for (auto& itemType : itemTypes) { - if (itemType) { - maxBlockLen = std::min(maxBlockLen, helper.GetMaxBlockLength(itemType)); - } - } - return maxBlockLen; -} - TVector<std::unique_ptr<IBlockReader>> MakeReaders(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) { TVector<std::unique_ptr<IBlockReader>> result; for (auto& itemType : itemTypes) { @@ -405,13 +394,14 @@ public: : TBase(memInfo) , SortCols_(std::move(sortCols)) , ItemTypes_(ExtractBlockItemTypes(inputs.front()->GetInputType())) - , MaxOutputBlockLen_(CalcMaxBlockSize(ItemTypes_)) + , MaxOutputBlockLen_(CalcMaxBlockLength(ItemTypes_.begin(), ItemTypes_.end(), TTypeInfoHelper())) , Comparators_(MakeComparators(SortCols_, ItemTypes_)) , Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_)) , Factory_(factory) , Stats_(stats) { YQL_ENSURE(!inputs.empty()); + YQL_ENSURE(MaxOutputBlockLen_ > 0); InputData_.reserve(inputs.size()); for (auto& input : inputs) { InputData_.emplace_back(std::move(input), this); @@ -534,7 +524,9 @@ private: return compare > 0; } } - return false; + // resolve equal elements: when InputIndex()es are equal, we must first process element with smaller BlockIndex() + // Note: operator> here since this comparator is used together with max-heap + return std::tuple(InputIndex(), BlockIndex()) > std::tuple(other.InputIndex(), other.BlockIndex()); } TBlockItem GetItem(ui32 columnIndex) const { diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index e78288160f..5b640f16e2 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -1,9 +1,15 @@ #include "dq_output_consumer.h" #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> -#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/minikql/computation/mkql_block_builder.h> +#include <ydb/library/yql/minikql/computation/mkql_block_reader.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> + +#include <ydb/library/yql/public/udf/arrow/args_dechunker.h> +#include <ydb/library/yql/public/udf/udf_value.h> + #include <ydb/library/yql/utils/yql_panic.h> namespace NYql::NDq { @@ -117,18 +123,13 @@ protected: return !IsWaitingFlag; } public: - TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, - TMaybe<ui32> outputWidth) + TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, TMaybe<ui32> outputWidth) : Outputs(std::move(outputs)) - , KeyColumnIndices(std::move(keyColumnIndices)) - , ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{}) + , KeyColumns(std::move(keyColumns)) , OutputWidth(outputWidth) { - MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size()); - - for (auto i = 0U; i < keyColumnTypes.size(); i++) { - ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]); + for (auto& column : KeyColumns) { + ValueHashers.emplace_back(MakeHashImpl(column.Type)); } if (outputWidth.Defined()) { @@ -161,9 +162,7 @@ public: YQL_ENSURE(!IsWaitingFlag); IsWaitingFlag = true; OutputWaiting = Outputs[partitionIndex]; - for (ui32 i = 0; i < count; ++i) { - WideWaitingValues[i] = std::move(values[i]); - } + std::move(values, values + count, WideWaitingValues.data()); } else { Outputs[partitionIndex]->WidePush(values, count); } @@ -185,8 +184,8 @@ private: size_t GetHashPartitionIndex(const TUnboxedValue& value) { ui64 hash = 0; - for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) { - auto columnValue = value.GetElement(KeyColumnIndices[keyId]); + for (size_t keyId = 0; keyId < KeyColumns.size(); keyId++) { + auto columnValue = value.GetElement(KeyColumns[keyId].Index); hash = CombineHashes(hash, HashColumn(keyId, columnValue)); } @@ -196,9 +195,9 @@ private: size_t GetHashPartitionIndex(const TUnboxedValue* values) { ui64 hash = 0; - for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) { - MKQL_ENSURE_S(KeyColumnIndices[keyId] < OutputWidth); - hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumnIndices[keyId]])); + for (size_t keyId = 0; keyId < KeyColumns.size(); keyId++) { + MKQL_ENSURE_S(KeyColumns[keyId].Index < OutputWidth); + hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumns[keyId].Index])); } return hash % Outputs.size(); @@ -212,12 +211,342 @@ private: } private: - TVector<IDqOutput::TPtr> Outputs; - TVector<ui32> KeyColumnIndices; + const TVector<IDqOutput::TPtr> Outputs; + const TVector<TColumnInfo> KeyColumns; TVector<NUdf::IHash::TPtr> ValueHashers; const TMaybe<ui32> OutputWidth; }; +class TDqOutputHashPartitionConsumerScalar : public IDqOutputConsumer { +public: + TDqOutputHashPartitionConsumerScalar(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType) + : Outputs_(std::move(outputs)) + , KeyColumns_(std::move(keyColumns)) + , OutputWidth_(static_cast<const NMiniKQL::TMultiType*>(outputType)->GetElementsCount()) + , WaitingValues_(OutputWidth_) + { + auto multiType = static_cast<const NMiniKQL::TMultiType*>(outputType); + TBlockTypeHelper helper; + for (auto& column : KeyColumns_) { + auto columnType = multiType->GetElementType(column.Index); + YQL_ENSURE(columnType->IsBlock()); + auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType); + YQL_ENSURE(blockType->GetShape() == NMiniKQL::TBlockType::EShape::Scalar); + Readers_.emplace_back(MakeBlockReader(TTypeInfoHelper(), blockType->GetItemType())); + Hashers_.emplace_back(helper.MakeHasher(blockType->GetItemType())); + } + } +private: + bool IsFull() const final { + DrainWaiting(); + return IsWaitingFlag_; + } + + void Consume(TUnboxedValue&& value) final { + Y_UNUSED(value); + YQL_ENSURE(false, "Consume() called on wide block stream"); + } + + void WideConsume(TUnboxedValue* values, ui32 count) final { + YQL_ENSURE(count == OutputWidth_); + const ui64 inputBlockLen = TArrowBlock::From(values[count - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + if (!inputBlockLen) { + return; + } + + if (!Output_) { + Output_ = Outputs_[GetHashPartitionIndex(values)]; + } + if (Output_->IsFull()) { + YQL_ENSURE(!IsWaitingFlag_); + IsWaitingFlag_ = true; + std::move(values, values + count, WaitingValues_.data()); + } else { + Output_->WidePush(values, count); + } + } + + void Consume(NDqProto::TCheckpoint&& checkpoint) override { + for (auto& output : Outputs_) { + output->Push(NDqProto::TCheckpoint(checkpoint)); + } + } + + void Finish() final { + for (auto& output : Outputs_) { + output->Finish(); + } + } + + void DrainWaiting() const { + if (Y_UNLIKELY(IsWaitingFlag_)) { + YQL_ENSURE(Output_); + if (Output_->IsFull()) { + return; + } + YQL_ENSURE(OutputWidth_ == WaitingValues_.size()); + Output_->WidePush(WaitingValues_.data(), OutputWidth_); + IsWaitingFlag_ = false; + } + } + + bool DoTryFinish() final { + DrainWaiting(); + return !IsWaitingFlag_; + } + + size_t GetHashPartitionIndex(const TUnboxedValue* values) { + ui64 hash = 0; + + for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) { + YQL_ENSURE(KeyColumns_[keyId].Index < OutputWidth_); + hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumns_[keyId].Index])); + } + + return hash % Outputs_.size(); + } + + ui64 HashColumn(size_t keyId, const TUnboxedValue& value) const { + TBlockItem item = Readers_[keyId]->GetScalarItem(*TArrowBlock::From(value).GetDatum().scalar()); + return Hashers_[keyId]->Hash(item); + } + +private: + const TVector<IDqOutput::TPtr> Outputs_; + const TVector<TColumnInfo> KeyColumns_; + const ui32 OutputWidth_; + TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; + TVector<std::unique_ptr<IBlockReader>> Readers_; + IDqOutput::TPtr Output_; + mutable bool IsWaitingFlag_ = false; + mutable TUnboxedValueVector WaitingValues_; +}; + +class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { +public: + TDqOutputHashPartitionConsumerBlock(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, + const NKikimr::NMiniKQL::TType* outputType, + const NKikimr::NMiniKQL::THolderFactory& holderFactory) + : OutputType_(static_cast<const NMiniKQL::TMultiType*>(outputType)) + , HolderFactory_(holderFactory) + , Outputs_(std::move(outputs)) + , KeyColumns_(std::move(keyColumns)) + , ScalarColumnHashes_(KeyColumns_.size()) + , OutputWidth_(OutputType_->GetElementsCount()) + { + TTypeInfoHelper helper; + YQL_ENSURE(OutputWidth_ > KeyColumns_.size()); + + TVector<const NMiniKQL::TType*> blockTypes; + for (auto& columnType : OutputType_->GetElements()) { + YQL_ENSURE(columnType->IsBlock()); + auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType); + if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) { + blockTypes.emplace_back(blockType->GetItemType()); + } + } + ui64 maxBlockLen = CalcMaxBlockLength(blockTypes.begin(), blockTypes.end(), helper); + YQL_ENSURE(maxBlockLen > 0); + MakeBuilders(maxBlockLen); + + TBlockTypeHelper blockHelper; + for (auto& column : KeyColumns_) { + auto columnType = OutputType_->GetElementType(column.Index); + YQL_ENSURE(columnType->IsBlock()); + auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType); + Readers_.emplace_back(MakeBlockReader(helper, blockType->GetItemType())); + Hashers_.emplace_back(blockHelper.MakeHasher(blockType->GetItemType())); + } + } + +private: + bool IsFull() const final { + DrainWaiting(); + return IsWaitingFlag_; + } + + void Consume(TUnboxedValue&& value) final { + Y_UNUSED(value); + YQL_ENSURE(false, "Consume() called on wide block stream"); + } + + void WideConsume(TUnboxedValue* values, ui32 count) final { + YQL_ENSURE(!IsWaitingFlag_); + YQL_ENSURE(count == OutputWidth_); + + const ui64 inputBlockLen = TArrowBlock::From(values[count - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + if (!inputBlockLen) { + return; + } + + TVector<const arrow::Datum*> datums; + datums.reserve(count - 1); + for (ui32 i = 0; i + 1 < count; ++i) { + datums.push_back(&TArrowBlock::From(values[i]).GetDatum()); + } + + TVector<TVector<ui64>> outputBlockIndexes(Outputs_.size()); + for (ui64 i = 0; i < inputBlockLen; ++i) { + outputBlockIndexes[GetHashPartitionIndex(datums.data(), i)].push_back(i); + } + + ui64 maxLen = 0; + for (auto& indexes : outputBlockIndexes) { + maxLen = std::max(maxLen, indexes.size()); + } + + if (maxLen > MaxOutputBlockLen_) { + MakeBuilders(maxLen); + } + + TVector<std::unique_ptr<TArgsDechunker>> outputData; + for (size_t i = 0; i < Outputs_.size(); ++i) { + ui64 outputBlockLen = outputBlockIndexes[i].size(); + if (!outputBlockLen) { + outputData.emplace_back(); + continue; + } + const ui64* indexes = outputBlockIndexes[i].data(); + + std::vector<arrow::Datum> output; + for (size_t j = 0; j < datums.size(); ++j) { + const arrow::Datum* src = datums[j]; + if (src->is_scalar()) { + output.emplace_back(*src); + } else { + IArrayBuilder::TArrayDataItem dataItem; + dataItem.Data = src->array().get(); + dataItem.StartOffset = 0; + Builders_[j]->AddMany(&dataItem, 1, indexes, outputBlockLen); + output.emplace_back(Builders_[j]->Build(false)); + } + } + output.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(outputBlockLen))); + outputData.emplace_back(std::make_unique<TArgsDechunker>(std::move(output))); + } + + DoConsume(std::move(outputData)); + } + + void DoConsume(TVector<std::unique_ptr<TArgsDechunker>>&& outputData) const { + while (!outputData.empty()) { + bool hasData = false; + for (size_t i = 0; i < Outputs_.size(); ++i) { + if (Outputs_[i]->IsFull()) { + IsWaitingFlag_ = true; + OutputData_ = std::move(outputData); + return; + } + + std::vector<arrow::Datum> chunk; + if (outputData[i] && outputData[i]->Next(chunk)) { + hasData = true; + TUnboxedValueVector outputValues; + for (auto& datum : chunk) { + outputValues.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum))); + } + Outputs_[i]->WidePush(outputValues.data(), outputValues.size()); + } + } + if (!hasData) { + outputData.clear(); + } + } + } + + void Consume(NDqProto::TCheckpoint&& checkpoint) override { + for (auto& output : Outputs_) { + output->Push(NDqProto::TCheckpoint(checkpoint)); + } + } + + void Finish() final { + for (auto& output : Outputs_) { + output->Finish(); + } + } + + void DrainWaiting() const { + if (Y_UNLIKELY(IsWaitingFlag_)) { + TVector<std::unique_ptr<TArgsDechunker>> outputData; + outputData.swap(OutputData_); + DoConsume(std::move(outputData)); + if (OutputData_.empty()) { + IsWaitingFlag_ = false; + } + } + } + + bool DoTryFinish() final { + DrainWaiting(); + return !IsWaitingFlag_; + } + + size_t GetHashPartitionIndex(const arrow::Datum** values, ui64 blockIndex) { + ui64 hash = 0; + for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) { + const ui32 columnIndex = KeyColumns_[keyId].Index; + Y_VERIFY_DEBUG(columnIndex < OutputWidth_); + ui64 keyHash; + if (*KeyColumns_[keyId].IsScalar) { + if (!ScalarColumnHashes_[keyId].Defined()) { + ScalarColumnHashes_[keyId] = HashScalarColumn(keyId, *values[columnIndex]->scalar()); + } + keyHash = *ScalarColumnHashes_[keyId]; + } else { + keyHash = HashBlockColumn(keyId, *values[columnIndex]->array(), blockIndex); + } + hash = CombineHashes(hash, keyHash); + } + return hash % Outputs_.size(); + } + + inline ui64 HashScalarColumn(size_t keyId, const arrow::Scalar& value) const { + TBlockItem item = Readers_[keyId]->GetScalarItem(value); + return Hashers_[keyId]->Hash(item); + } + + inline ui64 HashBlockColumn(size_t keyId, const arrow::ArrayData& value, ui64 index) const { + TBlockItem item = Readers_[keyId]->GetItem(value, index); + return Hashers_[keyId]->Hash(item); + } + + void MakeBuilders(ui64 maxBlockLen) { + Builders_.clear(); + TTypeInfoHelper helper; + for (auto& columnType : OutputType_->GetElements()) { + YQL_ENSURE(columnType->IsBlock()); + auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType); + if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) { + auto itemType = blockType->GetItemType(); + YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet"); + Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *arrow::default_memory_pool(), maxBlockLen, nullptr)); + } else { + Builders_.emplace_back(); + } + } + MaxOutputBlockLen_ = maxBlockLen; + } + +private: + const NKikimr::NMiniKQL::TMultiType* const OutputType_; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; + + const TVector<IDqOutput::TPtr> Outputs_; + mutable TVector<std::unique_ptr<TArgsDechunker>> OutputData_; + + const TVector<TColumnInfo> KeyColumns_; + TVector<TMaybe<ui64>> ScalarColumnHashes_; + const ui32 OutputWidth_; + + TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; + TVector<std::unique_ptr<IBlockReader>> Readers_; + TVector<std::unique_ptr<IArrayBuilder>> Builders_; + + ui64 MaxOutputBlockLen_ = 0; + mutable bool IsWaitingFlag_ = false; +}; + class TDqOutputBroadcastConsumer : public IDqOutputConsumer { public: TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) @@ -277,10 +606,27 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) { IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth) + TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, + const NKikimr::NMiniKQL::THolderFactory& holderFactory) { - return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes), - std::move(keyColumnIndices), outputWidth); + YQL_ENSURE(!outputs.empty()); + YQL_ENSURE(!keyColumns.empty()); + TMaybe<ui32> outputWidth; + if (outputType->IsMulti()) { + outputWidth = static_cast<const NMiniKQL::TMultiType*>(outputType)->GetElementsCount(); + } + + if (AnyOf(keyColumns, [](const auto& info) { return !info.IsBlockOrScalar(); })) { + return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumns), outputWidth); + } + + YQL_ENSURE(outputWidth.Defined(), "Expecting wide stream for block data"); + if (AllOf(keyColumns, [](const auto& info) { return *info.IsScalar; })) { + // all key columns are scalars - all data will go to single output + return MakeIntrusive<TDqOutputHashPartitionConsumerScalar>(std::move(outputs), std::move(keyColumns), outputType); + } + + return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory); } IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) { diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h index 8bb7ea1ece..94fc30ae9e 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.h +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h @@ -1,11 +1,13 @@ #pragma once +#include "dq_columns_resolve.h" #include "dq_output.h" #include <ydb/library/yql/minikql/mkql_alloc.h> namespace NKikimr::NMiniKQL { class TTypeEnvironment; +class THolderFactory; } // namespace NKikimr::NMiniKQL namespace NYql::NDq { @@ -44,7 +46,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output); IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, - TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth); + TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, + const NKikimr::NMiniKQL::THolderFactory& holderFactory); IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 6c1542a999..c281d7565a 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -150,7 +150,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con case NYql::NDqProto::TTaskInput::kMerge: { const auto& protoSortCols = inputDesc.GetMerge().GetSortColumns(); TVector<TSortColumnInfo> sortColsInfo; - GetColumnsInfo(type, protoSortCols, sortColsInfo); + GetSortColumnsInfo(type, protoSortCols, sortColsInfo); YQL_ENSURE(!sortColsInfo.empty()); return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory, stats); @@ -161,7 +161,8 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con } IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, - const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) + const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector<IDqOutput::TPtr>&& outputs) { TMaybe<ui32> outputWidth; if (type->IsMulti()) { @@ -178,19 +179,11 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } case NDqProto::TTaskOutput::kHashPartition: { - TVector<TType*> keyColumnTypes; - TVector<ui32> keyColumnIndices; - GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices); - YQL_ENSURE(!keyColumnTypes.empty()); - + TVector<TColumnInfo> keyColumns; + GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns); + YQL_ENSURE(!keyColumns.empty()); YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize()); - TVector<ui64> channelIds(outputDesc.GetHashPartition().GetPartitionsCount()); - for (ui32 i = 0; i < outputDesc.ChannelsSize(); ++i) { - channelIds[i] = outputDesc.GetChannels(i).GetId(); - } - - return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes), - std::move(keyColumnIndices), outputWidth); + return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory); } case NDqProto::TTaskOutput::kBroadcast: { @@ -213,9 +206,9 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContext::CreateOutputConsumer(const TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv, - TVector<IDqOutput::TPtr>&& outputs) const + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const { - return DqBuildOutputConsumer(outputDesc, type, typeEnv, std::move(outputs)); + return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs)); } IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 /* channelId */) const { @@ -652,7 +645,7 @@ public: if (transform) { auto guard = BindAllocator(); transform->TransformOutput = execCtx.CreateOutputConsumer(outputDesc, transform->TransformOutputType, - Context.ApplyCtx, typeEnv, std::move(outputs)); + Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs)); outputs.clear(); outputs.emplace_back(transform->TransformInput); @@ -661,7 +654,7 @@ public: { auto guard = BindAllocator(); outputConsumers[i] = execCtx.CreateOutputConsumer(outputDesc, entry->OutputItemTypes[i], - Context.ApplyCtx, typeEnv, std::move(outputs)); + Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs)); } } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index b5b0af84c7..868b2bd98d 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -241,7 +241,9 @@ public: virtual IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) const = 0; + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector<IDqOutput::TPtr>&& outputs) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0; }; @@ -250,7 +252,9 @@ class TDqTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext { public: IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) const override; + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector<IDqOutput::TPtr>&& outputs) const override; IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; }; @@ -275,7 +279,8 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con TVector<IDqInputChannel::TPtr>&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory); IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& channels); + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector<IDqOutput::TPtr>&& channels); using TDqTaskRunnerParameterProvider = std::function< bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index 8806c014ab..175507632e 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -224,5 +224,17 @@ NUdf::IHash::TPtr MakeHashImpl(const NMiniKQL::TType* type); NUdf::ICompare::TPtr MakeCompareImpl(const NMiniKQL::TType* type); NUdf::IEquate::TPtr MakeEquateImpl(const NMiniKQL::TType* type); +template<typename T> +ui64 CalcMaxBlockLength(T beginIt, T endIt, const NUdf::ITypeInfoHelper& helper) { + ui64 maxBlockLen = Max<ui64>(); + while (beginIt != endIt) { + const TType* itemType = *beginIt++; + if (itemType) { + maxBlockLen = std::min(maxBlockLen, helper.GetMaxBlockLength(itemType)); + } + } + return (maxBlockLen == Max<ui64>()) ? 0 : maxBlockLen; +} + } // namespace NMiniKQL } // namespace Nkikimr |