aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-08-28 16:00:45 +0300
committeraneporada <aneporada@ydb.tech>2023-08-28 16:56:56 +0300
commit5462cc6a10db1479840dc940f1ff0099fa3a8dcf (patch)
tree5d4c4ff6f779f6b8266673416d962e76cc74f6dd
parentfef56d31ee04026138bb820b190f5308ebbbae86 (diff)
downloadydb-5462cc6a10db1479840dc940f1ff0099fa3a8dcf.tar.gz
Fix dq statistics for wide block flow
initial
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp18
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp19
-rw-r--r--ydb/library/yql/public/udf/arrow/block_reader.h105
3 files changed, 135 insertions, 7 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 9a473bb914a..8cdf5172807 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -580,10 +580,9 @@ private:
YQL_ENSURE(blockLen > 0);
result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
- // TODO: support stats for blocks
- //if (Stats_) {
- // Stats_.Add(result, width);
- //}
+ if (Stats_) {
+ Stats_.Add(result, width);
+ }
return NUdf::EFetchStatus::Ok;
}
@@ -711,22 +710,29 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue&
}
void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width) {
- Stats->RowsConsumed += 1;
if (InputType) {
YQL_ENSURE(InputType->IsMulti());
auto multiType = static_cast<const TMultiType*>(InputType);
YQL_ENSURE(width == multiType->GetElementsCount());
+ const bool isBlock = AnyOf(multiType->GetElements(), [](auto itemType) { return itemType->IsBlock(); });
+ if (isBlock) {
+ Stats->RowsConsumed += TArrowBlock::From(row[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ } else {
+ Stats->RowsConsumed += 1;
+ }
NYql::NDq::TDqDataSerializer::TEstimateSizeSettings settings;
settings.DiscardUnsupportedTypes = true;
settings.WithHeaders = false;
ui64 size = 0;
- for (ui32 i = 0; i < multiType->GetElementsCount(); ++i) {
+ for (ui32 i = 0; (isBlock ? (i + 1) : i) < multiType->GetElementsCount(); ++i) {
size += TDqDataSerializer::EstimateSize(row[i], multiType->GetElementType(i), nullptr, settings);
}
Stats->BytesConsumed += Max<ui64>(size, 8 /* billing size for count(*) */);
+ } else {
+ Stats->RowsConsumed += 1;
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index f22f0210aff..e3c1998746d 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -1,6 +1,7 @@
#include "dq_transport.h"
#include <ydb/library/mkql_proto/mkql_proto.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_reader.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
@@ -361,6 +362,23 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK
return EstimateSizeImpl(value, taggedType->GetBaseType(), fixed, settings);
}
+ case TType::EKind::Block: {
+ auto blockType = static_cast<const TBlockType*>(type);
+ if (fixed) {
+ *fixed = false;
+ }
+
+ auto reader = MakeBlockReader(TTypeInfoHelper(), blockType->GetItemType());
+ ui64 size;
+ if (blockType->GetShape() == TBlockType::EShape::Many) {
+ size = reader->GetDataWeight(*TArrowBlock::From(value).GetDatum().array());
+ } else {
+ auto blockItem = reader->GetScalarItem(*TArrowBlock::From(value).GetDatum().scalar());
+ size = reader->GetDataWeight(blockItem);
+ }
+ return size;
+ }
+
case TType::EKind::Type:
case TType::EKind::Stream:
case TType::EKind::Callable:
@@ -368,7 +386,6 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK
case TType::EKind::Resource:
case TType::EKind::Flow:
case TType::EKind::ReservedKind:
- case TType::EKind::Block:
case TType::EKind::Multi: {
if (settings.DiscardUnsupportedTypes) {
return 0;
diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h
index b09e3f4e79a..07951069674 100644
--- a/ydb/library/yql/public/udf/arrow/block_reader.h
+++ b/ydb/library/yql/public/udf/arrow/block_reader.h
@@ -18,6 +18,10 @@ public:
virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
+ virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0;
+ virtual ui64 GetDataWeight(TBlockItem item) const = 0;
+ virtual ui64 GetDefaultValueWeight() const = 0;
+
virtual void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const = 0;
virtual void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const = 0;
};
@@ -51,6 +55,25 @@ public:
return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
}
+ ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ if constexpr (Nullable) {
+ return (1 + sizeof(T)) * data.length;
+ }
+ return sizeof(T) * data.length;
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const final {
+ Y_UNUSED(item);
+ return GetDefaultValueWeight();
+ }
+
+ ui64 GetDefaultValueWeight() const final {
+ if constexpr (Nullable) {
+ return 1 + sizeof(T);
+ }
+ return sizeof(T);
+ }
+
void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
if constexpr (Nullable) {
if (IsNull(data, index)) {
@@ -106,6 +129,29 @@ public:
return TBlockItem(str);
}
+ ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ size += data.length;
+ }
+ size += data.buffers[2] ? data.buffers[2]->size() : 0;
+ return size;
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const final {
+ if constexpr (Nullable) {
+ return 1 + (item ? item.AsStringRef().Size() : 0);
+ }
+ return item.AsStringRef().Size();
+ }
+
+ ui64 GetDefaultValueWeight() const final {
+ if constexpr (Nullable) {
+ return 1;
+ }
+ return 0;
+ }
+
void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
Y_VERIFY_DEBUG(data.buffers.size() == 3);
if constexpr (Nullable) {
@@ -174,6 +220,50 @@ public:
return TBlockItem(Items.data());
}
+ ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ size += data.length;
+ }
+
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ size += Children[i]->GetDataWeight(*data.child_data[i]);
+ }
+
+ return size;
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const final {
+ const TBlockItem* items = nullptr;
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ if (!item) {
+ return GetDefaultValueWeight();
+ }
+ size = 1;
+ items = item.GetOptionalValue().GetElements();
+ } else {
+ items = item.GetElements();
+ }
+
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ size += Children[i]->GetDataWeight(items[i]);
+ }
+
+ return size;
+ }
+
+ ui64 GetDefaultValueWeight() const final {
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ size = 1;
+ }
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ size += Children[i]->GetDefaultValueWeight();
+ }
+ return size;
+ }
+
void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
if constexpr (Nullable) {
if (IsNull(data, index)) {
@@ -230,6 +320,21 @@ public:
return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional();
}
+ ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ return data.length + Inner->GetDataWeight(*data.child_data[0]);
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const final {
+ if (!item) {
+ return GetDefaultValueWeight();
+ }
+ return 1 + Inner->GetDataWeight(item.GetOptionalValue());
+ }
+
+ ui64 GetDefaultValueWeight() const final {
+ return 1 + Inner->GetDefaultValueWeight();
+ }
+
void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
if (IsNull(data, index)) {
return out.PushChar(0);