diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-16 16:04:59 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-16 16:04:59 +0300 |
commit | 4a928ef858041e897b5943782f8873ba1c8f56bd (patch) | |
tree | 45d6a1a417a4ebc6b0587dd0a3974fcc5204d8d9 | |
parent | 7858e0d83acd62a0f5790a12eec397441c189c8e (diff) | |
download | ydb-4a928ef858041e897b5943782f8873ba1c8f56bd.tar.gz |
speed up the size calculation method used for estimating channel loading
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 421 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.h | 39 |
2 files changed, 292 insertions, 168 deletions
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 52800f87a11..3ded07d28e3 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -11,10 +11,10 @@ namespace NKikimr { namespace NMiniKQL { -TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) { +TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type) { namespace NTypeIds = NScheme::NTypeIds; if (!value) { - return {sizeof(NUdf::TUnboxedValue), 8}; // Special value for NULL elements + return { sizeof(NUdf::TUnboxedValue), 8 }; // Special value for NULL elements } switch (type.GetTypeId()) { case NTypeIds::Bool: @@ -36,14 +36,15 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme:: case NTypeIds::Timestamp: case NTypeIds::Interval: case NTypeIds::ActorId: - case NTypeIds::StepOrderId: { + case NTypeIds::StepOrderId: + { YQL_ENSURE(value.IsEmbedded(), "Passed wrong type: " << NScheme::TypeName(type.GetTypeId())); - return {sizeof(NUdf::TUnboxedValue), sizeof(i64)}; + return { sizeof(NUdf::TUnboxedValue), sizeof(i64) }; } case NTypeIds::Decimal: { YQL_ENSURE(value.IsEmbedded(), "Passed wrong type: " << NScheme::TypeName(type.GetTypeId())); - return {sizeof(NUdf::TUnboxedValue), sizeof(NYql::NDecimal::TInt128)}; + return { sizeof(NUdf::TUnboxedValue), sizeof(NYql::NDecimal::TInt128) }; } case NTypeIds::String: case NTypeIds::Utf8: @@ -51,16 +52,18 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme:: case NTypeIds::Yson: case NTypeIds::JsonDocument: case NTypeIds::DyNumber: - case NTypeIds::PairUi64Ui64: { + case NTypeIds::PairUi64Ui64: + { if (value.IsEmbedded()) { - return {sizeof(NUdf::TUnboxedValue), std::max((ui32) 8, value.AsStringRef().Size())}; + return { sizeof(NUdf::TUnboxedValue), std::max((ui32)8, value.AsStringRef().Size()) }; } else { Y_VERIFY_DEBUG_S(8 < 
value.AsStringRef().Size(), "Small string of size " << value.AsStringRef().Size() << " is not embedded."); - return {sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size()}; + return { sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size() }; } } - case NTypeIds::Pg: { + case NTypeIds::Pg: + { return { sizeof(NUdf::TUnboxedValue), NKikimr::NMiniKQL::PgValueSize( @@ -73,9 +76,9 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme:: default: Y_VERIFY_DEBUG_S(false, "Unsupported type " << NScheme::TypeName(type.GetTypeId())); if (value.IsEmbedded()) { - return {sizeof(NUdf::TUnboxedValue), sizeof(NUdf::TUnboxedValue)}; + return { sizeof(NUdf::TUnboxedValue), sizeof(NUdf::TUnboxedValue) }; } else { - return {sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size()}; + return { sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size() }; } } } @@ -93,14 +96,13 @@ void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable namespace { TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqpComputeContextBase::TColumn>& columns, - const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) -{ - TBytesStatistics rowStats{systemColumns.size() * sizeof(NUdf::TUnboxedValue), 0}; + const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) { + TBytesStatistics rowStats{ systemColumns.size() * sizeof(NUdf::TUnboxedValue), 0 }; for (size_t columnIndex = 0; columnIndex < columns.size(); ++columnIndex) { - rowStats.AddStatistics(GetUnboxedValueSize(row[columnIndex], columns[columnIndex].Type)); + rowStats += GetUnboxedValueSize(row[columnIndex], columns[columnIndex].Type); } if (columns.empty()) { - rowStats.AddStatistics({sizeof(ui64), sizeof(ui64)}); + rowStats.AddStatistics({ sizeof(ui64), sizeof(ui64) }); } return rowStats; } @@ -116,41 +118,15 @@ NUdf::TUnboxedValue 
MakeUnboxedValue(arrow::Array* column, ui32 row) { auto array = reinterpret_cast<TArrayType*>(column); return NUdf::TUnboxedValuePod(static_cast<TValueType>(array->Value(row))); } - -NUdf::TUnboxedValue MakeUnboxedValueFromBinaryData(arrow::Array* column, ui32 row) { - auto array = reinterpret_cast<arrow::BinaryArray*>(column); - auto data = array->GetView(row); - return MakeString(NUdf::TStringRef(data.data(), data.size())); -} - -NUdf::TUnboxedValue MakeUnboxedValueFromFixedSizeBinaryData(arrow::Array* column, ui32 row) { - auto array = reinterpret_cast<arrow::FixedSizeBinaryArray*>(column); - auto data = array->GetView(row); - return MakeString(NUdf::TStringRef(data.data(), data.size()-1)); -} - -NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui32 row) { - auto array = reinterpret_cast<arrow::Decimal128Array*>(column); - auto data = array->GetView(row); - // It's known that Decimal params are always Decimal(22,9), - // so we verify Decimal type here before store it in UnboxedValue. 
- const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array->type()); - YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision."); - YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale."); - YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size"); - NYql::NDecimal::TInt128 val; - std::memcpy(reinterpret_cast<char*>(&val), data.data(), data.size()); - return NUdf::TUnboxedValuePod(val); -} - } // namespace ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { ui32 resultColumnsCount = 0; if (ColumnsCount) { + auto* data = &Cells[CurrentRow * CellsCountForRow]; for (ui32 i = 0; i < CellsCountForRow; ++i) { if (result[i]) { - *result[i] = std::move(Cells[CurrentRow * CellsCountForRow + i]); + *result[i] = std::move(*(data + i)); ++resultColumnsCount; } } @@ -159,129 +135,245 @@ ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxe return resultColumnsCount; } +namespace { + +class TDefaultStatAccumulator { +private: + ui32 NullsCount = 0; + TBytesStatistics BytesAllocated; + NScheme::TTypeInfo TypeInfo; +public: + TDefaultStatAccumulator(const NScheme::TTypeInfo& tInfo) + : TypeInfo(tInfo) + { + + } + + void AddNull() { + ++NullsCount; + } + void AddValue(const NYql::NUdf::TUnboxedValue& value) { + BytesAllocated += GetUnboxedValueSize(value, TypeInfo); + } + + TBytesStatistics Finish() const { + return BytesAllocated + GetUnboxedValueSize(NYql::NUdf::TUnboxedValue(), TypeInfo) * NullsCount; + } +}; + +class TFixedWidthStatAccumulator { +private: + ui32 NullsCount = 0; + ui32 ValuesCount = 0; + TBytesStatistics RowSize; + NScheme::TTypeInfo TypeInfo; +public: + TFixedWidthStatAccumulator(const NScheme::TTypeInfo& tInfo) + : TypeInfo(tInfo) { + + } + + void AddNull() { + ++NullsCount; + } + void AddValue(const NYql::NUdf::TUnboxedValue& value) { + if (++ValuesCount == 1) { + RowSize 
= GetUnboxedValueSize(value, TypeInfo); + } + } + + TBytesStatistics Finish() const { + return RowSize * ValuesCount + GetUnboxedValueSize(NYql::NUdf::TUnboxedValue(), TypeInfo) * NullsCount; + } +}; + +template <class TArrayTypeExt, class TValueType = typename TArrayTypeExt::value_type> +class TElementAccessor { +public: + using TArrayType = TArrayTypeExt; + static NYql::NUdf::TUnboxedValue ExtractValue(TArrayType& array, const ui32 rowIndex) { + return NUdf::TUnboxedValuePod(static_cast<TValueType>(array.Value(rowIndex))); + } + + static void Validate(TArrayType& /*array*/) { + + } + + static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) { + return TFixedWidthStatAccumulator(typeInfo); + } +}; + +template <> +class TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128> { +public: + using TArrayType = arrow::Decimal128Array; + static void Validate(arrow::Decimal128Array& array) { + const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array.type()); + YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision."); + YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale."); + } + + static NYql::NUdf::TUnboxedValue ExtractValue(arrow::Decimal128Array& array, const ui32 rowIndex) { + auto data = array.GetView(rowIndex); + YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size"); + NYql::NDecimal::TInt128 val; + std::memcpy(reinterpret_cast<char*>(&val), data.data(), data.size()); + return NUdf::TUnboxedValuePod(val); + } + static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) { + return TFixedWidthStatAccumulator(typeInfo); + } +}; + +template <> +class TElementAccessor<arrow::BinaryArray, NUdf::TStringRef> { +public: + using TArrayType = arrow::BinaryArray; + static void Validate(arrow::BinaryArray& /*array*/) { + } + + static NYql::NUdf::TUnboxedValue 
ExtractValue(arrow::BinaryArray& array, const ui32 rowIndex) { + auto data = array.GetView(rowIndex); + return MakeString(NUdf::TStringRef(data.data(), data.size())); + } + static TDefaultStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) { + return TDefaultStatAccumulator(typeInfo); + } +}; + +template <> +class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef> { +public: + using TArrayType = arrow::FixedSizeBinaryArray; + static void Validate(arrow::FixedSizeBinaryArray& /*array*/) { + } + + static NYql::NUdf::TUnboxedValue ExtractValue(arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) { + auto data = array.GetView(rowIndex); + return MakeString(NUdf::TStringRef(data.data(), data.size() - 1)); + } + static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) { + return TFixedWidthStatAccumulator(typeInfo); + } +}; + +} + +template <class TElementAccessor, class TAccessor> +TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor, + const arrow::RecordBatch& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) { + auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(arrayExt); + TElementAccessor::Validate(array); + auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType); + for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { + auto& rowItem = editAccessor(rowIndex, columnIndex); + if (array.IsNull(rowIndex)) { + statAccumulator.AddNull(); + rowItem = NUdf::TUnboxedValue(); + } else { + rowItem = TElementAccessor::ExtractValue(array, rowIndex); + statAccumulator.AddValue(rowItem); + } + } + return statAccumulator.Finish(); +} + + template <class TAccessor> TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor, const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) { - TBytesStatistics columnStats; - // Hold pointer to column until function end 
std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex); arrow::Array* columnPtr = columnSharedPtr.get(); namespace NTypeIds = NScheme::NTypeIds; - for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) { - auto& rowItem = editAccessor(rowIndex, columnIndex); - if (columnPtr->IsNull(rowIndex)) { - rowItem = NUdf::TUnboxedValue(); - } else { - switch (columnType.GetTypeId()) { - case NTypeIds::Bool: - { - rowItem = MakeUnboxedValue<arrow::BooleanArray, bool>(columnPtr, rowIndex); - break; - } - case NTypeIds::Int8: - { - rowItem = MakeUnboxedValue<arrow::Int8Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Int16: - { - rowItem = MakeUnboxedValue<arrow::Int16Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Int32: - { - rowItem = MakeUnboxedValue<arrow::Int32Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Int64: - { - rowItem = MakeUnboxedValue<arrow::Int64Array, i64>(columnPtr, rowIndex); - break; - } - case NTypeIds::Uint8: - { - rowItem = MakeUnboxedValue<arrow::UInt8Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Uint16: - { - rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Uint32: - { - rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Uint64: - { - rowItem = MakeUnboxedValue<arrow::UInt64Array, ui64>(columnPtr, rowIndex); - break; - } - case NTypeIds::Float: - { - rowItem = MakeUnboxedValue<arrow::FloatArray>(columnPtr, rowIndex); - break; - } - case NTypeIds::Double: - { - rowItem = MakeUnboxedValue<arrow::DoubleArray>(columnPtr, rowIndex); - break; - } - case NTypeIds::String: - case NTypeIds::Utf8: - case NTypeIds::Json: - case NTypeIds::Yson: - case NTypeIds::JsonDocument: - case NTypeIds::DyNumber: - { - rowItem = MakeUnboxedValueFromBinaryData(columnPtr, rowIndex); - break; - } - case NTypeIds::Date: - { - rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex); - break; - } - 
case NTypeIds::Datetime: - { - rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex); - break; - } - case NTypeIds::Timestamp: - { - rowItem = MakeUnboxedValue<arrow::TimestampArray, ui64>(columnPtr, rowIndex); - break; - } - case NTypeIds::Interval: - { - rowItem = MakeUnboxedValue<arrow::DurationArray, ui64>(columnPtr, rowIndex); - break; - } - case NTypeIds::Decimal: - { - rowItem = MakeUnboxedValueFromDecimal128Array(columnPtr, rowIndex); - break; - } - case NTypeIds::PairUi64Ui64: - case NTypeIds::ActorId: - case NTypeIds::StepOrderId: - { - Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId())); - rowItem = MakeUnboxedValueFromFixedSizeBinaryData(columnPtr, rowIndex); - break; - } - case NTypeIds::Pg: - // TODO: support pg types - YQL_ENSURE(false, "Unsupported pg type at column " << columnIndex); - - default: - YQL_ENSURE(false, "Unsupported type: " << NScheme::TypeName(columnType.GetTypeId()) << " at column " << columnIndex); - } + switch (columnType.GetTypeId()) { + case NTypeIds::Bool: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::BooleanArray, bool>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Int8: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int8Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Int16: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Int32: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Int64: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int64Array, i64>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Uint8: + { + return 
WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt8Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); } - columnStats.AddStatistics(GetUnboxedValueSize(rowItem, columnType)); + case NTypeIds::Uint16: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Uint32: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Uint64: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt64Array, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Float: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::FloatArray>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Double: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::DoubleArray>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::String: + case NTypeIds::Utf8: + case NTypeIds::Json: + case NTypeIds::Yson: + case NTypeIds::JsonDocument: + case NTypeIds::DyNumber: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::BinaryArray, NUdf::TStringRef>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Date: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Datetime: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Timestamp: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::TimestampArray, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Interval: + { + return 
WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::DurationArray, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Decimal: + { + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::PairUi64Ui64: + case NTypeIds::ActorId: + case NTypeIds::StepOrderId: + { + Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId())); + return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef>>(editAccessor, batch, columnIndex, columnPtr, columnType); + } + case NTypeIds::Pg: + // TODO: support pg types + YQL_ENSURE(false, "Unsupported pg type at column " << columnIndex); + + default: + YQL_ENSURE(false, "Unsupported type: " << NScheme::TypeName(columnType.GetTypeId()) << " at column " << columnIndex); } - return columnStats; } TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors, @@ -310,14 +402,13 @@ std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& val ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) { YQL_ENSURE(!RowBatches.empty()); auto& batch = RowBatches.front(); - auto rowStats = GetRowSize(batch.GetCurrentData(), ResultColumns, SystemColumns); const ui32 resultColumnsCount = batch.FillUnboxedCells(result); if (batch.IsFinished()) { RowBatches.pop(); } - StoredBytes -= rowStats.AllocatedBytes; - YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!"); + StoredBytes -= batch.BytesForRecordEstimation(); + YQL_ENSURE(RowBatches.empty() == (StoredBytes < 1), "StoredBytes miscalculated!"); return resultColumnsCount; } @@ -410,11 +501,11 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba } FillSystemColumns(vectorStart + ResultColumns.size(), shardId, 
SystemColumns); - stats.AddStatistics(GetRowSize(vectorStart, ResultColumns, SystemColumns)); + stats += GetRowSize(vectorStart, ResultColumns, SystemColumns); } } if (cells.size()) { - RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId)); + RowBatches.emplace(RowBatch(ColumnsCount(), batch.size(), std::move(cells), shardId, stats.AllocatedBytes)); } StoredBytes += stats.AllocatedBytes; @@ -459,7 +550,7 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch, } if (cells.size()) { - RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId)); + RowBatches.emplace(RowBatch(ColumnsCount(), batch.num_rows(), std::move(cells), shardId, stats.AllocatedBytes)); } StoredBytes += stats.AllocatedBytes; diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h index 13f0f1c070d..d61e8b3c0a3 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.h +++ b/ydb/core/kqp/runtime/kqp_scan_data.h @@ -27,6 +27,29 @@ struct TBytesStatistics { ui64 AllocatedBytes = 0; ui64 DataBytes = 0; + TBytesStatistics() = default; + TBytesStatistics(const ui64 allocated, const ui64 data) + : AllocatedBytes(allocated) + , DataBytes(data) + { + + } + + TBytesStatistics operator+(const TBytesStatistics& item) const { + return TBytesStatistics(AllocatedBytes + item.AllocatedBytes, DataBytes + item.DataBytes); + } + + void operator+=(const TBytesStatistics& item) { + AllocatedBytes += item.AllocatedBytes; + DataBytes += item.DataBytes; + } + + template <class T> + TBytesStatistics operator*(const T kff) const { + static_assert(std::is_arithmetic<T>()); + return TBytesStatistics(AllocatedBytes * kff, DataBytes * kff); + } + void AddStatistics(const TBytesStatistics& other) { AllocatedBytes += other.AllocatedBytes; DataBytes += other.DataBytes; @@ -34,7 +57,7 @@ struct TBytesStatistics { }; -TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type); +TBytesStatistics 
GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type); TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType); TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors, @@ -146,17 +169,27 @@ public: private: const ui32 CellsCountForRow; const ui32 ColumnsCount; + const ui32 RowsCount; TUnboxedValueVector Cells; ui64 CurrentRow = 0; + const ui64 AllocatedBytes; public: TMaybe<ui64> ShardId; - explicit RowBatch(const ui32 columnsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId) + explicit RowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId, const ui64 allocatedBytes) : CellsCountForRow(columnsCount ? columnsCount : 1) , ColumnsCount(columnsCount) + , RowsCount(rowsCount) , Cells(std::move(cells)) + , AllocatedBytes(allocatedBytes) , ShardId(shardId) { + Y_VERIFY(AllocatedBytes); + Y_VERIFY(RowsCount); + } + + double BytesForRecordEstimation() { + return 1.0 * AllocatedBytes / RowsCount; } const NUdf::TUnboxedValue* GetCurrentData() const { @@ -174,7 +207,7 @@ public: TSmallVec<TColumn> SystemColumns; TSmallVec<TColumn> ResultColumns; TQueue<RowBatch> RowBatches; - ui64 StoredBytes = 0; + double StoredBytes = 0; bool Finished = false; }; |