diff options
author | chertus <azuikov@ydb.tech> | 2022-09-05 16:26:32 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-09-05 16:26:32 +0300 |
commit | 3be9e8763332d8c249998fbb9c16de6ea0bae5fe (patch) | |
tree | 761b99822de592f6a8173aa21756e1d1a2cf8911 | |
parent | 83d3d0adfcecbcab4279cdfcc6bc3403d2e84867 (diff) | |
download | ydb-3be9e8763332d8c249998fbb9c16de6ea0bae5fe.tar.gz |
fix some() in SSA
-rw-r--r-- | ydb/core/formats/custom_registry.cpp | 4 | ||||
-rw-r--r-- | ydb/core/formats/program.cpp | 10 | ||||
-rw-r--r-- | ydb/core/formats/program.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/compile/kqp_olap_compiler.cpp | 10 | ||||
-rw-r--r-- | ydb/core/protos/ssa.proto | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_common.cpp | 44 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_columnshard_read_write.cpp | 194 |
7 files changed, 191 insertions, 79 deletions
diff --git a/ydb/core/formats/custom_registry.cpp b/ydb/core/formats/custom_registry.cpp index ef1fb1a193..590e3c3e07 100644 --- a/ydb/core/formats/custom_registry.cpp +++ b/ydb/core/formats/custom_registry.cpp @@ -62,12 +62,12 @@ static void RegisterYdbCast(cp::FunctionRegistry* registry) { static void RegisterHouseAggregates(cp::FunctionRegistry* registry) { #ifndef WIN32 try { - Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Any))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Some))).ok()); Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedCount>(GetHouseFunctionName(EAggregate::Count))).ok()); Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMin>(GetHouseFunctionName(EAggregate::Min))).ok()); Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMax>(GetHouseFunctionName(EAggregate::Max))).ok()); Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedSum>(GetHouseFunctionName(EAggregate::Sum))).ok()); - Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok()); + //Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok()); } catch (const std::exception& /*ex*/) { Y_VERIFY(false); } diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp index 48a615c41c..4dd463fffc 100644 --- a/ydb/core/formats/program.cpp +++ b/ydb/core/formats/program.cpp @@ -238,8 +238,6 @@ EOperation ValidateOperation(EOperation op, ui32 argsSize) { const char * GetFunctionName(EAggregate op) { switch (op) { - case EAggregate::Any: - return "any"; case EAggregate::Count: return "count"; case EAggregate::Min: @@ -248,9 +246,10 @@ const char * GetFunctionName(EAggregate op) { return "min_max"; case EAggregate::Sum: return "sum"; +#if 0 // TODO case EAggregate::Avg: return "mean"; - +#endif default: break; } @@ -259,7 +258,7 @@ const char * GetFunctionName(EAggregate op) { const char * GetHouseFunctionName(EAggregate op) { switch (op) { - case EAggregate::Any: + case EAggregate::Some: return "ch.any"; case EAggregate::Count: return "ch.count"; @@ -269,9 +268,10 @@ const char * GetHouseFunctionName(EAggregate op) { return "ch.max"; case EAggregate::Sum: return "ch.sum"; +#if 0 // TODO case EAggregate::Avg: return "ch.avg"; - +#endif default: break; } diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h index a314272003..f52a0b6de1 100644 --- a/ydb/core/formats/program.h +++ b/ydb/core/formats/program.h @@ -88,12 +88,12 @@ enum class EOperation { enum class EAggregate { Unspecified = 0, - Any = 1, + Some = 1, Count = 2, Min = 3, Max = 4, Sum = 5, - Avg = 6, + //Avg = 6, }; const char * GetFunctionName(EOperation op); diff --git a/ydb/core/kqp/compile/kqp_olap_compiler.cpp b/ydb/core/kqp/compile/kqp_olap_compiler.cpp index 7f5cd49122..351110d476 100644 --- a/ydb/core/kqp/compile/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/compile/kqp_olap_compiler.cpp @@ -94,7 +94,7 @@ private: std::unordered_map<std::string, EAggFunctionType> TKqpOlapCompileContext::AggFuncTypesMap = { { "count", TProgram::TAggregateAssignment::AGG_COUNT }, - { "some", TProgram::TAggregateAssignment::AGG_ANY }, + { "some", TProgram::TAggregateAssignment::AGG_SOME }, }; TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx); @@ -170,7 +170,7 @@ ui32 ConvertParameterToColumn(const TCoParameter& parameter, TKqpOlapCompileCont ui32 ConvertSafeCastToColumn(const TCoSafeCast& cast, TKqpOlapCompileContext& ctx) { auto columnId = GetOrCreateColumnId(cast.Value(), ctx); - + TProgram::TAssignment* ssaValue = ctx.CreateAssignCmd(); auto newCast = ssaValue->MutableFunction(); @@ -379,7 +379,7 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) auto aggKeyColId = GetOrCreateColumnId(keyCol, ctx); aggKeyCol->SetId(aggKeyColId); aggKeyCol->SetName(keyColName); - + auto* projCol = projection->AddColumns(); projCol->SetId(aggKeyColId); projCol->SetName(keyColName); @@ -397,10 +397,10 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) auto* projCol = projection->AddColumns(); projCol->SetId(aggColId); projCol->SetName(aggColName.c_str()); - + auto* aggFunc = agg->MutableFunction(); aggFunc->SetId(ctx.GetAggFuncType(aggKqp.Type().StringValue().c_str())); - + if (aggKqp.Column() != "*") { aggFunc->AddArguments()->SetId(GetOrCreateColumnId(aggKqp.Column(), ctx)); } diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index c1c2f3e56b..de4eeabe0a 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -99,12 +99,12 @@ message TProgram { message TAggregateAssignment { enum EAggregateFunction { AGG_UNSPECIFIED = 0; - AGG_ANY = 1; + AGG_SOME = 1; AGG_COUNT = 2; AGG_MIN = 3; AGG_MAX = 4; AGG_SUM = 5; - AGG_AVG = 6; + //AGG_AVG = 6; //AGG_VAR = 7; //AGG_COVAR = 8; //AGG_STDDEV = 9; diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp index 14f826774b..18d578e177 100644 --- a/ydb/core/tx/columnshard/columnshard_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_common.cpp @@ -45,15 +45,26 @@ struct TContext { : ColumnResolver(columnResolver) {} - std::string GetName(ui32 columnId) const { + std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const { + ui32 columnId = column.GetId(); TString name = ColumnResolver.GetColumnName(columnId, false); if (name.Empty()) { - name = ToString(columnId); + return GenerateName(column); } else { Sources[columnId] = name; } return std::string(name.data(), name.size()); } + + std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const { + TString name; + if (column.HasName()) { + name = column.GetName(); + } else { + name = ToString(column.GetId()); + } + return std::string(name.data(), name.size()); + } }; NArrow::TAssign MakeFunction(const TContext& info, const std::string& name, @@ -64,8 +75,7 @@ NArrow::TAssign MakeFunction(const TContext& info, const std::string& name, std::vector<std::string> arguments; for (auto& col : func.GetArguments()) { - ui32 columnId = col.GetId(); - arguments.push_back(info.GetName(columnId)); + arguments.push_back(info.GetName(col)); } switch (func.GetId()) { @@ -177,11 +187,11 @@ NArrow::TAggregateAssign MakeAggregate(const TContext& info, const std::string& using TAggregateAssign = NArrow::TAggregateAssign; if (func.ArgumentsSize() == 1) { - std::string argument = info.GetName(func.GetArguments()[0].GetId()); + std::string argument = info.GetName(func.GetArguments()[0]); switch (func.GetId()) { - case TId::AGG_ANY: - return TAggregateAssign(name, EAggregate::Any, std::move(argument)); + case TId::AGG_SOME: + return TAggregateAssign(name, EAggregate::Some, std::move(argument)); case TId::AGG_COUNT: return TAggregateAssign(name, EAggregate::Count, std::move(argument)); case TId::AGG_MIN: @@ -190,9 +200,10 @@ NArrow::TAggregateAssign MakeAggregate(const TContext& info, const std::string& return TAggregateAssign(name, EAggregate::Max, std::move(argument)); case TId::AGG_SUM: return TAggregateAssign(name, EAggregate::Sum, std::move(argument)); +#if 0 // TODO case TId::AGG_AVG: return TAggregateAssign(name, EAggregate::Avg, std::move(argument)); - +#endif case TId::AGG_UNSPECIFIED: break; } @@ -232,8 +243,7 @@ bool ExtractAssign(const TContext& info, NArrow::TProgramStep& step, const NKiki { using TId = NKikimrSSA::TProgram::TAssignment; - ui32 columnId = assign.GetColumn().GetId(); - std::string columnName = info.GetName(columnId); + std::string columnName = info.GetName(assign.GetColumn()); switch (assign.GetExpressionCase()) { case TId::kFunction: @@ -272,11 +282,12 @@ bool ExtractAssign(const TContext& info, NArrow::TProgramStep& step, const NKiki } bool ExtractFilter(const TContext& info, NArrow::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) { - ui32 columnId = filter.GetPredicate().GetId(); - if (!columnId) { + auto& column = filter.GetPredicate(); + if (!column.HasId() && !column.HasName()) { return false; } - step.Filters.push_back(info.GetName(columnId)); + // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. + step.Filters.push_back(info.GetName(column)); return true; } @@ -284,7 +295,8 @@ bool ExtractProjection(const TContext& info, NArrow::TProgramStep& step, const NKikimrSSA::TProgram::TProjection& projection) { step.Projection.reserve(projection.ColumnsSize()); for (auto& col : projection.GetColumns()) { - step.Projection.push_back(info.GetName(col.GetId())); + // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. + step.Projection.push_back(info.GetName(col)); } return true; } @@ -302,13 +314,13 @@ bool ExtractGroupBy(const TContext& info, NArrow::TProgramStep& step, const NKik // It adds implicit projection with aggregates and keys. Remove non aggregated columns. step.Projection.reserve(groupBy.KeyColumnsSize() + groupBy.AggregatesSize()); for (auto& col : groupBy.GetKeyColumns()) { - step.Projection.push_back(info.GetName(col.GetId())); + step.Projection.push_back(info.GetName(col)); } step.GroupBy.reserve(groupBy.AggregatesSize()); for (auto& agg : groupBy.GetAggregates()) { auto& resColumn = agg.GetColumn(); - TString columnName = ToString(resColumn.GetId()); + TString columnName = info.GenerateName(resColumn); auto func = MakeAggregate(info, columnName, agg.GetFunction()); if (!func.IsOk()) { diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index 8daf608447..06edad85ce 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -84,7 +84,7 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p } template <typename TArrowType> -bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) { +bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) { UNIT_ASSERT(array); UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size()); @@ -97,7 +97,21 @@ bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVect return true; } -bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) { +template <typename TArrowArrayType> +bool CheckTypedStrValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) { + UNIT_ASSERT(array); + UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size()); + + auto& column = dynamic_cast<const TArrowArrayType&>(*array); + + for (int i = 0; i < column.length(); ++i) { + auto value = column.GetString(i); + UNIT_ASSERT_VALUES_EQUAL(value, expected[i]); + } + return true; +} + +bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) { UNIT_ASSERT(array); switch (array->type()->id()) { @@ -125,7 +139,25 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<in default: UNIT_ASSERT(false); - //return false; + break; + } + return true; +} + +bool CheckStringValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) { + UNIT_ASSERT(array); + + switch (array->type()->id()) { + case arrow::Type::STRING: + return CheckTypedStrValues<arrow::StringArray>(array, expected); + case arrow::Type::BINARY: + return CheckTypedStrValues<arrow::BinaryArray>(array, expected); + case arrow::Type::FIXED_SIZE_BINARY: + return CheckTypedStrValues<arrow::FixedSizeBinaryArray>(array, expected); + + default: + UNIT_ASSERT(false); + break; } return true; } @@ -153,14 +185,11 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) { return true; } -bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const TVector<TString>& colNames, - size_t rowsCount = 100) { - auto schema = NArrow::DeserializeSchema(meta.GetSchema()); - auto batch = NArrow::DeserializeBatch(blob, schema); +bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TVector<TString>& colNames, size_t rowsCount) { UNIT_ASSERT(batch); - for (auto& col : colNames) { if (!batch->GetColumnByName(col)) { + Cerr << "schema: " << batch->schema()->ToString() << "\n"; return false; } } @@ -171,6 +200,14 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me return true; } +bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const TVector<TString>& colNames, + size_t rowsCount = 100) { + auto schema = NArrow::DeserializeSchema(meta.GetSchema()); + auto batch = NArrow::DeserializeBatch(blob, schema); + + return CheckColumns(batch, colNames, rowsCount); +} + void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TVector<std::pair<TString, TTypeId>>& schema = TTestSchema::YdbSchema(), NOlap::TSnapshot snap = {10, 10}, TString codec = "") { @@ -929,13 +966,17 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig return ssa; } -// SELECT min(x), max(x), some(x), count(x) FROM t -NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId) +// SELECT min(x), max(x), some(x), count(x) FROM t [GROUP BY key[0], key[1], ...] +NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector<ui32>& keys = {}, + bool addProjection = true) { NKikimrSSA::TProgram ssa; auto* line1 = ssa.AddCommand(); auto* groupBy = line1->MutableGroupBy(); + for (ui32 key : keys) { + groupBy->AddKeyColumns()->SetId(key); + } // auto* l1_agg1 = groupBy->AddAggregates(); l1_agg1->MutableColumn()->SetId(100); @@ -952,7 +993,7 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId) auto* l1_agg3 = groupBy->AddAggregates(); l1_agg3->MutableColumn()->SetId(102); auto* l1_agg3_f = l1_agg3->MutableFunction(); - l1_agg3_f->SetId(TAggAssignment::AGG_ANY); + l1_agg3_f->SetId(TAggAssignment::AGG_SOME); l1_agg3_f->AddArguments()->SetId(columnId); // auto* l1_agg4 = groupBy->AddAggregates(); @@ -961,17 +1002,22 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId) l1_agg4_f->SetId(TAggAssignment::AGG_COUNT); l1_agg4_f->AddArguments()->SetId(columnId); - auto* line2 = ssa.AddCommand(); - auto* proj = line2->MutableProjection(); - proj->AddColumns()->SetId(100); - proj->AddColumns()->SetId(101); - proj->AddColumns()->SetId(102); - proj->AddColumns()->SetId(103); + // Projection by ids + if (addProjection) { + auto* line2 = ssa.AddCommand(); + auto* proj = line2->MutableProjection(); + proj->AddColumns()->SetId(100); + proj->AddColumns()->SetId(101); + proj->AddColumns()->SetId(102); + proj->AddColumns()->SetId(103); + } return ssa; } -// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 -NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId) +// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 [GROUP BY key[0], key[1], ...] +NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId, + const std::vector<ui32>& keys = {}, + bool addProjection = true) { NKikimrSSA::TProgram ssa; @@ -993,37 +1039,47 @@ NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterCo auto* line4 = ssa.AddCommand(); auto* groupBy = line4->MutableGroupBy(); + for (ui32 key : keys) { + groupBy->AddKeyColumns()->SetId(key); + } // auto* l4_agg1 = groupBy->AddAggregates(); - l4_agg1->MutableColumn()->SetId(100); + //l4_agg1->MutableColumn()->SetId(100); + l4_agg1->MutableColumn()->SetName("res_min"); auto* l4_agg1_f = l4_agg1->MutableFunction(); l4_agg1_f->SetId(TAggAssignment::AGG_MIN); l4_agg1_f->AddArguments()->SetId(columnId); // auto* l4_agg2 = groupBy->AddAggregates(); - l4_agg2->MutableColumn()->SetId(101); + //l4_agg2->MutableColumn()->SetId(101); + l4_agg2->MutableColumn()->SetName("res_max"); auto* l4_agg2_f = l4_agg2->MutableFunction(); l4_agg2_f->SetId(TAggAssignment::AGG_MAX); l4_agg2_f->AddArguments()->SetId(columnId); // auto* l4_agg3 = groupBy->AddAggregates(); - l4_agg3->MutableColumn()->SetId(102); + //l4_agg3->MutableColumn()->SetId(102); + l4_agg3->MutableColumn()->SetName("res_some"); auto* l4_agg3_f = l4_agg3->MutableFunction(); - l4_agg3_f->SetId(TAggAssignment::AGG_ANY); + l4_agg3_f->SetId(TAggAssignment::AGG_SOME); l4_agg3_f->AddArguments()->SetId(columnId); // auto* l4_agg4 = groupBy->AddAggregates(); - l4_agg4->MutableColumn()->SetId(103); + //l4_agg4->MutableColumn()->SetId(103); + l4_agg4->MutableColumn()->SetName("res_count"); auto* l4_agg4_f = l4_agg4->MutableFunction(); l4_agg4_f->SetId(TAggAssignment::AGG_COUNT); l4_agg4_f->AddArguments()->SetId(columnId); - auto* line5 = ssa.AddCommand(); - auto* proj = line5->MutableProjection(); - proj->AddColumns()->SetId(100); - proj->AddColumns()->SetId(101); - proj->AddColumns()->SetId(102); - proj->AddColumns()->SetId(103); + // Projection by names + if (addProjection) { + auto* line5 = ssa.AddCommand(); + auto* proj = line5->MutableProjection(); + proj->AddColumns()->SetName("res_min"); + proj->AddColumns()->SetName("res_max"); + proj->AddColumns()->SetName("res_some"); + proj->AddColumns()->SetName("res_count"); + } return ssa; } @@ -1154,7 +1210,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = } } -void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbAllTypesSchema()) { +void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema, const std::vector<ui32>& aggKeys = {}) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1185,20 +1241,28 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T std::vector<TString> programs; THashSet<ui32> intResult; + THashSet<ui32> strResult; THashSet<ui32> isFiltered; THashSet<NScheme::TTypeId> intTypes = { NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64, NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64, NTypeIds::Timestamp }; + THashSet<NScheme::TTypeId> strTypes = { + NTypeIds::Utf8, NTypeIds::String, NTypeIds::Bytes + //NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument + }; ui32 prog = 0; for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) { if (intTypes.count(ydbSchema[i].second)) { intResult.insert(prog); } + if (strTypes.count(ydbSchema[i].second)) { + strResult.insert(prog); + } - NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1); + NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1, aggKeys, i % 2); TString serialized; UNIT_ASSERT(ssa.SerializeToString(&serialized)); NKikimrSSA::TOlapProgram program; @@ -1213,8 +1277,11 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T if (intTypes.count(ydbSchema[i].second)) { intResult.insert(prog); } + if (strTypes.count(ydbSchema[i].second)) { + strResult.insert(prog); + } - NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4); + NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4, aggKeys, i % 2); TString serialized; UNIT_ASSERT(ssa.SerializeToString(&serialized)); NKikimrSSA::TOlapProgram program; @@ -1242,6 +1309,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); + std::shared_ptr<arrow::RecordBatch> batch; { UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); @@ -1250,30 +1318,45 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); + auto& data = resRead.GetData(); - TVector<TString> readData; - readData.push_back(resRead.GetData()); - auto& data = readData[0]; - - auto batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema)); + batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema)); UNIT_ASSERT(batch); + UNIT_ASSERT(batch->ValidateFull().ok()); + } - UNIT_ASSERT(CheckColumns(data, meta, {"100", "101", "102", "103"}, 1)); - - // min, max, any, count + if (aggKeys.empty()) { if (intResult.count(prog)) { if (isFiltered.count(prog)) { - UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {1})); - UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {1})); - UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {1})); - UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {1})); + UNIT_ASSERT(CheckColumns(batch, {"res_min", "res_max", "res_some", "res_count"}, 1)); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_min"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_max"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_some"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), {1})); } else { + UNIT_ASSERT(CheckColumns(batch, {"100", "101", "102", "103"}, 1)); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {0})); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {99})); - //UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100})); + } + } else if (strResult.count(prog)) { + if (isFiltered.count(prog)) { + UNIT_ASSERT(CheckColumns(batch, {"res_min", "res_max", "res_some", "res_count"}, 1)); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_min"), {"1"})); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_max"), {"1"})); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_some"), {"1"})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), {1})); + } else { + UNIT_ASSERT(CheckColumns(batch, {"100", "101", "102", "103"}, 1)); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("100"), {"0"})); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("101"), {"99"})); + UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("102"), {"0"})); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100})); } } + } else { + // TODO } ++prog; @@ -1328,8 +1411,25 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } Y_UNIT_TEST(ReadAggregate) { - TestReadAggregate(); + TestReadAggregate(TTestSchema::YdbAllTypesSchema()); + } + +#if 0 + Y_UNIT_TEST(ReadGroupBy) { + auto schema = TTestSchema::YdbAllTypesSchema(); + for (ui32 keyPos = 0; keyPos < schema.size(); ++keyPos) { + TestReadAggregate(schema, {keyPos}); + } + + for (ui32 keyPos = 0; keyPos < schema.size() - 1; ++keyPos) { + TestReadAggregate(schema, {keyPos, keyPos + 1}); + } + + for (ui32 keyPos = 0; keyPos < schema.size() - 2; ++keyPos) { + TestReadAggregate(schema, {keyPos, keyPos + 1, keyPos + 2}); + } } +#endif Y_UNIT_TEST(CompactionSplitGranule) { TTestBasicRuntime runtime; |