diff options
author | hor911 <hor911@ydb.tech> | 2023-11-03 16:41:57 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-11-03 17:07:10 +0300 |
commit | 17e44ee7839e018dc51fe65a7b4fafdd44cea383 (patch) | |
tree | 5cfb3b0862be9a7cb02e5eb0957f7369ec3c6bcf | |
parent | 3035222b78898af94cac7fe967a03a41af9635f6 (diff) | |
download | ydb-17e44ee7839e018dc51fe65a7b4fafdd44cea383.tar.gz |
Correct rows count for different input formats
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_impl.h | 117 |
1 files changed, 115 insertions, 2 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h index d0d96cdfdaf..3a0d7f8cf67 100644 --- a/ydb/library/yql/dq/runtime/dq_input_impl.h +++ b/ydb/library/yql/dq/runtime/dq_input_impl.h @@ -1,10 +1,23 @@ #pragma once +#include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node.h> namespace NYql::NDq { +// remove LEGACY* support after upgrade S3/Generic Sources to use modern format + +enum TInputChannelFormat { + FORMAT_UNKNOWN, + SIMPLE_SCALAR, + SIMPLE_WIDE, + BLOCK_WIDE, + LEGACY_CH, + LEGACY_SIMPLE_BLOCK, + LEGACY_TUPLED_BLOCK +}; + template <class TDerived, class IInputInterface> class TDqInputImpl : public IInputInterface { public: @@ -28,6 +41,104 @@ public: return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0); } + bool IsLegacySimpleBlock(NKikimr::NMiniKQL::TStructType* structType, ui32& blockLengthIndex) { + auto index = structType->FindMemberIndex(BlockLengthColumnName); + if (index) { + for (ui32 i = 0; i < structType->GetMembersCount(); i++) { + auto type = structType->GetMemberType(i); + if (!type->IsBlock()) { + return false; + } + } + blockLengthIndex = *index; + return true; + } else { + return false; + } + } + + TInputChannelFormat GetFormat() { + if (Width) { + if (InputType->IsStruct()) { + auto structType = static_cast<NKikimr::NMiniKQL::TStructType*>(InputType); + for (ui32 i = 0; i < structType->GetMembersCount(); i++) { + if (structType->GetMemberType(i)->IsBlock()) { + return BLOCK_WIDE; + } + } + } else if (InputType->IsTuple()) { + auto tupleType= static_cast<NKikimr::NMiniKQL::TTupleType*>(InputType); + for (ui32 i = 0; i < tupleType->GetElementsCount(); i++) { + if (tupleType->GetElementType(i)->IsBlock()) { + return BLOCK_WIDE; + } + } + } else { + return SIMPLE_WIDE; + } + } + + if (InputType->IsStruct()) { + return IsLegacySimpleBlock(static_cast<NKikimr::NMiniKQL::TStructType*>(InputType), LegacyBlockLengthIndex) ? LEGACY_SIMPLE_BLOCK : SIMPLE_SCALAR; + } else if (InputType->IsResource()) { + if (static_cast<NKikimr::NMiniKQL::TResourceType*>(InputType)->GetTag() == "ClickHouseClient.Block") { + return LEGACY_CH; + } + } else if (InputType->IsTuple()) { + auto tupleType= static_cast<NKikimr::NMiniKQL::TTupleType*>(InputType); + if (tupleType->GetElementsCount() == 2) { + auto type = tupleType->GetElementType(0); + if (type->IsStruct()) { + return IsLegacySimpleBlock(static_cast<NKikimr::NMiniKQL::TStructType*>(type), LegacyBlockLengthIndex) ? LEGACY_TUPLED_BLOCK : SIMPLE_SCALAR; + } else if (InputType->IsResource()) { + if (static_cast<NKikimr::NMiniKQL::TResourceType*>(InputType)->GetTag() == "ClickHouseClient.Block") { + return LEGACY_CH; + } + } + } + } + + return SIMPLE_SCALAR; + } + + ui64 GetRowsCount(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) { + if (Y_UNLIKELY(Format == FORMAT_UNKNOWN)) { + Format = GetFormat(); + } + + switch (Format) { + case BLOCK_WIDE: { + ui64 result = 0; + batch.ForEachRowWide([&](NUdf::TUnboxedValue* values, ui32 width) { + result += NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + }); + return result; + } + case LEGACY_CH: + // can't count rows inside CH UDF resource + return 0; + case LEGACY_SIMPLE_BLOCK: { + ui64 result = 0; + batch.ForEachRow([&](NUdf::TUnboxedValue& value) { + result += NKikimr::NMiniKQL::TArrowBlock::From(value.GetElement(LegacyBlockLengthIndex)).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + }); + return result; + } + case LEGACY_TUPLED_BLOCK: { + ui64 result = 0; + batch.ForEachRow([&](NUdf::TUnboxedValue& value) { + auto value0 = value.GetElement(0); + result += NKikimr::NMiniKQL::TArrowBlock::From(value0.GetElement(LegacyBlockLengthIndex)).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + }); + return result; + } + case SIMPLE_SCALAR: + case SIMPLE_WIDE: + default: + return batch.RowCount(); + } + } + void AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) { Y_ABORT_UNLESS(batch.Width() == GetWidth()); @@ -36,7 +147,7 @@ public: if (static_cast<TDerived*>(this)->PushStats.CollectBasic()) { static_cast<TDerived*>(this)->PushStats.Bytes += space; - static_cast<TDerived*>(this)->PushStats.Rows += batch.RowCount(); + static_cast<TDerived*>(this)->PushStats.Rows += GetRowsCount(batch); static_cast<TDerived*>(this)->PushStats.Chunks++; static_cast<TDerived*>(this)->PushStats.Resume(); if (static_cast<TDerived*>(this)->PushStats.CollectFull()) { @@ -119,7 +230,7 @@ public: if (static_cast<TDerived*>(this)->PopStats.CollectBasic()) { static_cast<TDerived*>(this)->PopStats.Bytes += popBytes; - static_cast<TDerived*>(this)->PopStats.Rows += batch.RowCount(); // may do not match to pushed row count + static_cast<TDerived*>(this)->PopStats.Rows += GetRowsCount(batch); static_cast<TDerived*>(this)->PopStats.Chunks++; } @@ -178,6 +289,8 @@ protected: ui64 StoredBytesBeforePause = 0; ui64 StoredRowsBeforePause = 0; static constexpr ui64 PauseMask = 1llu << 63llu; + TInputChannelFormat Format = FORMAT_UNKNOWN; + ui32 LegacyBlockLengthIndex = 0; }; } // namespace NYql::NDq |