aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-07-27 13:18:04 +0300
committeraneporada <aneporada@ydb.tech>2023-07-27 13:18:04 +0300
commit479f755935f129820b3f33ff07381351c8a42830 (patch)
treeeaf6b579a96cc1afc59ca4b94f60e3b818bf5762
parent4a7691c519e6114e013dc1dd0c3b2528154507f9 (diff)
downloadydb-479f755935f129820b3f33ff07381351c8a42830.tar.gz
Support blocks in DqCnMerge connection
-rw-r--r--ydb/library/yql/dq/runtime/dq_columns_resolve.cpp8
-rw-r--r--ydb/library/yql/dq/runtime/dq_columns_resolve.h13
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp410
3 files changed, 429 insertions, 2 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
index 81b3761004d..70ba88e3581 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
@@ -9,11 +9,17 @@ using namespace NKikimr::NMiniKQL;
TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf columnName) {
TType* memberType = nullptr;
ui32 idx;
+ TMaybe<bool> isScalar;
if (type->GetKind() == TType::EKind::Multi) {
const auto& multiType = static_cast<const TMultiType&>(*type);
YQL_ENSURE(TryFromString(columnName, idx), "Expecting number as column name");
YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index");
memberType = multiType.GetElementType(idx);
+ if (memberType->IsBlock()) {
+ auto blockType = static_cast<const TBlockType*>(type);
+ isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
+ memberType = blockType->GetItemType();
+ }
} else {
YQL_ENSURE(type->GetKind() == TType::EKind::Struct);
const auto& structType = static_cast<const TStructType&>(*type);
@@ -29,7 +35,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString
memberType = static_cast<TOptionalType&>(*memberType).GetItemType();
}
- return TColumnInfo{TString(columnName), idx, memberType};
+ return TColumnInfo{TString(columnName), idx, memberType, isScalar};
}
TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) {
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
index de333872004..f46e4e0eb09 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
@@ -11,8 +11,19 @@ struct TColumnInfo {
TString Name;
ui32 Index;
NKikimr::NMiniKQL::TType* Type;
+ TMaybe<bool> IsScalar; // defined only on block types
- TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type) : Name(name), Index(index), Type(type) {};
+ TColumnInfo(TString name, ui32 index, NKikimr::NMiniKQL::TType* type, TMaybe<bool> isScalar)
+ : Name(name)
+ , Index(index)
+ , Type(type)
+ , IsScalar(isScalar)
+ {
+ }
+
+ bool IsBlockOrScalar() const {
+ return IsScalar.Defined();
+ }
NUdf::TDataTypeId GetTypeId() const {
YQL_ENSURE(Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data);
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 489bd93884e..1707ba41a3d 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -2,6 +2,13 @@
#include "dq_columns_resolve.h"
+#include <ydb/library/yql/minikql/computation/mkql_block_reader.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+
+#include <ydb/library/yql/public/udf/arrow/args_dechunker.h>
+
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
namespace NYql::NDq {
@@ -314,6 +321,400 @@ bool IsWideInputs(const TVector<IDqInput::TPtr>& inputs) {
return isWide;
}
+TVector<NKikimr::NMiniKQL::TType*> ExtractBlockItemTypes(const NKikimr::NMiniKQL::TType* type) {
+ TVector<NKikimr::NMiniKQL::TType*> result;
+
+ YQL_ENSURE(type->IsMulti());
+ auto multiType = static_cast<const NKikimr::NMiniKQL::TMultiType*>(type);
+ YQL_ENSURE(multiType->GetElementsCount() > 0);
+
+ for (ui32 i = 0; i < multiType->GetElementsCount(); ++i) {
+ auto itemType = multiType->GetElementType(i);
+ YQL_ENSURE(itemType->IsBlock());
+ auto blockType = static_cast<const NKikimr::NMiniKQL::TBlockType*>(itemType);
+ const bool isScalar = blockType->GetShape() == NKikimr::NMiniKQL::TBlockType::EShape::Scalar;
+
+ if (i + 1 == multiType->GetElementsCount()) {
+ YQL_ENSURE(isScalar);
+ } else {
+ result.emplace_back(isScalar ? nullptr : blockType->GetItemType());
+ }
+ }
+ return result;
+}
+
+ui64 CalcMaxBlockSize(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
+ TTypeInfoHelper helper;
+ ui64 maxBlockLen = Max<ui64>();
+ for (auto& itemType : itemTypes) {
+ if (itemType) {
+ maxBlockLen = std::min(maxBlockLen, helper.GetMaxBlockLength(itemType));
+ }
+ }
+ return maxBlockLen;
+}
+
+TVector<std::unique_ptr<IBlockReader>> MakeReaders(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
+ TVector<std::unique_ptr<IBlockReader>> result;
+ for (auto& itemType : itemTypes) {
+ if (itemType) {
+ result.emplace_back(MakeBlockReader(TTypeInfoHelper(), itemType));
+ } else {
+ result.emplace_back();
+ }
+ }
+ return result;
+}
+
+TVector<std::unique_ptr<IArrayBuilder>> MakeBuilders(ui64 blockLen, const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
+ TVector<std::unique_ptr<IArrayBuilder>> result;
+ TTypeInfoHelper helper;
+ for (auto& itemType : itemTypes) {
+ if (itemType) {
+ // TODO: pass memory pool
+ // TODO: IPgBuilder
+ YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
+ result.emplace_back(MakeArrayBuilder(helper, itemType, *arrow::default_memory_pool(), blockLen, nullptr));
+ } else {
+ result.emplace_back();
+ }
+ }
+ return result;
+}
+
+TVector<IBlockItemComparator::TPtr> MakeComparators(const TVector<TSortColumnInfo>& sortCols,
+ const TVector<NKikimr::NMiniKQL::TType*> itemTypes)
+{
+ TVector<IBlockItemComparator::TPtr> result;
+ TBlockTypeHelper helper;
+ for (auto& sortCol : sortCols) {
+ YQL_ENSURE(sortCol.Index < itemTypes.size());
+
+ auto itemType = itemTypes[sortCol.Index];
+ YQL_ENSURE(itemType);
+ result.emplace_back(helper.MakeComparator(itemType));
+ }
+ return result;
+}
+
+class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBlockStreamValue> {
+ using TBase = TComputationValue<TDqInputMergeBlockStreamValue>;
+public:
+ TDqInputMergeBlockStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs,
+ TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
+ : TBase(memInfo)
+ , SortCols_(std::move(sortCols))
+ , ItemTypes_(ExtractBlockItemTypes(inputs.front()->GetInputType()))
+ , MaxOutputBlockLen_(CalcMaxBlockSize(ItemTypes_))
+ , Comparators_(MakeComparators(SortCols_, ItemTypes_))
+ , Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_))
+ , Factory_(factory)
+ , Stats_(stats)
+ {
+ YQL_ENSURE(!inputs.empty());
+ InputData_.reserve(inputs.size());
+ for (auto& input : inputs) {
+ InputData_.emplace_back(std::move(input), this);
+ }
+ }
+
+private:
+ struct TDqInputBatch : private TMoveOnly {
+ explicit TDqInputBatch(IDqInput::TPtr&& input, const TDqInputMergeBlockStreamValue* parent)
+ : Input_(std::move(input))
+ , FetchedValues_(Input_->GetInputType())
+ , Readers_(MakeReaders(parent->ItemTypes_))
+ , Parent_(parent)
+ {
+ YQL_ENSURE(Parent_);
+ CurrentRow_.reserve(parent->ItemTypes_.size());
+ }
+
+ ui64 CurrentIndex() const {
+ return CurrBlockIndex_;
+ }
+
+ ui64 BlockLen() const {
+ return BlockLen_;
+ }
+
+ bool IsEmpty() const {
+ return CurrBlockIndex_ >= BlockLen_;
+ }
+
+ void NextRow() {
+ Y_VERIFY_DEBUG(!IsEmpty());
+ ++CurrBlockIndex_;
+ }
+
+ NUdf::EFetchStatus FetchNext() {
+ if (IsFinished_) {
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ const ui32 width = Parent_->ItemTypes_.size();
+ while (IsEmpty()) {
+ while (FetchedValues_.empty()) {
+ if (!Input_->Pop(FetchedValues_)) {
+ if (Input_->IsFinished()) {
+ IsFinished_ = true;
+ return NUdf::EFetchStatus::Finish;
+ }
+ return NUdf::EFetchStatus::Yield;
+ }
+ }
+ NUdf::TUnboxedValue* values = FetchedValues_.Head();
+ CurrentRow_.clear();
+ for (ui32 i = 0; i < width; ++i) {
+ CurrentRow_.emplace_back(TArrowBlock::From(values[i]).GetDatum());
+ }
+ CurrBlockIndex_ = 0;
+ BlockLen_ = TArrowBlock::From(values[width]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ FetchedValues_.Pop();
+ }
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ const TDqInputMergeBlockStreamValue& Parent() const {
+ return *Parent_;
+ }
+
+ TBlockItem GetColumnItem(ui32 columnIndex, ui64 blockIndex) const {
+ Y_VERIFY_DEBUG(columnIndex < CurrentRow_.size());
+ auto& datum = CurrentRow_[columnIndex];
+ Y_VERIFY_DEBUG(datum.is_array());
+ auto& reader = Readers_[columnIndex];
+ return reader->GetItem(*datum.array(), blockIndex);
+ }
+
+ arrow::Datum GetScalarColumn(ui32 columnIndex) const {
+ YQL_ENSURE(columnIndex < CurrentRow_.size());
+ YQL_ENSURE(CurrentRow_[columnIndex].is_scalar());
+ return CurrentRow_[columnIndex];
+ }
+
+ private:
+ IDqInput::TPtr Input_;
+ TUnboxedValueBatch FetchedValues_;
+
+ TVector<arrow::Datum> CurrentRow_;
+ ui64 CurrBlockIndex_ = 0;
+ ui64 BlockLen_ = 0;
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ const TDqInputMergeBlockStreamValue* const Parent_;
+ bool IsFinished_ = false;
+ };
+
+ class TDqInputBatchIterator {
+ public:
+ explicit TDqInputBatchIterator(ui64 inputIndex, TDqInputBatch* data, ui64 blockIndex)
+ : InputIndex_(inputIndex)
+ , Data_(data)
+ , BlockIndex_(blockIndex)
+ {
+ Y_VERIFY_DEBUG(Data_);
+ }
+
+ bool operator<(const TDqInputBatchIterator& other) const {
+ Y_VERIFY_DEBUG(&Data_->Parent() == &other.Data_->Parent());
+ const auto& comparators = Data_->Parent().Comparators_;
+ ui32 comporatorIndex = 0;
+ for (auto& sortCol : Data_->Parent().SortCols_) {
+ ui32 idx = sortCol.Index;
+
+ TBlockItem myValue = GetItem(idx);
+ TBlockItem otherValue = other.GetItem(idx);
+
+ i64 compare = comparators[comporatorIndex++]->Compare(myValue, otherValue);
+ if (!sortCol.Ascending) {
+ compare = -compare;
+ }
+
+ if (compare) {
+ return compare > 0;
+ }
+ }
+ return false;
+ }
+
+ TBlockItem GetItem(ui32 columnIndex) const {
+ return Data_->GetColumnItem(columnIndex, BlockIndex_);
+ }
+
+ TDqInputBatch& Input() const {
+ return *Data_;
+ }
+
+ ui64 BlockIndex() const {
+ return BlockIndex_;
+ }
+
+ size_t InputIndex() const {
+ return InputIndex_;
+ }
+
+ private:
+ size_t InputIndex_ = 0;
+ TDqInputBatch* Data_;
+ ui64 BlockIndex_ = 0;
+ };
+
+ NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final {
+ Y_UNUSED(result);
+ YQL_ENSURE(false, "Using Fetch() on wide block input");
+ }
+
+ NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final {
+ YQL_ENSURE(width == ItemTypes_.size() + 1);
+ if (IsFinished_) {
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ std::vector<arrow::Datum> chunk;
+ while (!Output_ || !Output_->Next(chunk)) {
+ auto status = DoMerge();
+ if (status != NUdf::EFetchStatus::Ok) {
+ IsFinished_ = status == NUdf::EFetchStatus::Finish;
+ return status;
+ }
+ YQL_ENSURE(Output_);
+ }
+
+ TMaybe<ui64> blockLen;
+ YQL_ENSURE(width == chunk.size() + 1);
+ for (ui32 i = 0; i < chunk.size(); ++i) {
+ auto& item = chunk[i];
+ if (item.is_array()) {
+ if (blockLen.Defined()) {
+ YQL_ENSURE(*blockLen == (ui64)item.array()->length);
+ } else {
+ blockLen = item.array()->length;
+ }
+ } else {
+ YQL_ENSURE(item.is_scalar());
+ }
+ result[i] = Factory_.CreateArrowBlock(std::move(item));
+ }
+
+ YQL_ENSURE(blockLen.Defined());
+ result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(*blockLen)));
+ // TODO: support stats for blocks
+ //if (Stats_) {
+ // Stats_.Add(result, width);
+ //}
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ NUdf::EFetchStatus FetchInput(size_t index) {
+ YQL_ENSURE(InputData_[index].IsEmpty());
+ auto status = InputData_[index].FetchNext();
+ if (status == NUdf::EFetchStatus::Ok) {
+ if (!FirstSeenInputIndex_.Defined()) {
+ FirstSeenInputIndex_ = index;
+ }
+ ui64 idx = InputData_[index].CurrentIndex();
+ ui64 len = InputData_[index].BlockLen();
+ while (idx < len) {
+ InputRows_.emplace_back(index, &InputData_[index], idx++);
+ }
+ }
+ return status;
+ }
+
+ NUdf::EFetchStatus DoMerge() {
+ if (!Initialized_) {
+ for (size_t i = StartInputIndex_; i < InputData_.size(); ++i) {
+ auto status = FetchInput(i);
+ if (status == NUdf::EFetchStatus::Yield) {
+ return status;
+ }
+ ++StartInputIndex_;
+ }
+ std::make_heap(InputRows_.begin(), InputRows_.end());
+ Initialized_ = true;
+ }
+
+ if (StartInputIndex_ < InputData_.size()) {
+ auto status = FetchInput(StartInputIndex_);
+ if (status == NUdf::EFetchStatus::Yield) {
+ return status;
+ }
+ if (status == NUdf::EFetchStatus::Ok) {
+ std::make_heap(InputRows_.begin(), InputRows_.end());
+ }
+ StartInputIndex_ = InputData_.size();
+ }
+
+ while (OutputBlockLen_ < MaxOutputBlockLen_ && !InputRows_.empty()) {
+ std::pop_heap(InputRows_.begin(), InputRows_.end());
+ auto& smallest = InputRows_.back();
+
+ TDqInputBatch& input = smallest.Input();
+ const auto inputIndex = smallest.InputIndex();
+ YQL_ENSURE(!input.IsEmpty());
+ YQL_ENSURE(smallest.BlockIndex() == input.CurrentIndex(), "Sort order violation on input #" << inputIndex);
+
+ for (size_t i = 0; i < Builders_.size(); ++i) {
+ if (Builders_[i]) {
+ Builders_[i]->Add(smallest.GetItem(i));
+ }
+ }
+ OutputBlockLen_++;
+ input.NextRow();
+ InputRows_.pop_back();
+ if (input.IsEmpty()) {
+ auto status = input.FetchNext();
+ if (status == NUdf::EFetchStatus::Yield) {
+ StartInputIndex_ = inputIndex;
+ return status;
+ }
+ if (status == NUdf::EFetchStatus::Ok) {
+ std::make_heap(InputRows_.begin(), InputRows_.end());
+ }
+ }
+ }
+
+ if (!OutputBlockLen_) {
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ std::vector<arrow::Datum> output;
+ Y_VERIFY_DEBUG(FirstSeenInputIndex_.Defined());
+ for (size_t i = 0; i < Builders_.size(); ++i) {
+ if (Builders_[i]) {
+ output.emplace_back(Builders_[i]->Build(false));
+ } else {
+ output.emplace_back(InputData_[*FirstSeenInputIndex_].GetScalarColumn(i));
+ }
+ }
+
+ Output_ = std::make_unique<TArgsDechunker>(std::move(output));
+ OutputBlockLen_ = 0;
+ return NUdf::EFetchStatus::Ok;
+ }
+private:
+ TVector<TSortColumnInfo> SortCols_;
+ const TVector<NKikimr::NMiniKQL::TType*> ItemTypes_;
+ const ui64 MaxOutputBlockLen_;
+ TVector<IBlockItemComparator::TPtr> Comparators_;
+
+ TVector<TDqInputBatch> InputData_;
+ TMaybe<size_t> FirstSeenInputIndex_;
+ bool Initialized_ = false;
+ size_t StartInputIndex_ = 0;
+ TVector<TDqInputBatchIterator> InputRows_;
+
+
+ TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+ ui64 OutputBlockLen_ = 0;
+
+ const NKikimr::NMiniKQL::THolderFactory& Factory_;
+ TDqMeteringStats::TInputStatsMeter Stats_;
+
+ std::unique_ptr<TArgsDechunker> Output_;
+ bool IsFinished_ = false;
+};
+
} // namespace
void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& val) {
@@ -359,7 +760,16 @@ NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs,
NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
{
+ YQL_ENSURE(!inputs.empty());
if (IsWideInputs(inputs)) {
+ if (AnyOf(sortCols, [](const auto& sortCol) { return sortCol.IsBlockOrScalar(); })) {
+ // we can ignore scalar columns, since all they have exactly the same value in all inputs
+ EraseIf(sortCols, [](const auto& sortCol) { return *sortCol.IsScalar; });
+ if (sortCols.empty()) {
+ return factory.Create<TDqInputUnionStreamValue<true>>(std::move(inputs), stats);
+ }
+ return factory.Create<TDqInputMergeBlockStreamValue>(std::move(inputs), std::move(sortCols), factory, stats);
+ }
return factory.Create<TDqInputMergeStreamValue<true>>(std::move(inputs), std::move(sortCols), stats);
}
return factory.Create<TDqInputMergeStreamValue<false>>(std::move(inputs), std::move(sortCols), stats);