diff options
author | monster <monster@ydb.tech> | 2023-04-18 17:51:00 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2023-04-18 17:51:00 +0300 |
commit | e318f0163f16e2a3e5a139c5fd8170fa7dd4af01 (patch) | |
tree | 4fa4c89e9d3cc48780b8e1f78c2be8ac33c4bbfc | |
parent | 4d12f240b9bc7a2b200f0550199646bb6034f2d6 (diff) | |
download | ydb-e318f0163f16e2a3e5a139c5fd8170fa7dd4af01.tar.gz |
get repeated calculations out of row loop
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 67 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/interface/pack.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/interface/type_desc.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 11 |
6 files changed, 83 insertions, 41 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index bd231ab15c9..54e55c19569 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -368,6 +368,8 @@ public: Settings.GetTable().GetTableId().GetSchemaVersion() ); + InitResultColumns(); + KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size()); for (size_t i = 0; i < Settings.KeyColumnTypesSize(); ++i) { auto typeId = Settings.GetKeyColumnTypes(i); @@ -484,12 +486,12 @@ public: auto range = state->GetBounds(Settings.GetReverse()); TVector<TKeyDesc::TColumnOp> columns; - columns.reserve(Settings.GetColumns().size()); - for (const auto& column : Settings.GetColumns()) { + columns.reserve(ResultColumns.size()); + for (const auto& column : ResultColumns) { TKeyDesc::TColumnOp op; - op.Column = column.GetId(); + op.Column = column.Tag; op.Operation = TKeyDesc::EColumnOperation::Read; - op.ExpectedType = MakeTypeInfo(column); + op.ExpectedType = column.TypeInfo; columns.emplace_back(std::move(op)); } @@ -946,15 +948,15 @@ public: NMiniKQL::TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row) { NMiniKQL::TBytesStatistics rowStats{0, 0}; size_t columnIndex = 0; - for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { - if (IsSystemColumn(Settings.GetColumns(resultColumnIndex).GetId())) { + for (const auto& column : ResultColumns) { + if (column.IsSystem) { rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); } else { - rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[columnIndex], MakeTypeInfo(Settings.GetColumns(resultColumnIndex)))); + rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[columnIndex], column.TypeInfo)); columnIndex += 1; } } - if (Settings.ColumnsSize() == 0) { + if (ResultColumns.empty()) { rowStats.AddStatistics({sizeof(ui64), sizeof(ui64)}); } return rowStats; @@ -986,18 +988,17 @@ public: } size_t columnIndex = 0; - for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { - auto tag = Settings.GetColumns(resultColumnIndex).GetId(); - auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()); - if (IsSystemColumn(tag)) { + for (size_t resultColumnIndex = 0; resultColumnIndex < ResultColumns.size(); ++resultColumnIndex) { + const auto& column = ResultColumns[resultColumnIndex]; + if (column.IsSystem) { for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { - NMiniKQL::FillSystemColumn(editAccessors[rowIndex][resultColumnIndex], shardId, tag, type); + NMiniKQL::FillSystemColumn(editAccessors[rowIndex][resultColumnIndex], shardId, column.Tag, column.TypeInfo); stats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); } } else { hasResultColumns = true; stats.AddStatistics( - NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, type) + NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, column.TypeInfo) ); columnIndex += 1; } @@ -1034,6 +1035,7 @@ public: auto& [shardId, result, batch, _, packed] = handle; NMiniKQL::TBytesStatistics stats; batch->reserve(batch->size()); + for (size_t rowIndex = packed; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { const auto& row = result->Get()->GetCells(rowIndex); NUdf::TUnboxedValue* rowItems = nullptr; @@ -1041,9 +1043,8 @@ public: i64 rowSize = 0; size_t columnIndex = 0; - for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { - auto tag = Settings.GetColumns(resultColumnIndex).GetId(); - if (!IsSystemColumn(tag)) { + for (const auto& column : ResultColumns) { + if (!column.IsSystem) { rowSize += row[columnIndex].Size(); columnIndex += 1; } @@ -1052,13 +1053,12 @@ public: rowSize = std::max(rowSize, (i64)8); columnIndex = 0; - for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { - auto tag = Settings.GetColumns(resultColumnIndex).GetId(); - auto type = MakeTypeInfo(Settings.GetColumns(resultColumnIndex)); - if (IsSystemColumn(tag)) { - NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type); + for (size_t resultColumnIndex = 0; resultColumnIndex < ResultColumns.size(); ++resultColumnIndex) { + const auto& column = ResultColumns[resultColumnIndex]; + if (column.IsSystem) { + NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, column.Tag, column.TypeInfo); } else { - rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type); + rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], column.TypeInfo); columnIndex += 1; } } @@ -1256,7 +1256,7 @@ public: return result; } - +private: NScheme::TTypeInfo MakeTypeInfo(const NKikimrTxDataShard::TKqpTransaction_TColumnMeta& info) { auto typeId = info.GetType(); return NScheme::TTypeInfo( @@ -1267,9 +1267,28 @@ public: ) : nullptr); } + void InitResultColumns() { + ResultColumns.reserve(Settings.ColumnsSize()); + for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { + const auto& srcColumn = Settings.GetColumns(resultColumnIndex); + TResultColumn column; + column.Tag = srcColumn.GetId(); + column.TypeInfo = MakeTypeInfo(srcColumn); + column.IsSystem = IsSystemColumn(column.Tag); + ResultColumns.push_back(column); + } + } + private: + struct TResultColumn { + bool IsSystem = false; + ui32 Tag = 0; + NScheme::TTypeInfo TypeInfo; + }; + NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings; + TVector<TResultColumn> ResultColumns; TVector<NScheme::TTypeInfo> KeyColumnTypes; NMiniKQL::TBytesStatistics BytesStats; diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index fde4e914159..f079780f3ad 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/parser/pg_wrapper/interface/pack.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h> #include <ydb/library/yql/public/udf/arrow/util.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -18,6 +19,14 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NSc return { sizeof(NUdf::TUnboxedValue), 8 }; // Special value for NULL elements } switch (type.GetTypeId()) { + case NTypeIds::Pg: + { + return { + sizeof(NUdf::TUnboxedValue), + PgValueSize(value, NPg::TypeDescGetTypeLen(type.GetTypeDesc())) + }; + } + case NTypeIds::Bool: case NTypeIds::Int8: case NTypeIds::Uint8: @@ -63,16 +72,6 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, const NSc } } - case NTypeIds::Pg: - { - return { - sizeof(NUdf::TUnboxedValue), - NKikimr::NMiniKQL::PgValueSize( - NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()), //extra typeDesc resolve - value - ) - }; - } default: Y_VERIFY_DEBUG_S(false, "Unsupported type " << NScheme::TypeName(type.GetTypeId())); diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index fb23046b98b..a6775025147 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -2403,15 +2403,13 @@ namespace NMiniKQL { using namespace NYql; -ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value) { - const auto& typeDesc = NYql::NPg::LookupType(pgTypeId); - - if (typeDesc.TypeLen >= 0) { - return typeDesc.TypeLen; +ui64 PgValueSize(const NUdf::TUnboxedValuePod& value, i32 typeLen) { + if (typeLen >= 0) { + return typeLen; } - Y_ENSURE(typeDesc.TypeLen == -1 || typeDesc.TypeLen == -2); + Y_ENSURE(typeLen == -1 || typeLen == -2); auto datum = PointerDatumFromPod(value); - if (typeDesc.TypeLen == -1) { + if (typeLen == -1) { const auto x = (const text*)PointerDatumFromPod(value); return GetCleanVarSize(x); } else { @@ -2420,6 +2418,11 @@ ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value) { } } +ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value) { + const auto& typeDesc = NYql::NPg::LookupType(pgTypeId); + return PgValueSize(value, typeDesc.TypeLen); +} + ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value) { return PgValueSize(type->GetTypeId(), value); } @@ -3551,6 +3554,13 @@ bool TypeDescIsComparable(void* typeDesc) { return static_cast<TPgTypeDescriptor*>(typeDesc)->CompareProcId != 0; } +i32 TypeDescGetTypeLen(void* typeDesc) { + if (!typeDesc) { + return 0; + } + return static_cast<TPgTypeDescriptor*>(typeDesc)->TypeLen; +} + ui32 TypeDescGetStoredSize(void* typeDesc) { if (!typeDesc) { return 0; diff --git a/ydb/library/yql/parser/pg_wrapper/interface/pack.h b/ydb/library/yql/parser/pg_wrapper/interface/pack.h index fc0732a0b78..7316c6a4a3d 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/pack.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/pack.h @@ -19,7 +19,9 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf); void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output); NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer); +ui64 PgValueSize(const NUdf::TUnboxedValuePod& value, i32 typeLen); ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value); ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value); + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h index 0a8fb161b88..aeec1732028 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/type_desc.h @@ -13,6 +13,7 @@ void* TypeDescFromPgTypeName(const TStringBuf name); TString TypeModFromPgTypeName(const TStringBuf name); bool TypeDescIsComparable(void* typeDesc); +i32 TypeDescGetTypeLen(void* typeDesc); ui32 TypeDescGetStoredSize(void* typeDesc); bool TypeDescNeedsCoercion(void* typeDesc); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 19f7c942c6e..017388afce0 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -140,6 +140,12 @@ void PgReleaseThreadContext(void* ctx) { Y_UNUSED(ctx); } +ui64 PgValueSize(const NUdf::TUnboxedValuePod& value, i32 typeLen) { + Y_UNUSED(typeLen); + Y_UNUSED(value); + throw yexception() << "PG types are not supported"; +} + ui64 PgValueSize(ui32 type, const NUdf::TUnboxedValuePod& value) { Y_UNUSED(type); Y_UNUSED(value); @@ -332,6 +338,11 @@ bool TypeDescIsComparable(void* typeDesc) { throw yexception() << "PG types are not supported"; } +i32 TypeDescGetTypeLen(void* typeDesc) { + Y_UNUSED(typeDesc); + throw yexception() << "PG types are not supported"; +} + ui32 TypeDescGetStoredSize(void* typeDesc) { Y_UNUSED(typeDesc); throw yexception() << "PG types are not supported"; |