diff options
author | aneporada <aneporada@ydb.tech> | 2023-08-28 16:00:45 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-08-28 16:56:56 +0300 |
commit | 5462cc6a10db1479840dc940f1ff0099fa3a8dcf (patch) | |
tree | 5d4c4ff6f779f6b8266673416d962e76cc74f6dd | |
parent | fef56d31ee04026138bb820b190f5308ebbbae86 (diff) | |
download | ydb-5462cc6a10db1479840dc940f1ff0099fa3a8dcf.tar.gz |
Fix dq statistics for wide block flow
initial
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_producer.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_transport.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_reader.h | 105 |
3 files changed, 135 insertions, 7 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 9a473bb914a..8cdf5172807 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -580,10 +580,9 @@ private: YQL_ENSURE(blockLen > 0); result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); - // TODO: support stats for blocks - //if (Stats_) { - // Stats_.Add(result, width); - //} + if (Stats_) { + Stats_.Add(result, width); + } return NUdf::EFetchStatus::Ok; } @@ -711,22 +710,29 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& } void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width) { - Stats->RowsConsumed += 1; if (InputType) { YQL_ENSURE(InputType->IsMulti()); auto multiType = static_cast<const TMultiType*>(InputType); YQL_ENSURE(width == multiType->GetElementsCount()); + const bool isBlock = AnyOf(multiType->GetElements(), [](auto itemType) { return itemType->IsBlock(); }); + if (isBlock) { + Stats->RowsConsumed += TArrowBlock::From(row[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + } else { + Stats->RowsConsumed += 1; + } NYql::NDq::TDqDataSerializer::TEstimateSizeSettings settings; settings.DiscardUnsupportedTypes = true; settings.WithHeaders = false; ui64 size = 0; - for (ui32 i = 0; i < multiType->GetElementsCount(); ++i) { + for (ui32 i = 0; (isBlock ? (i + 1) : i) < multiType->GetElementsCount(); ++i) { size += TDqDataSerializer::EstimateSize(row[i], multiType->GetElementType(i), nullptr, settings); } Stats->BytesConsumed += Max<ui64>(size, 8 /* billing size for count(*) */); + } else { + Stats->RowsConsumed += 1; } } diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index f22f0210aff..e3c1998746d 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -1,6 +1,7 @@ #include "dq_transport.h" #include <ydb/library/mkql_proto/mkql_proto.h> +#include <ydb/library/yql/minikql/computation/mkql_block_reader.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> #include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> @@ -361,6 +362,23 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK return EstimateSizeImpl(value, taggedType->GetBaseType(), fixed, settings); } + case TType::EKind::Block: { + auto blockType = static_cast<const TBlockType*>(type); + if (fixed) { + *fixed = false; + } + + auto reader = MakeBlockReader(TTypeInfoHelper(), blockType->GetItemType()); + ui64 size; + if (blockType->GetShape() == TBlockType::EShape::Many) { + size = reader->GetDataWeight(*TArrowBlock::From(value).GetDatum().array()); + } else { + auto blockItem = reader->GetScalarItem(*TArrowBlock::From(value).GetDatum().scalar()); + size = reader->GetDataWeight(blockItem); + } + return size; + } + case TType::EKind::Type: case TType::EKind::Stream: case TType::EKind::Callable: @@ -368,7 +386,6 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK case TType::EKind::Resource: case TType::EKind::Flow: case TType::EKind::ReservedKind: - case TType::EKind::Block: case TType::EKind::Multi: { if (settings.DiscardUnsupportedTypes) { return 0; diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index b09e3f4e79a..07951069674 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -18,6 +18,10 @@ public: virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0; virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; + virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0; + virtual ui64 GetDataWeight(TBlockItem item) const = 0; + virtual ui64 GetDefaultValueWeight() const = 0; + virtual void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const = 0; virtual void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const = 0; }; @@ -51,6 +55,25 @@ public: return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); } + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + if constexpr (Nullable) { + return (1 + sizeof(T)) * data.length; + } + return sizeof(T) * data.length; + } + + ui64 GetDataWeight(TBlockItem item) const final { + Y_UNUSED(item); + return GetDefaultValueWeight(); + } + + ui64 GetDefaultValueWeight() const final { + if constexpr (Nullable) { + return 1 + sizeof(T); + } + return sizeof(T); + } + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { if constexpr (Nullable) { if (IsNull(data, index)) { @@ -106,6 +129,29 @@ public: return TBlockItem(str); } + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += data.length; + } + size += data.buffers[2] ? data.buffers[2]->size() : 0; + return size; + } + + ui64 GetDataWeight(TBlockItem item) const final { + if constexpr (Nullable) { + return 1 + (item ? item.AsStringRef().Size() : 0); + } + return item.AsStringRef().Size(); + } + + ui64 GetDefaultValueWeight() const final { + if constexpr (Nullable) { + return 1; + } + return 0; + } + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { Y_VERIFY_DEBUG(data.buffers.size() == 3); if constexpr (Nullable) { @@ -174,6 +220,50 @@ public: return TBlockItem(Items.data()); } + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += data.length; + } + + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDataWeight(*data.child_data[i]); + } + + return size; + } + + ui64 GetDataWeight(TBlockItem item) const final { + const TBlockItem* items = nullptr; + ui64 size = 0; + if constexpr (Nullable) { + if (!item) { + return GetDefaultValueWeight(); + } + size = 1; + items = item.GetOptionalValue().GetElements(); + } else { + items = item.GetElements(); + } + + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDataWeight(items[i]); + } + + return size; + } + + ui64 GetDefaultValueWeight() const final { + ui64 size = 0; + if constexpr (Nullable) { + size = 1; + } + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDefaultValueWeight(); + } + return size; + } + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { if constexpr (Nullable) { if (IsNull(data, index)) { @@ -230,6 +320,21 @@ public: return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional(); } + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + return data.length + Inner->GetDataWeight(*data.child_data[0]); + } + + ui64 GetDataWeight(TBlockItem item) const final { + if (!item) { + return GetDefaultValueWeight(); + } + return 1 + Inner->GetDataWeight(item.GetOptionalValue()); + } + + ui64 GetDefaultValueWeight() const final { + return 1 + Inner->GetDefaultValueWeight(); + } + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { if (IsNull(data, index)) { return out.PushChar(0); |