aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-11-03 16:41:57 +0300
committerhor911 <hor911@ydb.tech>2023-11-03 17:07:10 +0300
commit17e44ee7839e018dc51fe65a7b4fafdd44cea383 (patch)
tree5cfb3b0862be9a7cb02e5eb0957f7369ec3c6bcf
parent3035222b78898af94cac7fe967a03a41af9635f6 (diff)
downloadydb-17e44ee7839e018dc51fe65a7b4fafdd44cea383.tar.gz
Correct rows count for different input formats
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_impl.h117
1 files changed, 115 insertions, 2 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h
index d0d96cdfdaf..3a0d7f8cf67 100644
--- a/ydb/library/yql/dq/runtime/dq_input_impl.h
+++ b/ydb/library/yql/dq/runtime/dq_input_impl.h
@@ -1,10 +1,23 @@
#pragma once
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node.h>
namespace NYql::NDq {
+// remove LEGACY* support after upgrade S3/Generic Sources to use modern format
+
+enum TInputChannelFormat {
+ FORMAT_UNKNOWN,
+ SIMPLE_SCALAR,
+ SIMPLE_WIDE,
+ BLOCK_WIDE,
+ LEGACY_CH,
+ LEGACY_SIMPLE_BLOCK,
+ LEGACY_TUPLED_BLOCK
+};
+
template <class TDerived, class IInputInterface>
class TDqInputImpl : public IInputInterface {
public:
@@ -28,6 +41,104 @@ public:
return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0);
}
+ bool IsLegacySimpleBlock(NKikimr::NMiniKQL::TStructType* structType, ui32& blockLengthIndex) {
+ auto index = structType->FindMemberIndex(BlockLengthColumnName);
+ if (index) {
+ for (ui32 i = 0; i < structType->GetMembersCount(); i++) {
+ auto type = structType->GetMemberType(i);
+ if (!type->IsBlock()) {
+ return false;
+ }
+ }
+ blockLengthIndex = *index;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ TInputChannelFormat GetFormat() {
+ if (Width) {
+ if (InputType->IsStruct()) {
+ auto structType = static_cast<NKikimr::NMiniKQL::TStructType*>(InputType);
+ for (ui32 i = 0; i < structType->GetMembersCount(); i++) {
+ if (structType->GetMemberType(i)->IsBlock()) {
+ return BLOCK_WIDE;
+ }
+ }
+ } else if (InputType->IsTuple()) {
+ auto tupleType= static_cast<NKikimr::NMiniKQL::TTupleType*>(InputType);
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); i++) {
+ if (tupleType->GetElementType(i)->IsBlock()) {
+ return BLOCK_WIDE;
+ }
+ }
+ } else {
+ return SIMPLE_WIDE;
+ }
+ }
+
+ if (InputType->IsStruct()) {
+ return IsLegacySimpleBlock(static_cast<NKikimr::NMiniKQL::TStructType*>(InputType), LegacyBlockLengthIndex) ? LEGACY_SIMPLE_BLOCK : SIMPLE_SCALAR;
+ } else if (InputType->IsResource()) {
+ if (static_cast<NKikimr::NMiniKQL::TResourceType*>(InputType)->GetTag() == "ClickHouseClient.Block") {
+ return LEGACY_CH;
+ }
+ } else if (InputType->IsTuple()) {
+ auto tupleType= static_cast<NKikimr::NMiniKQL::TTupleType*>(InputType);
+ if (tupleType->GetElementsCount() == 2) {
+ auto type = tupleType->GetElementType(0);
+ if (type->IsStruct()) {
+ return IsLegacySimpleBlock(static_cast<NKikimr::NMiniKQL::TStructType*>(type), LegacyBlockLengthIndex) ? LEGACY_TUPLED_BLOCK : SIMPLE_SCALAR;
+ } else if (InputType->IsResource()) {
+ if (static_cast<NKikimr::NMiniKQL::TResourceType*>(InputType)->GetTag() == "ClickHouseClient.Block") {
+ return LEGACY_CH;
+ }
+ }
+ }
+ }
+
+ return SIMPLE_SCALAR;
+ }
+
+ ui64 GetRowsCount(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) {
+ if (Y_UNLIKELY(Format == FORMAT_UNKNOWN)) {
+ Format = GetFormat();
+ }
+
+ switch (Format) {
+ case BLOCK_WIDE: {
+ ui64 result = 0;
+ batch.ForEachRowWide([&](NUdf::TUnboxedValue* values, ui32 width) {
+ result += NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ });
+ return result;
+ }
+ case LEGACY_CH:
+ // can't count rows inside CH UDF resource
+ return 0;
+ case LEGACY_SIMPLE_BLOCK: {
+ ui64 result = 0;
+ batch.ForEachRow([&](NUdf::TUnboxedValue& value) {
+ result += NKikimr::NMiniKQL::TArrowBlock::From(value.GetElement(LegacyBlockLengthIndex)).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ });
+ return result;
+ }
+ case LEGACY_TUPLED_BLOCK: {
+ ui64 result = 0;
+ batch.ForEachRow([&](NUdf::TUnboxedValue& value) {
+ auto value0 = value.GetElement(0);
+ result += NKikimr::NMiniKQL::TArrowBlock::From(value0.GetElement(LegacyBlockLengthIndex)).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ });
+ return result;
+ }
+ case SIMPLE_SCALAR:
+ case SIMPLE_WIDE:
+ default:
+ return batch.RowCount();
+ }
+ }
+
void AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) {
Y_ABORT_UNLESS(batch.Width() == GetWidth());
@@ -36,7 +147,7 @@ public:
if (static_cast<TDerived*>(this)->PushStats.CollectBasic()) {
static_cast<TDerived*>(this)->PushStats.Bytes += space;
- static_cast<TDerived*>(this)->PushStats.Rows += batch.RowCount();
+ static_cast<TDerived*>(this)->PushStats.Rows += GetRowsCount(batch);
static_cast<TDerived*>(this)->PushStats.Chunks++;
static_cast<TDerived*>(this)->PushStats.Resume();
if (static_cast<TDerived*>(this)->PushStats.CollectFull()) {
@@ -119,7 +230,7 @@ public:
if (static_cast<TDerived*>(this)->PopStats.CollectBasic()) {
static_cast<TDerived*>(this)->PopStats.Bytes += popBytes;
- static_cast<TDerived*>(this)->PopStats.Rows += batch.RowCount(); // may do not match to pushed row count
+ static_cast<TDerived*>(this)->PopStats.Rows += GetRowsCount(batch);
static_cast<TDerived*>(this)->PopStats.Chunks++;
}
@@ -178,6 +289,8 @@ protected:
ui64 StoredBytesBeforePause = 0;
ui64 StoredRowsBeforePause = 0;
static constexpr ui64 PauseMask = 1llu << 63llu;
+ TInputChannelFormat Format = FORMAT_UNKNOWN;
+ ui32 LegacyBlockLengthIndex = 0;
};
} // namespace NYql::NDq