diff options
author | aneporada <aneporada@ydb.tech> | 2023-07-27 13:18:04 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-07-27 13:18:04 +0300 |
commit | 479f755935f129820b3f33ff07381351c8a42830 (patch) | |
tree | eaf6b579a96cc1afc59ca4b94f60e3b818bf5762 | |
parent | 4a7691c519e6114e013dc1dd0c3b2528154507f9 (diff) | |
download | ydb-479f755935f129820b3f33ff07381351c8a42830.tar.gz |
Support blocks in DqCnMerge connection
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_columns_resolve.h | 13 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_producer.cpp | 410 |
3 files changed, 429 insertions, 2 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp index 81b3761004d..70ba88e3581 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp @@ -9,11 +9,17 @@ using namespace NKikimr::NMiniKQL; TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf columnName) { TType* memberType = nullptr; ui32 idx; + TMaybe<bool> isScalar; if (type->GetKind() == TType::EKind::Multi) { const auto& multiType = static_cast<const TMultiType&>(*type); YQL_ENSURE(TryFromString(columnName, idx), "Expecting number as column name"); YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index"); memberType = multiType.GetElementType(idx); + if (memberType->IsBlock()) { + auto blockType = static_cast<const TBlockType*>(type); + isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; + memberType = blockType->GetItemType(); + } } else { YQL_ENSURE(type->GetKind() == TType::EKind::Struct); const auto& structType = static_cast<const TStructType&>(*type); @@ -29,7 +35,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString memberType = static_cast<TOptionalType&>(*memberType).GetItemType(); } - return TColumnInfo{TString(columnName), idx, memberType}; + return TColumnInfo{TString(columnName), idx, memberType, isScalar}; } TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) { diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h index de333872004..f46e4e0eb09 100644 --- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h +++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h @@ -11,8 +11,19 @@ struct TColumnInfo { TString Name; ui32 Index; NKikimr::NMiniKQL::TType* Type; + TMaybe<bool> IsScalar; // defined only on block types - TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type) : Name(name), Index(index), Type(type) {}; + TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type, TMaybe<bool> isScalar) + : Name(name) + , Index(index) + , Type(type) + , IsScalar(isScalar) + { + } + + bool IsBlockOrScalar() const { + return IsScalar.Defined(); + } NUdf::TDataTypeId GetTypeId() const { YQL_ENSURE(Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data); diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 489bd93884e..1707ba41a3d 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -2,6 +2,13 @@ #include "dq_columns_resolve.h" +#include <ydb/library/yql/minikql/computation/mkql_block_reader.h> +#include <ydb/library/yql/minikql/computation/mkql_block_builder.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> + +#include <ydb/library/yql/public/udf/arrow/args_dechunker.h> + #include <ydb/library/yql/dq/type_ann/dq_type_ann.h> namespace NYql::NDq { @@ -314,6 +321,400 @@ bool IsWideInputs(const TVector<IDqInput::TPtr>& inputs) { return isWide; } +TVector<NKikimr::NMiniKQL::TType*> ExtractBlockItemTypes(const NKikimr::NMiniKQL::TType* type) { + TVector<NKikimr::NMiniKQL::TType*> result; + + YQL_ENSURE(type->IsMulti()); + auto multiType = static_cast<const NKikimr::NMiniKQL::TMultiType*>(type); + YQL_ENSURE(multiType->GetElementsCount() > 0); + + for (ui32 i = 0; i < multiType->GetElementsCount(); ++i) { + auto itemType = multiType->GetElementType(i); + YQL_ENSURE(itemType->IsBlock()); + auto blockType = static_cast<const NKikimr::NMiniKQL::TBlockType*>(itemType); + const bool isScalar = blockType->GetShape() == NKikimr::NMiniKQL::TBlockType::EShape::Scalar; + + if (i + 1 == multiType->GetElementsCount()) { + YQL_ENSURE(isScalar); + } else { + result.emplace_back(isScalar ? nullptr : blockType->GetItemType()); + } + } + return result; +} + +ui64 CalcMaxBlockSize(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) { + TTypeInfoHelper helper; + ui64 maxBlockLen = Max<ui64>(); + for (auto& itemType : itemTypes) { + if (itemType) { + maxBlockLen = std::min(maxBlockLen, helper.GetMaxBlockLength(itemType)); + } + } + return maxBlockLen; +} + +TVector<std::unique_ptr<IBlockReader>> MakeReaders(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) { + TVector<std::unique_ptr<IBlockReader>> result; + for (auto& itemType : itemTypes) { + if (itemType) { + result.emplace_back(MakeBlockReader(TTypeInfoHelper(), itemType)); + } else { + result.emplace_back(); + } + } + return result; +} + +TVector<std::unique_ptr<IArrayBuilder>> MakeBuilders(ui64 blockLen, const TVector<NKikimr::NMiniKQL::TType*> itemTypes) { + TVector<std::unique_ptr<IArrayBuilder>> result; + TTypeInfoHelper helper; + for (auto& itemType : itemTypes) { + if (itemType) { + // TODO: pass memory pool + // TODO: IPgBuilder + YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet"); + result.emplace_back(MakeArrayBuilder(helper, itemType, *arrow::default_memory_pool(), blockLen, nullptr)); + } else { + result.emplace_back(); + } + } + return result; +} + +TVector<IBlockItemComparator::TPtr> MakeComparators(const TVector<TSortColumnInfo>& sortCols, + const TVector<NKikimr::NMiniKQL::TType*> itemTypes) +{ + TVector<IBlockItemComparator::TPtr> result; + TBlockTypeHelper helper; + for (auto& sortCol : sortCols) { + YQL_ENSURE(sortCol.Index < itemTypes.size()); + + auto itemType = itemTypes[sortCol.Index]; + YQL_ENSURE(itemType); + result.emplace_back(helper.MakeComparator(itemType)); + } + return result; +} + +class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBlockStreamValue> { + using TBase = TComputationValue<TDqInputMergeBlockStreamValue>; +public: + TDqInputMergeBlockStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, + TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) + : TBase(memInfo) + , SortCols_(std::move(sortCols)) + , ItemTypes_(ExtractBlockItemTypes(inputs.front()->GetInputType())) + , MaxOutputBlockLen_(CalcMaxBlockSize(ItemTypes_)) + , Comparators_(MakeComparators(SortCols_, ItemTypes_)) + , Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_)) + , Factory_(factory) + , Stats_(stats) + { + YQL_ENSURE(!inputs.empty()); + InputData_.reserve(inputs.size()); + for (auto& input : inputs) { + InputData_.emplace_back(std::move(input), this); + } + } + +private: + struct TDqInputBatch : private TMoveOnly { + explicit TDqInputBatch(IDqInput::TPtr&& input, const TDqInputMergeBlockStreamValue* parent) + : Input_(std::move(input)) + , FetchedValues_(Input_->GetInputType()) + , Readers_(MakeReaders(parent->ItemTypes_)) + , Parent_(parent) + { + YQL_ENSURE(Parent_); + CurrentRow_.reserve(parent->ItemTypes_.size()); + } + + ui64 CurrentIndex() const { + return CurrBlockIndex_; + } + + ui64 BlockLen() const { + return BlockLen_; + } + + bool IsEmpty() const { + return CurrBlockIndex_ >= BlockLen_; + } + + void NextRow() { + Y_VERIFY_DEBUG(!IsEmpty()); + ++CurrBlockIndex_; + } + + NUdf::EFetchStatus FetchNext() { + if (IsFinished_) { + return NUdf::EFetchStatus::Finish; + } + + const ui32 width = Parent_->ItemTypes_.size(); + while (IsEmpty()) { + while (FetchedValues_.empty()) { + if (!Input_->Pop(FetchedValues_)) { + if (Input_->IsFinished()) { + IsFinished_ = true; + return NUdf::EFetchStatus::Finish; + } + return NUdf::EFetchStatus::Yield; + } + } + NUdf::TUnboxedValue* values = FetchedValues_.Head(); + CurrentRow_.clear(); + for (ui32 i = 0; i < width; ++i) { + CurrentRow_.emplace_back(TArrowBlock::From(values[i]).GetDatum()); + } + CurrBlockIndex_ = 0; + BlockLen_ = TArrowBlock::From(values[width]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + FetchedValues_.Pop(); + } + return NUdf::EFetchStatus::Ok; + } + + const TDqInputMergeBlockStreamValue& Parent() const { + return *Parent_; + } + + TBlockItem GetColumnItem(ui32 columnIndex, ui64 blockIndex) const { + Y_VERIFY_DEBUG(columnIndex < CurrentRow_.size()); + auto& datum = CurrentRow_[columnIndex]; + Y_VERIFY_DEBUG(datum.is_array()); + auto& reader = Readers_[columnIndex]; + return reader->GetItem(*datum.array(), blockIndex); + } + + arrow::Datum GetScalarColumn(ui32 columnIndex) const { + YQL_ENSURE(columnIndex < CurrentRow_.size()); + YQL_ENSURE(CurrentRow_[columnIndex].is_scalar()); + return CurrentRow_[columnIndex]; + } + + private: + IDqInput::TPtr Input_; + TUnboxedValueBatch FetchedValues_; + + TVector<arrow::Datum> CurrentRow_; + ui64 CurrBlockIndex_ = 0; + ui64 BlockLen_ = 0; + TVector<std::unique_ptr<IBlockReader>> Readers_; + const TDqInputMergeBlockStreamValue* const Parent_; + bool IsFinished_ = false; + }; + + class TDqInputBatchIterator { + public: + explicit TDqInputBatchIterator(ui64 inputIndex, TDqInputBatch* data, ui64 blockIndex) + : InputIndex_(inputIndex) + , Data_(data) + , BlockIndex_(blockIndex) + { + Y_VERIFY_DEBUG(Data_); + } + + bool operator<(const TDqInputBatchIterator& other) const { + Y_VERIFY_DEBUG(&Data_->Parent() == &other.Data_->Parent()); + const auto& comparators = Data_->Parent().Comparators_; + ui32 comporatorIndex = 0; + for (auto& sortCol : Data_->Parent().SortCols_) { + ui32 idx = sortCol.Index; + + TBlockItem myValue = GetItem(idx); + TBlockItem otherValue = other.GetItem(idx); + + i64 compare = comparators[comporatorIndex++]->Compare(myValue, otherValue); + if (!sortCol.Ascending) { + compare = -compare; + } + + if (compare) { + return compare > 0; + } + } + return false; + } + + TBlockItem GetItem(ui32 columnIndex) const { + return Data_->GetColumnItem(columnIndex, BlockIndex_); + } + + TDqInputBatch& Input() const { + return *Data_; + } + + ui64 BlockIndex() const { + return BlockIndex_; + } + + size_t InputIndex() const { + return InputIndex_; + } + + private: + size_t InputIndex_ = 0; + TDqInputBatch* Data_; + ui64 BlockIndex_ = 0; + }; + + NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final { + Y_UNUSED(result); + YQL_ENSURE(false, "Using Fetch() on wide block input"); + } + + NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final { + YQL_ENSURE(width == ItemTypes_.size() + 1); + if (IsFinished_) { + return NUdf::EFetchStatus::Finish; + } + + std::vector<arrow::Datum> chunk; + while (!Output_ || !Output_->Next(chunk)) { + auto status = DoMerge(); + if (status != NUdf::EFetchStatus::Ok) { + IsFinished_ = status == NUdf::EFetchStatus::Finish; + return status; + } + YQL_ENSURE(Output_); + } + + TMaybe<ui64> blockLen; + YQL_ENSURE(width == chunk.size() + 1); + for (ui32 i = 0; i < chunk.size(); ++i) { + auto& item = chunk[i]; + if (item.is_array()) { + if (blockLen.Defined()) { + YQL_ENSURE(*blockLen == (ui64)item.array()->length); + } else { + blockLen = item.array()->length; + } + } else { + YQL_ENSURE(item.is_scalar()); + } + result[i] = Factory_.CreateArrowBlock(std::move(item)); + } + + YQL_ENSURE(blockLen.Defined()); + result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(*blockLen))); + // TODO: support stats for blocks + //if (Stats_) { + // Stats_.Add(result, width); + //} + return NUdf::EFetchStatus::Ok; + } + + NUdf::EFetchStatus FetchInput(size_t index) { + YQL_ENSURE(InputData_[index].IsEmpty()); + auto status = InputData_[index].FetchNext(); + if (status == NUdf::EFetchStatus::Ok) { + if (!FirstSeenInputIndex_.Defined()) { + FirstSeenInputIndex_ = index; + } + ui64 idx = InputData_[index].CurrentIndex(); + ui64 len = InputData_[index].BlockLen(); + while (idx < len) { + InputRows_.emplace_back(index, &InputData_[index], idx++); + } + } + return status; + } + + NUdf::EFetchStatus DoMerge() { + if (!Initialized_) { + for (size_t i = StartInputIndex_; i < InputData_.size(); ++i) { + auto status = FetchInput(i); + if (status == NUdf::EFetchStatus::Yield) { + return status; + } + ++StartInputIndex_; + } + std::make_heap(InputRows_.begin(), InputRows_.end()); + Initialized_ = true; + } + + if (StartInputIndex_ < InputData_.size()) { + auto status = FetchInput(StartInputIndex_); + if (status == NUdf::EFetchStatus::Yield) { + return status; + } + if (status == NUdf::EFetchStatus::Ok) { + std::make_heap(InputRows_.begin(), InputRows_.end()); + } + StartInputIndex_ = InputData_.size(); + } + + while (OutputBlockLen_ < MaxOutputBlockLen_ && !InputRows_.empty()) { + std::pop_heap(InputRows_.begin(), InputRows_.end()); + auto& smallest = InputRows_.back(); + + TDqInputBatch& input = smallest.Input(); + const auto inputIndex = smallest.InputIndex(); + YQL_ENSURE(!input.IsEmpty()); + YQL_ENSURE(smallest.BlockIndex() == input.CurrentIndex(), "Sort order violation on input #" << inputIndex); + + for (size_t i = 0; i < Builders_.size(); ++i) { + if (Builders_[i]) { + Builders_[i]->Add(smallest.GetItem(i)); + } + } + OutputBlockLen_++; + input.NextRow(); + InputRows_.pop_back(); + if (input.IsEmpty()) { + auto status = input.FetchNext(); + if (status == NUdf::EFetchStatus::Yield) { + StartInputIndex_ = inputIndex; + return status; + } + if (status == NUdf::EFetchStatus::Ok) { + std::make_heap(InputRows_.begin(), InputRows_.end()); + } + } + } + + if (!OutputBlockLen_) { + return NUdf::EFetchStatus::Finish; + } + + std::vector<arrow::Datum> output; + Y_VERIFY_DEBUG(FirstSeenInputIndex_.Defined()); + for (size_t i = 0; i < Builders_.size(); ++i) { + if (Builders_[i]) { + output.emplace_back(Builders_[i]->Build(false)); + } else { + output.emplace_back(InputData_[*FirstSeenInputIndex_].GetScalarColumn(i)); + } + } + + Output_ = std::make_unique<TArgsDechunker>(std::move(output)); + OutputBlockLen_ = 0; + return NUdf::EFetchStatus::Ok; + } +private: + TVector<TSortColumnInfo> SortCols_; + const TVector<NKikimr::NMiniKQL::TType*> ItemTypes_; + const ui64 MaxOutputBlockLen_; + TVector<IBlockItemComparator::TPtr> Comparators_; + + TVector<TDqInputBatch> InputData_; + TMaybe<size_t> FirstSeenInputIndex_; + bool Initialized_ = false; + size_t StartInputIndex_ = 0; + TVector<TDqInputBatchIterator> InputRows_; + + + TVector<std::unique_ptr<IArrayBuilder>> Builders_; + ui64 OutputBlockLen_ = 0; + + const NKikimr::NMiniKQL::THolderFactory& Factory_; + TDqMeteringStats::TInputStatsMeter Stats_; + + std::unique_ptr<TArgsDechunker> Output_; + bool IsFinished_ = false; +}; + } // namespace void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& val) { @@ -359,7 +760,16 @@ NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs, NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs, TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) { + YQL_ENSURE(!inputs.empty()); if (IsWideInputs(inputs)) { + if (AnyOf(sortCols, [](const auto& sortCol) { return sortCol.IsBlockOrScalar(); })) { + // we can ignore scalar columns, since all they have exactly the same value in all inputs + EraseIf(sortCols, [](const auto& sortCol) { return *sortCol.IsScalar; }); + if (sortCols.empty()) { + return factory.Create<TDqInputUnionStreamValue<true>>(std::move(inputs), stats); + } + return factory.Create<TDqInputMergeBlockStreamValue>(std::move(inputs), std::move(sortCols), factory, stats); + } return factory.Create<TDqInputMergeStreamValue<true>>(std::move(inputs), std::move(sortCols), stats); } return factory.Create<TDqInputMergeStreamValue<false>>(std::move(inputs), std::move(sortCols), stats); |