aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-07-31 12:32:50 +0300
committer aneporada <aneporada@ydb.tech> 2023-07-31 12:32:50 +0300
commit 8c2a3158362287c39c3360a751aaafcb98d5e520 (patch)
tree 56db941c130f99c5374f8ce7035332e13e524cea
parent 7107a9b540de50846e4c8fc0908cca7dd0898dac (diff)
download ydb-8c2a3158362287c39c3360a751aaafcb98d5e520.tar.gz
Support blocks in DqCnHashShuffle connection. Fix block DqCnMerge with equal elements
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h 3
-rw-r--r-- ydb/core/kqp/runtime/kqp_tasks_runner.cpp 22
-rw-r--r-- ydb/core/kqp/runtime/kqp_tasks_runner.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.cpp 3
-rw-r--r-- ydb/library/yql/dq/runtime/dq_columns_resolve.cpp 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_columns_resolve.h 17
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_producer.cpp 18
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.cpp 392
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.h 5
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.cpp 29
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.h 11
-rw-r--r-- ydb/library/yql/minikql/mkql_type_builder.h 12
12 files changed, 432 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index b32537d951..a555f7c286 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -23,9 +23,10 @@ public:
IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const override
{
- return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, std::move(outputs));
+ return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override {
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 08eda24aa0..b04d62030e 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -22,22 +22,24 @@ using namespace NYql;
using namespace NDq;
IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const TType* type,
- NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs)
+ NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TVector<IDqOutput::TPtr>&& outputs)
{
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kRangePartition: {
TVector<NScheme::TTypeInfo> keyColumnTypeInfos;
keyColumnTypeInfos.reserve(outputDesc.GetRangePartition().GetKeyColumns().size());
- TVector<TType*> keyColumnTypes;
TVector<ui32> keyColumnIndices;
- GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices);
- YQL_ENSURE(!keyColumnTypes.empty());
- std::transform(keyColumnTypes.begin(), keyColumnTypes.end(), back_inserter(keyColumnTypeInfos), [](const auto& tyPtr) {
+ TVector<TColumnInfo> keyColumns;
+ GetColumnsInfo(type, outputDesc.GetRangePartition().GetKeyColumns(), keyColumns);
+ YQL_ENSURE(!keyColumns.empty());
+ for (auto& info : keyColumns) {
// TODO: support pg types
- YQL_ENSURE(tyPtr->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data);
- auto dataTypeId = static_cast<NKikimr::NMiniKQL::TDataType&>(*tyPtr).GetSchemeType();
- return NScheme::TTypeInfo((NScheme::TTypeId)dataTypeId);
- });
+ YQL_ENSURE(info.Type->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data);
+ auto dataTypeId = static_cast<NKikimr::NMiniKQL::TDataType&>(*info.Type).GetSchemeType();
+ keyColumnTypeInfos.emplace_back(NScheme::TTypeInfo((NScheme::TTypeId)dataTypeId));
+ keyColumnIndices.emplace_back(info.Index);
+ }
TVector<TKqpRangePartition> partitions;
partitions.reserve(outputDesc.GetRangePartition().PartitionsSize());
@@ -61,7 +63,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
}
default: {
- return DqBuildOutputConsumer(outputDesc, type, typeEnv, std::move(outputs));
+ return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs));
}
}
}
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index edb510e939..d9be8de480 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -9,6 +9,7 @@ namespace NKqp {
NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<NYql::NDq::IDqOutput::TPtr>&& outputs);
TIntrusivePtr<NYql::NDq::IDqTaskRunner> CreateKqpTaskRunner(const NYql::NDq::TDqTaskRunnerContext& execCtx,
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 1315889234..14ac6047a4 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -1098,9 +1098,10 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
public:
NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<NDq::IDqOutput::TPtr>&& outputs) const override
{
- return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, std::move(outputs));
+ return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override {
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
index 70ba88e358..6d6f248290 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
@@ -16,7 +16,7 @@ TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TString
YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index");
memberType = multiType.GetElementType(idx);
if (memberType->IsBlock()) {
- auto blockType = static_cast<const TBlockType*>(type);
+ auto blockType = static_cast<const TBlockType*>(memberType);
isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
memberType = blockType->GetItemType();
}
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.h b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
index f46e4e0eb0..f9e3e504a0 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.h
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.h
@@ -45,28 +45,21 @@ TColumnInfo GetColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf colum
template<typename TList>
void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& columns,
- TVector<NKikimr::NMiniKQL::TType*>& columnTypes, TVector<ui32>& columnIndices)
+ TVector<TColumnInfo>& result)
{
- columnTypes.clear();
- columnIndices.clear();
-
- columnTypes.reserve(columns.size());
- columnIndices.reserve(columns.size());
-
+ result.clear();
+ result.reserve(columns.size());
for (auto& column : columns) {
- auto columnInfo = GetColumnInfo(type, column);
- columnTypes.push_back(columnInfo.Type);
- columnIndices.push_back(columnInfo.Index);
+ result.emplace_back(GetColumnInfo(type, column));
}
}
template<typename TList>
-void GetColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& protoSortCols,
+void GetSortColumnsInfo(const NKikimr::NMiniKQL::TType* type, const TList& protoSortCols,
TVector<TSortColumnInfo>& sortCols)
{
sortCols.clear();
sortCols.reserve(protoSortCols.size());
-
for (const auto& protoSortCol : protoSortCols) {
TSortColumnInfo colInfo = static_cast<TSortColumnInfo>(GetColumnInfo(type, protoSortCol.GetColumn()));
colInfo.Ascending = protoSortCol.GetAscending();
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 1707ba41a3..dee5aeb508 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -343,17 +343,6 @@ TVector<NKikimr::NMiniKQL::TType*> ExtractBlockItemTypes(const NKikimr::NMiniKQL
return result;
}
-ui64 CalcMaxBlockSize(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
- TTypeInfoHelper helper;
- ui64 maxBlockLen = Max<ui64>();
- for (auto& itemType : itemTypes) {
- if (itemType) {
- maxBlockLen = std::min(maxBlockLen, helper.GetMaxBlockLength(itemType));
- }
- }
- return maxBlockLen;
-}
-
TVector<std::unique_ptr<IBlockReader>> MakeReaders(const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
TVector<std::unique_ptr<IBlockReader>> result;
for (auto& itemType : itemTypes) {
@@ -405,13 +394,14 @@ public:
: TBase(memInfo)
, SortCols_(std::move(sortCols))
, ItemTypes_(ExtractBlockItemTypes(inputs.front()->GetInputType()))
- , MaxOutputBlockLen_(CalcMaxBlockSize(ItemTypes_))
+ , MaxOutputBlockLen_(CalcMaxBlockLength(ItemTypes_.begin(), ItemTypes_.end(), TTypeInfoHelper()))
, Comparators_(MakeComparators(SortCols_, ItemTypes_))
, Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_))
, Factory_(factory)
, Stats_(stats)
{
YQL_ENSURE(!inputs.empty());
+ YQL_ENSURE(MaxOutputBlockLen_ > 0);
InputData_.reserve(inputs.size());
for (auto& input : inputs) {
InputData_.emplace_back(std::move(input), this);
@@ -534,7 +524,9 @@ private:
return compare > 0;
}
}
- return false;
+ // resolve equal elements: when InputIndex()es are equal, we must first process element with smaller BlockIndex()
+ // Note: operator> here since this comparator is used together with max-heap
+ return std::tuple(InputIndex(), BlockIndex()) > std::tuple(other.InputIndex(), other.BlockIndex());
}
TBlockItem GetItem(ui32 columnIndex) const {
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index e78288160f..5b640f16e2 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -1,9 +1,15 @@
#include "dq_output_consumer.h"
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
-#include <ydb/library/yql/public/udf/udf_value.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_builder.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_reader.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+
+#include <ydb/library/yql/public/udf/arrow/args_dechunker.h>
+#include <ydb/library/yql/public/udf/udf_value.h>
+
#include <ydb/library/yql/utils/yql_panic.h>
namespace NYql::NDq {
@@ -117,18 +123,13 @@ protected:
return !IsWaitingFlag;
}
public:
- TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices,
- TMaybe<ui32> outputWidth)
+ TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, TMaybe<ui32> outputWidth)
: Outputs(std::move(outputs))
- , KeyColumnIndices(std::move(keyColumnIndices))
- , ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{})
+ , KeyColumns(std::move(keyColumns))
, OutputWidth(outputWidth)
{
- MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size());
-
- for (auto i = 0U; i < keyColumnTypes.size(); i++) {
- ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]);
+ for (auto& column : KeyColumns) {
+ ValueHashers.emplace_back(MakeHashImpl(column.Type));
}
if (outputWidth.Defined()) {
@@ -161,9 +162,7 @@ public:
YQL_ENSURE(!IsWaitingFlag);
IsWaitingFlag = true;
OutputWaiting = Outputs[partitionIndex];
- for (ui32 i = 0; i < count; ++i) {
- WideWaitingValues[i] = std::move(values[i]);
- }
+ std::move(values, values + count, WideWaitingValues.data());
} else {
Outputs[partitionIndex]->WidePush(values, count);
}
@@ -185,8 +184,8 @@ private:
size_t GetHashPartitionIndex(const TUnboxedValue& value) {
ui64 hash = 0;
- for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) {
- auto columnValue = value.GetElement(KeyColumnIndices[keyId]);
+ for (size_t keyId = 0; keyId < KeyColumns.size(); keyId++) {
+ auto columnValue = value.GetElement(KeyColumns[keyId].Index);
hash = CombineHashes(hash, HashColumn(keyId, columnValue));
}
@@ -196,9 +195,9 @@ private:
size_t GetHashPartitionIndex(const TUnboxedValue* values) {
ui64 hash = 0;
- for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) {
- MKQL_ENSURE_S(KeyColumnIndices[keyId] < OutputWidth);
- hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumnIndices[keyId]]));
+ for (size_t keyId = 0; keyId < KeyColumns.size(); keyId++) {
+ MKQL_ENSURE_S(KeyColumns[keyId].Index < OutputWidth);
+ hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumns[keyId].Index]));
}
return hash % Outputs.size();
@@ -212,12 +211,342 @@ private:
}
private:
- TVector<IDqOutput::TPtr> Outputs;
- TVector<ui32> KeyColumnIndices;
+ const TVector<IDqOutput::TPtr> Outputs;
+ const TVector<TColumnInfo> KeyColumns;
TVector<NUdf::IHash::TPtr> ValueHashers;
const TMaybe<ui32> OutputWidth;
};
+class TDqOutputHashPartitionConsumerScalar : public IDqOutputConsumer { // Hash-partition consumer for wide block streams whose key columns are all scalar blocks; the first non-empty batch selects one output and every later batch is pushed to that same output.
+public:
+ TDqOutputHashPartitionConsumerScalar(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType) // outputType is cast to TMultiType below without a check
+ : Outputs_(std::move(outputs))
+ , KeyColumns_(std::move(keyColumns))
+ , OutputWidth_(static_cast<const NMiniKQL::TMultiType*>(outputType)->GetElementsCount())
+ , WaitingValues_(OutputWidth_) // one slot per column, used to park a batch while the output is full
+ {
+ auto multiType = static_cast<const NMiniKQL::TMultiType*>(outputType);
+ TBlockTypeHelper helper;
+ for (auto& column : KeyColumns_) { // build one block reader and one hasher per key column
+ auto columnType = multiType->GetElementType(column.Index);
+ YQL_ENSURE(columnType->IsBlock());
+ auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType);
+ YQL_ENSURE(blockType->GetShape() == NMiniKQL::TBlockType::EShape::Scalar); // only scalar-shaped key columns are accepted here
+ Readers_.emplace_back(MakeBlockReader(TTypeInfoHelper(), blockType->GetItemType()));
+ Hashers_.emplace_back(helper.MakeHasher(blockType->GetItemType()));
+ }
+ }
+private:
+ bool IsFull() const final { // first tries to flush the parked batch, then reports whether one is still parked
+ DrainWaiting();
+ return IsWaitingFlag_;
+ }
+
+ void Consume(TUnboxedValue&& value) final { // narrow (single-value) consumption is not supported: always fails
+ Y_UNUSED(value);
+ YQL_ENSURE(false, "Consume() called on wide block stream");
+ }
+
+ void WideConsume(TUnboxedValue* values, ui32 count) final { // pushes one wide batch to the selected output, or parks it if that output is full
+ YQL_ENSURE(count == OutputWidth_);
+ const ui64 inputBlockLen = TArrowBlock::From(values[count - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; // the last column is read as the UInt64 block length
+ if (!inputBlockLen) { // empty batch: nothing to route
+ return;
+ }
+
+ if (!Output_) { // target output is chosen once, from the key hash of the first non-empty batch
+ Output_ = Outputs_[GetHashPartitionIndex(values)];
+ }
+ if (Output_->IsFull()) { // park the batch; at most one batch may be parked at a time
+ YQL_ENSURE(!IsWaitingFlag_);
+ IsWaitingFlag_ = true;
+ std::move(values, values + count, WaitingValues_.data());
+ } else {
+ Output_->WidePush(values, count);
+ }
+ }
+
+ void Consume(NDqProto::TCheckpoint&& checkpoint) override { // a copy of the checkpoint is pushed to every output
+ for (auto& output : Outputs_) {
+ output->Push(NDqProto::TCheckpoint(checkpoint));
+ }
+ }
+
+ void Finish() final { // finishes every output, not only the selected one
+ for (auto& output : Outputs_) {
+ output->Finish();
+ }
+ }
+
+ void DrainWaiting() const { // const, but updates mutable state: pushes the parked batch once the output has room
+ if (Y_UNLIKELY(IsWaitingFlag_)) {
+ YQL_ENSURE(Output_);
+ if (Output_->IsFull()) { // still no room: keep the batch parked
+ return;
+ }
+ YQL_ENSURE(OutputWidth_ == WaitingValues_.size());
+ Output_->WidePush(WaitingValues_.data(), OutputWidth_);
+ IsWaitingFlag_ = false;
+ }
+ }
+
+ bool DoTryFinish() final { // returns true once no batch is parked
+ DrainWaiting();
+ return !IsWaitingFlag_;
+ }
+
+ size_t GetHashPartitionIndex(const TUnboxedValue* values) { // combines the per-key hashes and reduces the result modulo the number of outputs
+ ui64 hash = 0;
+
+ for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) {
+ YQL_ENSURE(KeyColumns_[keyId].Index < OutputWidth_);
+ hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumns_[keyId].Index]));
+ }
+
+ return hash % Outputs_.size();
+ }
+
+ ui64 HashColumn(size_t keyId, const TUnboxedValue& value) const { // reads the scalar from the Arrow datum and hashes it with this key's hasher
+ TBlockItem item = Readers_[keyId]->GetScalarItem(*TArrowBlock::From(value).GetDatum().scalar());
+ return Hashers_[keyId]->Hash(item);
+ }
+
+private:
+ const TVector<IDqOutput::TPtr> Outputs_;
+ const TVector<TColumnInfo> KeyColumns_;
+ const ui32 OutputWidth_;
+ TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; // indexed by key position (same order as KeyColumns_)
+ TVector<std::unique_ptr<IBlockReader>> Readers_; // indexed by key position (same order as KeyColumns_)
+ IDqOutput::TPtr Output_; // null until the first non-empty batch arrives
+ mutable bool IsWaitingFlag_ = false; // mutable: updated from the const IsFull()/DrainWaiting()
+ mutable TUnboxedValueVector WaitingValues_; // mutable: the parked batch is handed to WidePush from const DrainWaiting()
+};
+
+// Hash-partition consumer for wide block streams. Each incoming batch is split row by
+// row: the hash of the key columns selects an output, and the rows assigned to each
+// output are gathered into new Arrow arrays. Key columns may be scalar blocks (hashed
+// once and cached) or array blocks (hashed per row).
+//
+// Layout of a batch: `OutputWidth_` columns, where the last one is a UInt64 scalar
+// holding the block length and the preceding `OutputWidth_ - 1` are data columns.
+//
+// Back-pressure: when any output reports IsFull() while data is being pushed, the
+// not-yet-pushed data is parked in OutputData_ and IsFull()/DoTryFinish() retry later.
+class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
+public:
+    TDqOutputHashPartitionConsumerBlock(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns,
+        const NKikimr::NMiniKQL::TType* outputType,
+        const NKikimr::NMiniKQL::THolderFactory& holderFactory)
+        : OutputType_(static_cast<const NMiniKQL::TMultiType*>(outputType))
+        , HolderFactory_(holderFactory)
+        , Outputs_(std::move(outputs))
+        , KeyColumns_(std::move(keyColumns))
+        , ScalarColumnHashes_(KeyColumns_.size())
+        , OutputWidth_(OutputType_->GetElementsCount())
+    {
+        TTypeInfoHelper helper;
+        YQL_ENSURE(OutputWidth_ > KeyColumns_.size());
+
+        // The initial builder capacity is bounded by the array-shaped columns only.
+        TVector<const NMiniKQL::TType*> blockTypes;
+        for (auto& columnType : OutputType_->GetElements()) {
+            YQL_ENSURE(columnType->IsBlock());
+            auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType);
+            if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) {
+                blockTypes.emplace_back(blockType->GetItemType());
+            }
+        }
+        ui64 maxBlockLen = CalcMaxBlockLength(blockTypes.begin(), blockTypes.end(), helper);
+        YQL_ENSURE(maxBlockLen > 0);
+        MakeBuilders(maxBlockLen);
+
+        TBlockTypeHelper blockHelper;
+        Readers_.reserve(KeyColumns_.size());
+        Hashers_.reserve(KeyColumns_.size());
+        for (auto& column : KeyColumns_) {
+            // GetHashPartitionIndex() indexes an array that holds only the data columns,
+            // so a key must not refer to the trailing block-length column (or beyond).
+            YQL_ENSURE(column.Index < OutputWidth_ - 1, "Key column index is out of range of data columns");
+            auto columnType = OutputType_->GetElementType(column.Index);
+            YQL_ENSURE(columnType->IsBlock());
+            auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType);
+            Readers_.emplace_back(MakeBlockReader(helper, blockType->GetItemType()));
+            Hashers_.emplace_back(blockHelper.MakeHasher(blockType->GetItemType()));
+        }
+    }
+
+private:
+    // Retries parked data first, then reports whether anything is still parked.
+    bool IsFull() const final {
+        DrainWaiting();
+        return IsWaitingFlag_;
+    }
+
+    // Narrow (single-value) consumption is not supported: always fails.
+    void Consume(TUnboxedValue&& value) final {
+        Y_UNUSED(value);
+        YQL_ENSURE(false, "Consume() called on wide block stream");
+    }
+
+    // Splits one wide batch between the outputs and pushes (or parks) the result.
+    // Must not be called while data is parked.
+    void WideConsume(TUnboxedValue* values, ui32 count) final {
+        YQL_ENSURE(!IsWaitingFlag_);
+        YQL_ENSURE(count == OutputWidth_);
+
+        const ui64 inputBlockLen = TArrowBlock::From(values[count - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+        if (!inputBlockLen) {
+            return;
+        }
+
+        // Data columns only (the block-length column is excluded).
+        TVector<const arrow::Datum*> datums;
+        datums.reserve(count - 1);
+        for (ui32 i = 0; i + 1 < count; ++i) {
+            datums.push_back(&TArrowBlock::From(values[i]).GetDatum());
+        }
+
+        // For every output, the row indexes of the input batch that hash to it.
+        TVector<TVector<ui64>> outputBlockIndexes(Outputs_.size());
+        for (ui64 i = 0; i < inputBlockLen; ++i) {
+            outputBlockIndexes[GetHashPartitionIndex(datums.data(), i)].push_back(i);
+        }
+
+        ui64 maxLen = 0;
+        for (auto& indexes : outputBlockIndexes) {
+            maxLen = std::max(maxLen, indexes.size());
+        }
+
+        // Recreate the builders with a larger capacity if some output gets more rows
+        // than the current builders were created for.
+        if (maxLen > MaxOutputBlockLen_) {
+            MakeBuilders(maxLen);
+        }
+
+        // One entry per output; a null entry means "no rows for this output".
+        TVector<std::unique_ptr<TArgsDechunker>> outputData;
+        outputData.reserve(Outputs_.size());
+        for (size_t i = 0; i < Outputs_.size(); ++i) {
+            ui64 outputBlockLen = outputBlockIndexes[i].size();
+            if (!outputBlockLen) {
+                outputData.emplace_back();
+                continue;
+            }
+            const ui64* indexes = outputBlockIndexes[i].data();
+
+            std::vector<arrow::Datum> output;
+            output.reserve(datums.size() + 1);
+            for (size_t j = 0; j < datums.size(); ++j) {
+                const arrow::Datum* src = datums[j];
+                if (src->is_scalar()) {
+                    // Scalars are passed through unchanged to every output.
+                    output.emplace_back(*src);
+                } else {
+                    IArrayBuilder::TArrayDataItem dataItem;
+                    dataItem.Data = src->array().get();
+                    dataItem.StartOffset = 0;
+                    Builders_[j]->AddMany(&dataItem, 1, indexes, outputBlockLen);
+                    output.emplace_back(Builders_[j]->Build(false));
+                }
+            }
+            // Trailing block-length column of the produced batch.
+            output.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(outputBlockLen)));
+            outputData.emplace_back(std::make_unique<TArgsDechunker>(std::move(output)));
+        }
+
+        DoConsume(std::move(outputData));
+    }
+
+    // Pushes prepared data to the outputs, at most one chunk per output per pass, until
+    // every dechunker is exhausted. If any output reports IsFull() (this is checked for
+    // every output, including ones with nothing left to push), the remaining data is
+    // parked in OutputData_ and the waiting flag is raised.
+    void DoConsume(TVector<std::unique_ptr<TArgsDechunker>>&& outputData) const {
+        while (!outputData.empty()) {
+            bool hasData = false;
+            for (size_t i = 0; i < Outputs_.size(); ++i) {
+                if (Outputs_[i]->IsFull()) {
+                    IsWaitingFlag_ = true;
+                    OutputData_ = std::move(outputData);
+                    return;
+                }
+
+                std::vector<arrow::Datum> chunk;
+                if (outputData[i] && outputData[i]->Next(chunk)) {
+                    hasData = true;
+                    TUnboxedValueVector outputValues;
+                    outputValues.reserve(chunk.size());
+                    for (auto& datum : chunk) {
+                        outputValues.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum)));
+                    }
+                    Outputs_[i]->WidePush(outputValues.data(), outputValues.size());
+                }
+            }
+            if (!hasData) {
+                // A full pass produced nothing: everything has been pushed.
+                outputData.clear();
+            }
+        }
+    }
+
+    // A copy of the checkpoint is pushed to every output.
+    void Consume(NDqProto::TCheckpoint&& checkpoint) override {
+        for (auto& output : Outputs_) {
+            output->Push(NDqProto::TCheckpoint(checkpoint));
+        }
+    }
+
+    void Finish() final {
+        for (auto& output : Outputs_) {
+            output->Finish();
+        }
+    }
+
+    // Retries pushing parked data. The flag is cleared only if DoConsume() did not
+    // park anything again (it refills OutputData_ when an output is still full).
+    void DrainWaiting() const {
+        if (Y_UNLIKELY(IsWaitingFlag_)) {
+            TVector<std::unique_ptr<TArgsDechunker>> outputData;
+            outputData.swap(OutputData_);
+            DoConsume(std::move(outputData));
+            if (OutputData_.empty()) {
+                IsWaitingFlag_ = false;
+            }
+        }
+    }
+
+    bool DoTryFinish() final {
+        DrainWaiting();
+        return !IsWaitingFlag_;
+    }
+
+    // Returns the output index for row `blockIndex`. `values` holds the data columns
+    // only (OutputWidth_ - 1 entries, block-length column excluded).
+    size_t GetHashPartitionIndex(const arrow::Datum** values, ui64 blockIndex) {
+        ui64 hash = 0;
+        for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) {
+            const ui32 columnIndex = KeyColumns_[keyId].Index;
+            // `values` has OutputWidth_ - 1 entries, so the last valid index is OutputWidth_ - 2.
+            Y_VERIFY_DEBUG(columnIndex < OutputWidth_ - 1);
+            ui64 keyHash;
+            if (*KeyColumns_[keyId].IsScalar) {
+                // The scalar hash is computed once and then reused for all later rows
+                // and batches. NOTE(review): this relies on a scalar key column keeping
+                // the same value for the whole stream - confirm.
+                if (!ScalarColumnHashes_[keyId].Defined()) {
+                    ScalarColumnHashes_[keyId] = HashScalarColumn(keyId, *values[columnIndex]->scalar());
+                }
+                keyHash = *ScalarColumnHashes_[keyId];
+            } else {
+                keyHash = HashBlockColumn(keyId, *values[columnIndex]->array(), blockIndex);
+            }
+            hash = CombineHashes(hash, keyHash);
+        }
+        return hash % Outputs_.size();
+    }
+
+    inline ui64 HashScalarColumn(size_t keyId, const arrow::Scalar& value) const {
+        TBlockItem item = Readers_[keyId]->GetScalarItem(value);
+        return Hashers_[keyId]->Hash(item);
+    }
+
+    inline ui64 HashBlockColumn(size_t keyId, const arrow::ArrayData& value, ui64 index) const {
+        TBlockItem item = Readers_[keyId]->GetItem(value, index);
+        return Hashers_[keyId]->Hash(item);
+    }
+
+    // (Re)creates one array builder per array-shaped column with the given capacity.
+    // Scalar-shaped columns get a null placeholder so Builders_ stays indexable by
+    // column position.
+    void MakeBuilders(ui64 maxBlockLen) {
+        Builders_.clear();
+        TTypeInfoHelper helper;
+        for (auto& columnType : OutputType_->GetElements()) {
+            YQL_ENSURE(columnType->IsBlock());
+            auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType);
+            if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) {
+                auto itemType = blockType->GetItemType();
+                YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
+                Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *arrow::default_memory_pool(), maxBlockLen, nullptr));
+            } else {
+                Builders_.emplace_back();
+            }
+        }
+        MaxOutputBlockLen_ = maxBlockLen;
+    }
+
+private:
+    const NKikimr::NMiniKQL::TMultiType* const OutputType_;
+    const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
+
+    const TVector<IDqOutput::TPtr> Outputs_;
+    // Data parked while some output is full; mutable because it is updated from const methods.
+    mutable TVector<std::unique_ptr<TArgsDechunker>> OutputData_;
+
+    const TVector<TColumnInfo> KeyColumns_;
+    // Cached hash per key position; only filled for scalar key columns.
+    TVector<TMaybe<ui64>> ScalarColumnHashes_;
+    const ui32 OutputWidth_;
+
+    // Hashers_ and Readers_ are indexed by key position; Builders_ by column position.
+    TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
+    TVector<std::unique_ptr<IBlockReader>> Readers_;
+    TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+
+    ui64 MaxOutputBlockLen_ = 0;
+    mutable bool IsWaitingFlag_ = false;
+};
+
class TDqOutputBroadcastConsumer : public IDqOutputConsumer {
public:
TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth)
@@ -277,10 +606,27 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) {
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth)
+ TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory)
{
- return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes),
- std::move(keyColumnIndices), outputWidth);
+ YQL_ENSURE(!outputs.empty());
+ YQL_ENSURE(!keyColumns.empty());
+ TMaybe<ui32> outputWidth;
+ if (outputType->IsMulti()) {
+ outputWidth = static_cast<const NMiniKQL::TMultiType*>(outputType)->GetElementsCount();
+ }
+
+ if (AnyOf(keyColumns, [](const auto& info) { return !info.IsBlockOrScalar(); })) {
+ return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumns), outputWidth);
+ }
+
+ YQL_ENSURE(outputWidth.Defined(), "Expecting wide stream for block data");
+ if (AllOf(keyColumns, [](const auto& info) { return *info.IsScalar; })) {
+ // all key columns are scalars - all data will go to single output
+ return MakeIntrusive<TDqOutputHashPartitionConsumerScalar>(std::move(outputs), std::move(keyColumns), outputType);
+ }
+
+ return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory);
}
IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) {
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index 8bb7ea1ece..94fc30ae9e 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -1,11 +1,13 @@
#pragma once
+#include "dq_columns_resolve.h"
#include "dq_output.h"
#include <ydb/library/yql/minikql/mkql_alloc.h>
namespace NKikimr::NMiniKQL {
class TTypeEnvironment;
+class THolderFactory;
} // namespace NKikimr::NMiniKQL
namespace NYql::NDq {
@@ -44,7 +46,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output);
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth);
+ TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory);
IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 6c1542a999..c281d7565a 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -150,7 +150,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
case NYql::NDqProto::TTaskInput::kMerge: {
const auto& protoSortCols = inputDesc.GetMerge().GetSortColumns();
TVector<TSortColumnInfo> sortColsInfo;
- GetColumnsInfo(type, protoSortCols, sortColsInfo);
+ GetSortColumnsInfo(type, protoSortCols, sortColsInfo);
YQL_ENSURE(!sortColsInfo.empty());
return CreateInputMergeValue(std::move(inputs), std::move(sortColsInfo), holderFactory, stats);
@@ -161,7 +161,8 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
}
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
- const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs)
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TVector<IDqOutput::TPtr>&& outputs)
{
TMaybe<ui32> outputWidth;
if (type->IsMulti()) {
@@ -178,19 +179,11 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
case NDqProto::TTaskOutput::kHashPartition: {
- TVector<TType*> keyColumnTypes;
- TVector<ui32> keyColumnIndices;
- GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumnTypes, keyColumnIndices);
- YQL_ENSURE(!keyColumnTypes.empty());
-
+ TVector<TColumnInfo> keyColumns;
+ GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns);
+ YQL_ENSURE(!keyColumns.empty());
YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize());
- TVector<ui64> channelIds(outputDesc.GetHashPartition().GetPartitionsCount());
- for (ui32 i = 0; i < outputDesc.ChannelsSize(); ++i) {
- channelIds[i] = outputDesc.GetChannels(i).GetId();
- }
-
- return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes),
- std::move(keyColumnIndices), outputWidth);
+ return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory);
}
case NDqProto::TTaskOutput::kBroadcast: {
@@ -213,9 +206,9 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContext::CreateOutputConsumer(const TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv,
- TVector<IDqOutput::TPtr>&& outputs) const
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const
{
- return DqBuildOutputConsumer(outputDesc, type, typeEnv, std::move(outputs));
+ return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs));
}
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 /* channelId */) const {
@@ -652,7 +645,7 @@ public:
if (transform) {
auto guard = BindAllocator();
transform->TransformOutput = execCtx.CreateOutputConsumer(outputDesc, transform->TransformOutputType,
- Context.ApplyCtx, typeEnv, std::move(outputs));
+ Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs));
outputs.clear();
outputs.emplace_back(transform->TransformInput);
@@ -661,7 +654,7 @@ public:
{
auto guard = BindAllocator();
outputConsumers[i] = execCtx.CreateOutputConsumer(outputDesc, entry->OutputItemTypes[i],
- Context.ApplyCtx, typeEnv, std::move(outputs));
+ Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs));
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index b5b0af84c7..868b2bd98d 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -241,7 +241,9 @@ public:
virtual IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) const = 0;
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TVector<IDqOutput::TPtr>&& outputs) const = 0;
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0;
};
@@ -250,7 +252,9 @@ class TDqTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
public:
IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs) const override;
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TVector<IDqOutput::TPtr>&& outputs) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
};
@@ -275,7 +279,8 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
TVector<IDqInputChannel::TPtr>&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory);
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& channels);
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TVector<IDqOutput::TPtr>&& channels);
using TDqTaskRunnerParameterProvider = std::function<
bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h
index 8806c014ab..175507632e 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.h
+++ b/ydb/library/yql/minikql/mkql_type_builder.h
@@ -224,5 +224,17 @@ NUdf::IHash::TPtr MakeHashImpl(const NMiniKQL::TType* type);
NUdf::ICompare::TPtr MakeCompareImpl(const NMiniKQL::TType* type);
NUdf::IEquate::TPtr MakeEquateImpl(const NMiniKQL::TType* type);
+// Returns the smallest helper.GetMaxBlockLength() value over the non-null item types
+// in [beginIt, endIt). Returns 0 when no limit was found, i.e. the range has no
+// non-null entry or every reported limit equals Max<ui64>().
+template<typename T>
+ui64 CalcMaxBlockLength(T beginIt, T endIt, const NUdf::ITypeInfoHelper& helper) {
+    const ui64 unlimited = Max<ui64>();
+    ui64 result = unlimited;
+    for (T it = beginIt; it != endIt; ++it) {
+        const TType* itemType = *it;
+        if (!itemType) {
+            // null entries carry no limit and are skipped
+            continue;
+        }
+        result = std::min(result, helper.GetMaxBlockLength(itemType));
+    }
+    if (result == unlimited) {
+        return 0;
+    }
+    return result;
+}
+
} // namespace NMiniKQL
} // namespace Nkikimr