aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-16 16:04:59 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-16 16:04:59 +0300
commit4a928ef858041e897b5943782f8873ba1c8f56bd (patch)
tree45d6a1a417a4ebc6b0587dd0a3974fcc5204d8d9
parent7858e0d83acd62a0f5790a12eec397441c189c8e (diff)
downloadydb-4a928ef858041e897b5943782f8873ba1c8f56bd.tar.gz
speed up size calculation method for estimate channel loading
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp421
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.h39
2 files changed, 292 insertions, 168 deletions
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 52800f87a11..3ded07d28e3 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -11,10 +11,10 @@
namespace NKikimr {
namespace NMiniKQL {
-TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) {
+TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type) {
namespace NTypeIds = NScheme::NTypeIds;
if (!value) {
- return {sizeof(NUdf::TUnboxedValue), 8}; // Special value for NULL elements
+ return { sizeof(NUdf::TUnboxedValue), 8 }; // Special value for NULL elements
}
switch (type.GetTypeId()) {
case NTypeIds::Bool:
@@ -36,14 +36,15 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::
case NTypeIds::Timestamp:
case NTypeIds::Interval:
case NTypeIds::ActorId:
- case NTypeIds::StepOrderId: {
+ case NTypeIds::StepOrderId:
+ {
YQL_ENSURE(value.IsEmbedded(), "Passed wrong type: " << NScheme::TypeName(type.GetTypeId()));
- return {sizeof(NUdf::TUnboxedValue), sizeof(i64)};
+ return { sizeof(NUdf::TUnboxedValue), sizeof(i64) };
}
case NTypeIds::Decimal:
{
YQL_ENSURE(value.IsEmbedded(), "Passed wrong type: " << NScheme::TypeName(type.GetTypeId()));
- return {sizeof(NUdf::TUnboxedValue), sizeof(NYql::NDecimal::TInt128)};
+ return { sizeof(NUdf::TUnboxedValue), sizeof(NYql::NDecimal::TInt128) };
}
case NTypeIds::String:
case NTypeIds::Utf8:
@@ -51,16 +52,18 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::
case NTypeIds::Yson:
case NTypeIds::JsonDocument:
case NTypeIds::DyNumber:
- case NTypeIds::PairUi64Ui64: {
+ case NTypeIds::PairUi64Ui64:
+ {
if (value.IsEmbedded()) {
- return {sizeof(NUdf::TUnboxedValue), std::max((ui32) 8, value.AsStringRef().Size())};
+ return { sizeof(NUdf::TUnboxedValue), std::max((ui32)8, value.AsStringRef().Size()) };
} else {
Y_VERIFY_DEBUG_S(8 < value.AsStringRef().Size(), "Small string of size " << value.AsStringRef().Size() << " is not embedded.");
- return {sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size()};
+ return { sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size() };
}
}
- case NTypeIds::Pg: {
+ case NTypeIds::Pg:
+ {
return {
sizeof(NUdf::TUnboxedValue),
NKikimr::NMiniKQL::PgValueSize(
@@ -73,9 +76,9 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::
default:
Y_VERIFY_DEBUG_S(false, "Unsupported type " << NScheme::TypeName(type.GetTypeId()));
if (value.IsEmbedded()) {
- return {sizeof(NUdf::TUnboxedValue), sizeof(NUdf::TUnboxedValue)};
+ return { sizeof(NUdf::TUnboxedValue), sizeof(NUdf::TUnboxedValue) };
} else {
- return {sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size()};
+ return { sizeof(NUdf::TUnboxedValue) + value.AsStringRef().Size(), value.AsStringRef().Size() };
}
}
}
@@ -93,14 +96,13 @@ void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable
namespace {
TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqpComputeContextBase::TColumn>& columns,
- const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns)
-{
- TBytesStatistics rowStats{systemColumns.size() * sizeof(NUdf::TUnboxedValue), 0};
+ const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) {
+ TBytesStatistics rowStats{ systemColumns.size() * sizeof(NUdf::TUnboxedValue), 0 };
for (size_t columnIndex = 0; columnIndex < columns.size(); ++columnIndex) {
- rowStats.AddStatistics(GetUnboxedValueSize(row[columnIndex], columns[columnIndex].Type));
+ rowStats += GetUnboxedValueSize(row[columnIndex], columns[columnIndex].Type);
}
if (columns.empty()) {
- rowStats.AddStatistics({sizeof(ui64), sizeof(ui64)});
+ rowStats.AddStatistics({ sizeof(ui64), sizeof(ui64) });
}
return rowStats;
}
@@ -116,41 +118,15 @@ NUdf::TUnboxedValue MakeUnboxedValue(arrow::Array* column, ui32 row) {
auto array = reinterpret_cast<TArrayType*>(column);
return NUdf::TUnboxedValuePod(static_cast<TValueType>(array->Value(row)));
}
-
-NUdf::TUnboxedValue MakeUnboxedValueFromBinaryData(arrow::Array* column, ui32 row) {
- auto array = reinterpret_cast<arrow::BinaryArray*>(column);
- auto data = array->GetView(row);
- return MakeString(NUdf::TStringRef(data.data(), data.size()));
-}
-
-NUdf::TUnboxedValue MakeUnboxedValueFromFixedSizeBinaryData(arrow::Array* column, ui32 row) {
- auto array = reinterpret_cast<arrow::FixedSizeBinaryArray*>(column);
- auto data = array->GetView(row);
- return MakeString(NUdf::TStringRef(data.data(), data.size()-1));
-}
-
-NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui32 row) {
- auto array = reinterpret_cast<arrow::Decimal128Array*>(column);
- auto data = array->GetView(row);
- // It's known that Decimal params are always Decimal(22,9),
- // so we verify Decimal type here before store it in UnboxedValue.
- const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array->type());
- YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision.");
- YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale.");
- YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size");
- NYql::NDecimal::TInt128 val;
- std::memcpy(reinterpret_cast<char*>(&val), data.data(), data.size());
- return NUdf::TUnboxedValuePod(val);
-}
-
} // namespace
ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
ui32 resultColumnsCount = 0;
if (ColumnsCount) {
+ auto* data = &Cells[CurrentRow * CellsCountForRow];
for (ui32 i = 0; i < CellsCountForRow; ++i) {
if (result[i]) {
- *result[i] = std::move(Cells[CurrentRow * CellsCountForRow + i]);
+ *result[i] = std::move(*(data + i));
++resultColumnsCount;
}
}
@@ -159,129 +135,245 @@ ui32 TKqpScanComputeContext::TScanData::RowBatch::FillUnboxedCells(NUdf::TUnboxe
return resultColumnsCount;
}
+namespace {
+
+class TDefaultStatAccumulator {
+private:
+ ui32 NullsCount = 0;
+ TBytesStatistics BytesAllocated;
+ NScheme::TTypeInfo TypeInfo;
+public:
+ TDefaultStatAccumulator(const NScheme::TTypeInfo& tInfo)
+ : TypeInfo(tInfo)
+ {
+
+ }
+
+ void AddNull() {
+ ++NullsCount;
+ }
+ void AddValue(const NYql::NUdf::TUnboxedValue& value) {
+ BytesAllocated += GetUnboxedValueSize(value, TypeInfo);
+ }
+
+ TBytesStatistics Finish() const {
+ return BytesAllocated + GetUnboxedValueSize(NYql::NUdf::TUnboxedValue(), TypeInfo) * NullsCount;
+ }
+};
+
+class TFixedWidthStatAccumulator {
+private:
+ ui32 NullsCount = 0;
+ ui32 ValuesCount = 0;
+ TBytesStatistics RowSize;
+ NScheme::TTypeInfo TypeInfo;
+public:
+ TFixedWidthStatAccumulator(const NScheme::TTypeInfo& tInfo)
+ : TypeInfo(tInfo) {
+
+ }
+
+ void AddNull() {
+ ++NullsCount;
+ }
+ void AddValue(const NYql::NUdf::TUnboxedValue& value) {
+ if (++ValuesCount == 1) {
+ RowSize = GetUnboxedValueSize(value, TypeInfo);
+ }
+ }
+
+ TBytesStatistics Finish() const {
+ return RowSize * ValuesCount + GetUnboxedValueSize(NYql::NUdf::TUnboxedValue(), TypeInfo) * NullsCount;
+ }
+};
+
+template <class TArrayTypeExt, class TValueType = typename TArrayTypeExt::value_type>
+class TElementAccessor {
+public:
+ using TArrayType = TArrayTypeExt;
+ static NYql::NUdf::TUnboxedValue ExtractValue(TArrayType& array, const ui32 rowIndex) {
+ return NUdf::TUnboxedValuePod(static_cast<TValueType>(array.Value(rowIndex)));
+ }
+
+ static void Validate(TArrayType& /*array*/) {
+
+ }
+
+ static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) {
+ return TFixedWidthStatAccumulator(typeInfo);
+ }
+};
+
+template <>
+class TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128> {
+public:
+ using TArrayType = arrow::Decimal128Array;
+ static void Validate(arrow::Decimal128Array& array) {
+ const auto& type = arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array.type());
+ YQL_ENSURE(type.precision() == NScheme::DECIMAL_PRECISION, "Unsupported Decimal precision.");
+ YQL_ENSURE(type.scale() == NScheme::DECIMAL_SCALE, "Unsupported Decimal scale.");
+ }
+
+ static NYql::NUdf::TUnboxedValue ExtractValue(arrow::Decimal128Array& array, const ui32 rowIndex) {
+ auto data = array.GetView(rowIndex);
+ YQL_ENSURE(data.size() == sizeof(NYql::NDecimal::TInt128), "Wrong data size");
+ NYql::NDecimal::TInt128 val;
+ std::memcpy(reinterpret_cast<char*>(&val), data.data(), data.size());
+ return NUdf::TUnboxedValuePod(val);
+ }
+ static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) {
+ return TFixedWidthStatAccumulator(typeInfo);
+ }
+};
+
+template <>
+class TElementAccessor<arrow::BinaryArray, NUdf::TStringRef> {
+public:
+ using TArrayType = arrow::BinaryArray;
+ static void Validate(arrow::BinaryArray& /*array*/) {
+ }
+
+ static NYql::NUdf::TUnboxedValue ExtractValue(arrow::BinaryArray& array, const ui32 rowIndex) {
+ auto data = array.GetView(rowIndex);
+ return MakeString(NUdf::TStringRef(data.data(), data.size()));
+ }
+ static TDefaultStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) {
+ return TDefaultStatAccumulator(typeInfo);
+ }
+};
+
+template <>
+class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef> {
+public:
+ using TArrayType = arrow::FixedSizeBinaryArray;
+ static void Validate(arrow::FixedSizeBinaryArray& /*array*/) {
+ }
+
+ static NYql::NUdf::TUnboxedValue ExtractValue(arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) {
+ auto data = array.GetView(rowIndex);
+ return MakeString(NUdf::TStringRef(data.data(), data.size() - 1));
+ }
+ static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) {
+ return TFixedWidthStatAccumulator(typeInfo);
+ }
+};
+
+}
+
+template <class TElementAccessor, class TAccessor>
+TBytesStatistics WriteColumnValuesFromArrowSpecImpl(TAccessor editAccessor,
+ const arrow::RecordBatch& batch, const ui32 columnIndex, arrow::Array* arrayExt, NScheme::TTypeInfo columnType) {
+ auto& array = *reinterpret_cast<typename TElementAccessor::TArrayType*>(arrayExt);
+ TElementAccessor::Validate(array);
+ auto statAccumulator = TElementAccessor::BuildStatAccumulator(columnType);
+ for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
+ auto& rowItem = editAccessor(rowIndex, columnIndex);
+ if (array.IsNull(rowIndex)) {
+ statAccumulator.AddNull();
+ rowItem = NUdf::TUnboxedValue();
+ } else {
+ rowItem = TElementAccessor::ExtractValue(array, rowIndex);
+ statAccumulator.AddValue(rowItem);
+ }
+ }
+ return statAccumulator.Finish();
+}
+
+
template <class TAccessor>
TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) {
- TBytesStatistics columnStats;
- // Hold pointer to column until function end
std::shared_ptr<arrow::Array> columnSharedPtr = batch.column(columnIndex);
arrow::Array* columnPtr = columnSharedPtr.get();
namespace NTypeIds = NScheme::NTypeIds;
- for (i64 rowIndex = 0; rowIndex < batch.num_rows(); ++rowIndex) {
- auto& rowItem = editAccessor(rowIndex, columnIndex);
- if (columnPtr->IsNull(rowIndex)) {
- rowItem = NUdf::TUnboxedValue();
- } else {
- switch (columnType.GetTypeId()) {
- case NTypeIds::Bool:
- {
- rowItem = MakeUnboxedValue<arrow::BooleanArray, bool>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Int8:
- {
- rowItem = MakeUnboxedValue<arrow::Int8Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Int16:
- {
- rowItem = MakeUnboxedValue<arrow::Int16Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Int32:
- {
- rowItem = MakeUnboxedValue<arrow::Int32Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Int64:
- {
- rowItem = MakeUnboxedValue<arrow::Int64Array, i64>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Uint8:
- {
- rowItem = MakeUnboxedValue<arrow::UInt8Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Uint16:
- {
- rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Uint32:
- {
- rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Uint64:
- {
- rowItem = MakeUnboxedValue<arrow::UInt64Array, ui64>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Float:
- {
- rowItem = MakeUnboxedValue<arrow::FloatArray>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Double:
- {
- rowItem = MakeUnboxedValue<arrow::DoubleArray>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::String:
- case NTypeIds::Utf8:
- case NTypeIds::Json:
- case NTypeIds::Yson:
- case NTypeIds::JsonDocument:
- case NTypeIds::DyNumber:
- {
- rowItem = MakeUnboxedValueFromBinaryData(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Date:
- {
- rowItem = MakeUnboxedValue<arrow::UInt16Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Datetime:
- {
- rowItem = MakeUnboxedValue<arrow::UInt32Array>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Timestamp:
- {
- rowItem = MakeUnboxedValue<arrow::TimestampArray, ui64>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Interval:
- {
- rowItem = MakeUnboxedValue<arrow::DurationArray, ui64>(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Decimal:
- {
- rowItem = MakeUnboxedValueFromDecimal128Array(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::PairUi64Ui64:
- case NTypeIds::ActorId:
- case NTypeIds::StepOrderId:
- {
- Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId()));
- rowItem = MakeUnboxedValueFromFixedSizeBinaryData(columnPtr, rowIndex);
- break;
- }
- case NTypeIds::Pg:
- // TODO: support pg types
- YQL_ENSURE(false, "Unsupported pg type at column " << columnIndex);
-
- default:
- YQL_ENSURE(false, "Unsupported type: " << NScheme::TypeName(columnType.GetTypeId()) << " at column " << columnIndex);
- }
+ switch (columnType.GetTypeId()) {
+ case NTypeIds::Bool:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::BooleanArray, bool>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Int8:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int8Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Int16:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Int32:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Int64:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Int64Array, i64>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Uint8:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt8Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
}
- columnStats.AddStatistics(GetUnboxedValueSize(rowItem, columnType));
+ case NTypeIds::Uint16:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Uint32:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Uint64:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt64Array, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Float:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::FloatArray>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Double:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::DoubleArray>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::String:
+ case NTypeIds::Utf8:
+ case NTypeIds::Json:
+ case NTypeIds::Yson:
+ case NTypeIds::JsonDocument:
+ case NTypeIds::DyNumber:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::BinaryArray, NUdf::TStringRef>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Date:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt16Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Datetime:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::UInt32Array>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Timestamp:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::TimestampArray, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Interval:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::DurationArray, ui64>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Decimal:
+ {
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::PairUi64Ui64:
+ case NTypeIds::ActorId:
+ case NTypeIds::StepOrderId:
+ {
+ Y_VERIFY_DEBUG_S(false, "Unsupported (deprecated) type: " << NScheme::TypeName(columnType.GetTypeId()));
+ return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef>>(editAccessor, batch, columnIndex, columnPtr, columnType);
+ }
+ case NTypeIds::Pg:
+ // TODO: support pg types
+ YQL_ENSURE(false, "Unsupported pg type at column " << columnIndex);
+
+ default:
+ YQL_ENSURE(false, "Unsupported type: " << NScheme::TypeName(columnType.GetTypeId()) << " at column " << columnIndex);
}
- return columnStats;
}
TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
@@ -310,14 +402,13 @@ std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& val
ui32 TKqpScanComputeContext::TScanData::FillUnboxedCells(NUdf::TUnboxedValue* const* result) {
YQL_ENSURE(!RowBatches.empty());
auto& batch = RowBatches.front();
- auto rowStats = GetRowSize(batch.GetCurrentData(), ResultColumns, SystemColumns);
const ui32 resultColumnsCount = batch.FillUnboxedCells(result);
if (batch.IsFinished()) {
RowBatches.pop();
}
- StoredBytes -= rowStats.AllocatedBytes;
- YQL_ENSURE(RowBatches.empty() == (StoredBytes == 0), "StoredBytes miscalculated!");
+ StoredBytes -= batch.BytesForRecordEstimation();
+ YQL_ENSURE(RowBatches.empty() == (StoredBytes < 1), "StoredBytes miscalculated!");
return resultColumnsCount;
}
@@ -410,11 +501,11 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const TVector<TOwnedCellVec>& ba
}
FillSystemColumns(vectorStart + ResultColumns.size(), shardId, SystemColumns);
- stats.AddStatistics(GetRowSize(vectorStart, ResultColumns, SystemColumns));
+ stats += GetRowSize(vectorStart, ResultColumns, SystemColumns);
}
}
if (cells.size()) {
- RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId));
+ RowBatches.emplace(RowBatch(ColumnsCount(), batch.size(), std::move(cells), shardId, stats.AllocatedBytes));
}
StoredBytes += stats.AllocatedBytes;
@@ -459,7 +550,7 @@ ui64 TKqpScanComputeContext::TScanData::AddRows(const arrow::RecordBatch& batch,
}
if (cells.size()) {
- RowBatches.emplace(RowBatch(ColumnsCount(), std::move(cells), shardId));
+ RowBatches.emplace(RowBatch(ColumnsCount(), batch.num_rows(), std::move(cells), shardId, stats.AllocatedBytes));
}
StoredBytes += stats.AllocatedBytes;
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index 13f0f1c070d..d61e8b3c0a3 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -27,6 +27,29 @@ struct TBytesStatistics {
ui64 AllocatedBytes = 0;
ui64 DataBytes = 0;
+ TBytesStatistics() = default;
+ TBytesStatistics(const ui64 allocated, const ui64 data)
+ : AllocatedBytes(allocated)
+ , DataBytes(data)
+ {
+
+ }
+
+ TBytesStatistics operator+(const TBytesStatistics& item) const {
+ return TBytesStatistics(AllocatedBytes + item.AllocatedBytes, DataBytes + item.DataBytes);
+ }
+
+ void operator+=(const TBytesStatistics& item) {
+ AllocatedBytes += item.AllocatedBytes;
+ DataBytes += item.DataBytes;
+ }
+
+ template <class T>
+ TBytesStatistics operator*(const T kff) const {
+ static_assert(std::is_arithmetic<T>());
+ return TBytesStatistics(AllocatedBytes * kff, DataBytes * kff);
+ }
+
void AddStatistics(const TBytesStatistics& other) {
AllocatedBytes += other.AllocatedBytes;
DataBytes += other.DataBytes;
@@ -34,7 +57,7 @@ struct TBytesStatistics {
};
-TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type);
+TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NScheme::TTypeInfo& type);
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
TBytesStatistics WriteColumnValuesFromArrow(NUdf::TUnboxedValue* editAccessors,
@@ -146,17 +169,27 @@ public:
private:
const ui32 CellsCountForRow;
const ui32 ColumnsCount;
+ const ui32 RowsCount;
TUnboxedValueVector Cells;
ui64 CurrentRow = 0;
+ const ui64 AllocatedBytes;
public:
TMaybe<ui64> ShardId;
- explicit RowBatch(const ui32 columnsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId)
+ explicit RowBatch(const ui32 columnsCount, const ui32 rowsCount, TUnboxedValueVector&& cells, TMaybe<ui64> shardId, const ui64 allocatedBytes)
: CellsCountForRow(columnsCount ? columnsCount : 1)
, ColumnsCount(columnsCount)
+ , RowsCount(rowsCount)
, Cells(std::move(cells))
+ , AllocatedBytes(allocatedBytes)
, ShardId(shardId)
{
+ Y_VERIFY(AllocatedBytes);
+ Y_VERIFY(RowsCount);
+ }
+
+ double BytesForRecordEstimation() {
+ return 1.0 * AllocatedBytes / RowsCount;
}
const NUdf::TUnboxedValue* GetCurrentData() const {
@@ -174,7 +207,7 @@ public:
TSmallVec<TColumn> SystemColumns;
TSmallVec<TColumn> ResultColumns;
TQueue<RowBatch> RowBatches;
- ui64 StoredBytes = 0;
+ double StoredBytes = 0;
bool Finished = false;
};