aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-09-05 16:26:32 +0300
committerchertus <azuikov@ydb.tech>2022-09-05 16:26:32 +0300
commit3be9e8763332d8c249998fbb9c16de6ea0bae5fe (patch)
tree761b99822de592f6a8173aa21756e1d1a2cf8911
parent83d3d0adfcecbcab4279cdfcc6bc3403d2e84867 (diff)
downloadydb-3be9e8763332d8c249998fbb9c16de6ea0bae5fe.tar.gz
fix some() in SSA
-rw-r--r--ydb/core/formats/custom_registry.cpp4
-rw-r--r--ydb/core/formats/program.cpp10
-rw-r--r--ydb/core/formats/program.h4
-rw-r--r--ydb/core/kqp/compile/kqp_olap_compiler.cpp10
-rw-r--r--ydb/core/protos/ssa.proto4
-rw-r--r--ydb/core/tx/columnshard/columnshard_common.cpp44
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_read_write.cpp194
7 files changed, 191 insertions, 79 deletions
diff --git a/ydb/core/formats/custom_registry.cpp b/ydb/core/formats/custom_registry.cpp
index ef1fb1a193..590e3c3e07 100644
--- a/ydb/core/formats/custom_registry.cpp
+++ b/ydb/core/formats/custom_registry.cpp
@@ -62,12 +62,12 @@ static void RegisterYdbCast(cp::FunctionRegistry* registry) {
static void RegisterHouseAggregates(cp::FunctionRegistry* registry) {
#ifndef WIN32
try {
- Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Any))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Some))).ok());
Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedCount>(GetHouseFunctionName(EAggregate::Count))).ok());
Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMin>(GetHouseFunctionName(EAggregate::Min))).ok());
Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMax>(GetHouseFunctionName(EAggregate::Max))).ok());
Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedSum>(GetHouseFunctionName(EAggregate::Sum))).ok());
- Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok());
+ //Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok());
} catch (const std::exception& /*ex*/) {
Y_VERIFY(false);
}
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index 48a615c41c..4dd463fffc 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -238,8 +238,6 @@ EOperation ValidateOperation(EOperation op, ui32 argsSize) {
const char * GetFunctionName(EAggregate op) {
switch (op) {
- case EAggregate::Any:
- return "any";
case EAggregate::Count:
return "count";
case EAggregate::Min:
@@ -248,9 +246,10 @@ const char * GetFunctionName(EAggregate op) {
return "min_max";
case EAggregate::Sum:
return "sum";
+#if 0 // TODO
case EAggregate::Avg:
return "mean";
-
+#endif
default:
break;
}
@@ -259,7 +258,7 @@ const char * GetFunctionName(EAggregate op) {
const char * GetHouseFunctionName(EAggregate op) {
switch (op) {
- case EAggregate::Any:
+ case EAggregate::Some:
return "ch.any";
case EAggregate::Count:
return "ch.count";
@@ -269,9 +268,10 @@ const char * GetHouseFunctionName(EAggregate op) {
return "ch.max";
case EAggregate::Sum:
return "ch.sum";
+#if 0 // TODO
case EAggregate::Avg:
return "ch.avg";
-
+#endif
default:
break;
}
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index a314272003..f52a0b6de1 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -88,12 +88,12 @@ enum class EOperation {
enum class EAggregate {
Unspecified = 0,
- Any = 1,
+ Some = 1,
Count = 2,
Min = 3,
Max = 4,
Sum = 5,
- Avg = 6,
+ //Avg = 6,
};
const char * GetFunctionName(EOperation op);
diff --git a/ydb/core/kqp/compile/kqp_olap_compiler.cpp b/ydb/core/kqp/compile/kqp_olap_compiler.cpp
index 7f5cd49122..351110d476 100644
--- a/ydb/core/kqp/compile/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/compile/kqp_olap_compiler.cpp
@@ -94,7 +94,7 @@ private:
std::unordered_map<std::string, EAggFunctionType> TKqpOlapCompileContext::AggFuncTypesMap = {
{ "count", TProgram::TAggregateAssignment::AGG_COUNT },
- { "some", TProgram::TAggregateAssignment::AGG_ANY },
+ { "some", TProgram::TAggregateAssignment::AGG_SOME },
};
TProgram::TAssignment* CompileCondition(const TExprBase& condition, TKqpOlapCompileContext& ctx);
@@ -170,7 +170,7 @@ ui32 ConvertParameterToColumn(const TCoParameter& parameter, TKqpOlapCompileCont
ui32 ConvertSafeCastToColumn(const TCoSafeCast& cast, TKqpOlapCompileContext& ctx)
{
auto columnId = GetOrCreateColumnId(cast.Value(), ctx);
-
+
TProgram::TAssignment* ssaValue = ctx.CreateAssignCmd();
auto newCast = ssaValue->MutableFunction();
@@ -379,7 +379,7 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx)
auto aggKeyColId = GetOrCreateColumnId(keyCol, ctx);
aggKeyCol->SetId(aggKeyColId);
aggKeyCol->SetName(keyColName);
-
+
auto* projCol = projection->AddColumns();
projCol->SetId(aggKeyColId);
projCol->SetName(keyColName);
@@ -397,10 +397,10 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx)
auto* projCol = projection->AddColumns();
projCol->SetId(aggColId);
projCol->SetName(aggColName.c_str());
-
+
auto* aggFunc = agg->MutableFunction();
aggFunc->SetId(ctx.GetAggFuncType(aggKqp.Type().StringValue().c_str()));
-
+
if (aggKqp.Column() != "*") {
aggFunc->AddArguments()->SetId(GetOrCreateColumnId(aggKqp.Column(), ctx));
}
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
index c1c2f3e56b..de4eeabe0a 100644
--- a/ydb/core/protos/ssa.proto
+++ b/ydb/core/protos/ssa.proto
@@ -99,12 +99,12 @@ message TProgram {
message TAggregateAssignment {
enum EAggregateFunction {
AGG_UNSPECIFIED = 0;
- AGG_ANY = 1;
+ AGG_SOME = 1;
AGG_COUNT = 2;
AGG_MIN = 3;
AGG_MAX = 4;
AGG_SUM = 5;
- AGG_AVG = 6;
+ //AGG_AVG = 6;
//AGG_VAR = 7;
//AGG_COVAR = 8;
//AGG_STDDEV = 9;
diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp
index 14f826774b..18d578e177 100644
--- a/ydb/core/tx/columnshard/columnshard_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_common.cpp
@@ -45,15 +45,26 @@ struct TContext {
: ColumnResolver(columnResolver)
{}
- std::string GetName(ui32 columnId) const {
+ std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const {
+ ui32 columnId = column.GetId();
TString name = ColumnResolver.GetColumnName(columnId, false);
if (name.Empty()) {
- name = ToString(columnId);
+ return GenerateName(column);
} else {
Sources[columnId] = name;
}
return std::string(name.data(), name.size());
}
+
+ std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const {
+ TString name;
+ if (column.HasName()) {
+ name = column.GetName();
+ } else {
+ name = ToString(column.GetId());
+ }
+ return std::string(name.data(), name.size());
+ }
};
NArrow::TAssign MakeFunction(const TContext& info, const std::string& name,
@@ -64,8 +75,7 @@ NArrow::TAssign MakeFunction(const TContext& info, const std::string& name,
std::vector<std::string> arguments;
for (auto& col : func.GetArguments()) {
- ui32 columnId = col.GetId();
- arguments.push_back(info.GetName(columnId));
+ arguments.push_back(info.GetName(col));
}
switch (func.GetId()) {
@@ -177,11 +187,11 @@ NArrow::TAggregateAssign MakeAggregate(const TContext& info, const std::string&
using TAggregateAssign = NArrow::TAggregateAssign;
if (func.ArgumentsSize() == 1) {
- std::string argument = info.GetName(func.GetArguments()[0].GetId());
+ std::string argument = info.GetName(func.GetArguments()[0]);
switch (func.GetId()) {
- case TId::AGG_ANY:
- return TAggregateAssign(name, EAggregate::Any, std::move(argument));
+ case TId::AGG_SOME:
+ return TAggregateAssign(name, EAggregate::Some, std::move(argument));
case TId::AGG_COUNT:
return TAggregateAssign(name, EAggregate::Count, std::move(argument));
case TId::AGG_MIN:
@@ -190,9 +200,10 @@ NArrow::TAggregateAssign MakeAggregate(const TContext& info, const std::string&
return TAggregateAssign(name, EAggregate::Max, std::move(argument));
case TId::AGG_SUM:
return TAggregateAssign(name, EAggregate::Sum, std::move(argument));
+#if 0 // TODO
case TId::AGG_AVG:
return TAggregateAssign(name, EAggregate::Avg, std::move(argument));
-
+#endif
case TId::AGG_UNSPECIFIED:
break;
}
@@ -232,8 +243,7 @@ bool ExtractAssign(const TContext& info, NArrow::TProgramStep& step, const NKiki
{
using TId = NKikimrSSA::TProgram::TAssignment;
- ui32 columnId = assign.GetColumn().GetId();
- std::string columnName = info.GetName(columnId);
+ std::string columnName = info.GetName(assign.GetColumn());
switch (assign.GetExpressionCase()) {
case TId::kFunction:
@@ -272,11 +282,12 @@ bool ExtractAssign(const TContext& info, NArrow::TProgramStep& step, const NKiki
}
bool ExtractFilter(const TContext& info, NArrow::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) {
- ui32 columnId = filter.GetPredicate().GetId();
- if (!columnId) {
+ auto& column = filter.GetPredicate();
+ if (!column.HasId() && !column.HasName()) {
return false;
}
- step.Filters.push_back(info.GetName(columnId));
+ // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
+ step.Filters.push_back(info.GetName(column));
return true;
}
@@ -284,7 +295,8 @@ bool ExtractProjection(const TContext& info, NArrow::TProgramStep& step,
const NKikimrSSA::TProgram::TProjection& projection) {
step.Projection.reserve(projection.ColumnsSize());
for (auto& col : projection.GetColumns()) {
- step.Projection.push_back(info.GetName(col.GetId()));
+ // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
+ step.Projection.push_back(info.GetName(col));
}
return true;
}
@@ -302,13 +314,13 @@ bool ExtractGroupBy(const TContext& info, NArrow::TProgramStep& step, const NKik
// It adds implicit projection with aggregates and keys. Remove non aggregated columns.
step.Projection.reserve(groupBy.KeyColumnsSize() + groupBy.AggregatesSize());
for (auto& col : groupBy.GetKeyColumns()) {
- step.Projection.push_back(info.GetName(col.GetId()));
+ step.Projection.push_back(info.GetName(col));
}
step.GroupBy.reserve(groupBy.AggregatesSize());
for (auto& agg : groupBy.GetAggregates()) {
auto& resColumn = agg.GetColumn();
- TString columnName = ToString(resColumn.GetId());
+ TString columnName = info.GenerateName(resColumn);
auto func = MakeAggregate(info, columnName, agg.GetFunction());
if (!func.IsOk()) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 8daf608447..06edad85ce 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -84,7 +84,7 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
}
template <typename TArrowType>
-bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) {
+bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) {
UNIT_ASSERT(array);
UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size());
@@ -97,7 +97,21 @@ bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVect
return true;
}
-bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) {
+template <typename TArrowArrayType>
+bool CheckTypedStrValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) {
+ UNIT_ASSERT(array);
+ UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size());
+
+ auto& column = dynamic_cast<const TArrowArrayType&>(*array);
+
+ for (int i = 0; i < column.length(); ++i) {
+ auto value = column.GetString(i);
+ UNIT_ASSERT_VALUES_EQUAL(value, expected[i]);
+ }
+ return true;
+}
+
+bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) {
UNIT_ASSERT(array);
switch (array->type()->id()) {
@@ -125,7 +139,25 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<in
default:
UNIT_ASSERT(false);
- //return false;
+ break;
+ }
+ return true;
+}
+
+bool CheckStringValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) {
+ UNIT_ASSERT(array);
+
+ switch (array->type()->id()) {
+ case arrow::Type::STRING:
+ return CheckTypedStrValues<arrow::StringArray>(array, expected);
+ case arrow::Type::BINARY:
+ return CheckTypedStrValues<arrow::BinaryArray>(array, expected);
+ case arrow::Type::FIXED_SIZE_BINARY:
+ return CheckTypedStrValues<arrow::FixedSizeBinaryArray>(array, expected);
+
+ default:
+ UNIT_ASSERT(false);
+ break;
}
return true;
}
@@ -153,14 +185,11 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) {
return true;
}
-bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const TVector<TString>& colNames,
- size_t rowsCount = 100) {
- auto schema = NArrow::DeserializeSchema(meta.GetSchema());
- auto batch = NArrow::DeserializeBatch(blob, schema);
+bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TVector<TString>& colNames, size_t rowsCount) {
UNIT_ASSERT(batch);
-
for (auto& col : colNames) {
if (!batch->GetColumnByName(col)) {
+ Cerr << "schema: " << batch->schema()->ToString() << "\n";
return false;
}
}
@@ -171,6 +200,14 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me
return true;
}
+bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const TVector<TString>& colNames,
+ size_t rowsCount = 100) {
+ auto schema = NArrow::DeserializeSchema(meta.GetSchema());
+ auto batch = NArrow::DeserializeBatch(blob, schema);
+
+ return CheckColumns(batch, colNames, rowsCount);
+}
+
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TVector<std::pair<TString, TTypeId>>& schema = TTestSchema::YdbSchema(),
NOlap::TSnapshot snap = {10, 10}, TString codec = "") {
@@ -929,13 +966,17 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig
return ssa;
}
-// SELECT min(x), max(x), some(x), count(x) FROM t
-NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId)
+// SELECT min(x), max(x), some(x), count(x) FROM t [GROUP BY key[0], key[1], ...]
+NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector<ui32>& keys = {},
+ bool addProjection = true)
{
NKikimrSSA::TProgram ssa;
auto* line1 = ssa.AddCommand();
auto* groupBy = line1->MutableGroupBy();
+ for (ui32 key : keys) {
+ groupBy->AddKeyColumns()->SetId(key);
+ }
//
auto* l1_agg1 = groupBy->AddAggregates();
l1_agg1->MutableColumn()->SetId(100);
@@ -952,7 +993,7 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId)
auto* l1_agg3 = groupBy->AddAggregates();
l1_agg3->MutableColumn()->SetId(102);
auto* l1_agg3_f = l1_agg3->MutableFunction();
- l1_agg3_f->SetId(TAggAssignment::AGG_ANY);
+ l1_agg3_f->SetId(TAggAssignment::AGG_SOME);
l1_agg3_f->AddArguments()->SetId(columnId);
//
auto* l1_agg4 = groupBy->AddAggregates();
@@ -961,17 +1002,22 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId)
l1_agg4_f->SetId(TAggAssignment::AGG_COUNT);
l1_agg4_f->AddArguments()->SetId(columnId);
- auto* line2 = ssa.AddCommand();
- auto* proj = line2->MutableProjection();
- proj->AddColumns()->SetId(100);
- proj->AddColumns()->SetId(101);
- proj->AddColumns()->SetId(102);
- proj->AddColumns()->SetId(103);
+ // Projection by ids
+ if (addProjection) {
+ auto* line2 = ssa.AddCommand();
+ auto* proj = line2->MutableProjection();
+ proj->AddColumns()->SetId(100);
+ proj->AddColumns()->SetId(101);
+ proj->AddColumns()->SetId(102);
+ proj->AddColumns()->SetId(103);
+ }
return ssa;
}
-// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1
-NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId)
+// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 [GROUP BY key[0], key[1], ...]
+NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId,
+ const std::vector<ui32>& keys = {},
+ bool addProjection = true)
{
NKikimrSSA::TProgram ssa;
@@ -993,37 +1039,47 @@ NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterCo
auto* line4 = ssa.AddCommand();
auto* groupBy = line4->MutableGroupBy();
+ for (ui32 key : keys) {
+ groupBy->AddKeyColumns()->SetId(key);
+ }
//
auto* l4_agg1 = groupBy->AddAggregates();
- l4_agg1->MutableColumn()->SetId(100);
+ //l4_agg1->MutableColumn()->SetId(100);
+ l4_agg1->MutableColumn()->SetName("res_min");
auto* l4_agg1_f = l4_agg1->MutableFunction();
l4_agg1_f->SetId(TAggAssignment::AGG_MIN);
l4_agg1_f->AddArguments()->SetId(columnId);
//
auto* l4_agg2 = groupBy->AddAggregates();
- l4_agg2->MutableColumn()->SetId(101);
+ //l4_agg2->MutableColumn()->SetId(101);
+ l4_agg2->MutableColumn()->SetName("res_max");
auto* l4_agg2_f = l4_agg2->MutableFunction();
l4_agg2_f->SetId(TAggAssignment::AGG_MAX);
l4_agg2_f->AddArguments()->SetId(columnId);
//
auto* l4_agg3 = groupBy->AddAggregates();
- l4_agg3->MutableColumn()->SetId(102);
+ //l4_agg3->MutableColumn()->SetId(102);
+ l4_agg3->MutableColumn()->SetName("res_some");
auto* l4_agg3_f = l4_agg3->MutableFunction();
- l4_agg3_f->SetId(TAggAssignment::AGG_ANY);
+ l4_agg3_f->SetId(TAggAssignment::AGG_SOME);
l4_agg3_f->AddArguments()->SetId(columnId);
//
auto* l4_agg4 = groupBy->AddAggregates();
- l4_agg4->MutableColumn()->SetId(103);
+ //l4_agg4->MutableColumn()->SetId(103);
+ l4_agg4->MutableColumn()->SetName("res_count");
auto* l4_agg4_f = l4_agg4->MutableFunction();
l4_agg4_f->SetId(TAggAssignment::AGG_COUNT);
l4_agg4_f->AddArguments()->SetId(columnId);
- auto* line5 = ssa.AddCommand();
- auto* proj = line5->MutableProjection();
- proj->AddColumns()->SetId(100);
- proj->AddColumns()->SetId(101);
- proj->AddColumns()->SetId(102);
- proj->AddColumns()->SetId(103);
+ // Projection by names
+ if (addProjection) {
+ auto* line5 = ssa.AddCommand();
+ auto* proj = line5->MutableProjection();
+ proj->AddColumns()->SetName("res_min");
+ proj->AddColumns()->SetName("res_max");
+ proj->AddColumns()->SetName("res_some");
+ proj->AddColumns()->SetName("res_count");
+ }
return ssa;
}
@@ -1154,7 +1210,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema =
}
}
-void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbAllTypesSchema()) {
+void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema, const std::vector<ui32>& aggKeys = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1185,20 +1241,28 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
std::vector<TString> programs;
THashSet<ui32> intResult;
+ THashSet<ui32> strResult;
THashSet<ui32> isFiltered;
THashSet<NScheme::TTypeId> intTypes = {
NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64,
NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64,
NTypeIds::Timestamp
};
+ THashSet<NScheme::TTypeId> strTypes = {
+ NTypeIds::Utf8, NTypeIds::String, NTypeIds::Bytes
+ //NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument
+ };
ui32 prog = 0;
for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) {
if (intTypes.count(ydbSchema[i].second)) {
intResult.insert(prog);
}
+ if (strTypes.count(ydbSchema[i].second)) {
+ strResult.insert(prog);
+ }
- NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1);
+ NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1, aggKeys, i % 2);
TString serialized;
UNIT_ASSERT(ssa.SerializeToString(&serialized));
NKikimrSSA::TOlapProgram program;
@@ -1213,8 +1277,11 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
if (intTypes.count(ydbSchema[i].second)) {
intResult.insert(prog);
}
+ if (strTypes.count(ydbSchema[i].second)) {
+ strResult.insert(prog);
+ }
- NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4);
+ NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4, aggKeys, i % 2);
TString serialized;
UNIT_ASSERT(ssa.SerializeToString(&serialized));
NKikimrSSA::TOlapProgram program;
@@ -1242,6 +1309,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+ std::shared_ptr<arrow::RecordBatch> batch;
{
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
@@ -1250,30 +1318,45 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
+ auto& data = resRead.GetData();
- TVector<TString> readData;
- readData.push_back(resRead.GetData());
- auto& data = readData[0];
-
- auto batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema));
+ batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema));
UNIT_ASSERT(batch);
+ UNIT_ASSERT(batch->ValidateFull().ok());
+ }
- UNIT_ASSERT(CheckColumns(data, meta, {"100", "101", "102", "103"}, 1));
-
- // min, max, any, count
+ if (aggKeys.empty()) {
if (intResult.count(prog)) {
if (isFiltered.count(prog)) {
- UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {1}));
- UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {1}));
- UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {1}));
- UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {1}));
+ UNIT_ASSERT(CheckColumns(batch, {"res_min", "res_max", "res_some", "res_count"}, 1));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_min"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_max"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_some"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), {1}));
} else {
+ UNIT_ASSERT(CheckColumns(batch, {"100", "101", "102", "103"}, 1));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {0}));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {99}));
- //UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100}));
+ }
+ } else if (strResult.count(prog)) {
+ if (isFiltered.count(prog)) {
+ UNIT_ASSERT(CheckColumns(batch, {"res_min", "res_max", "res_some", "res_count"}, 1));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_min"), {"1"}));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_max"), {"1"}));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("res_some"), {"1"}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), {1}));
+ } else {
+ UNIT_ASSERT(CheckColumns(batch, {"100", "101", "102", "103"}, 1));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("100"), {"0"}));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("101"), {"99"}));
+ UNIT_ASSERT(CheckStringValues(batch->GetColumnByName("102"), {"0"}));
UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100}));
}
}
+ } else {
+ // TODO
}
++prog;
@@ -1328,8 +1411,25 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
Y_UNIT_TEST(ReadAggregate) {
- TestReadAggregate();
+ TestReadAggregate(TTestSchema::YdbAllTypesSchema());
+ }
+
+#if 0
+ Y_UNIT_TEST(ReadGroupBy) {
+ auto schema = TTestSchema::YdbAllTypesSchema();
+ for (ui32 keyPos = 0; keyPos < schema.size(); ++keyPos) {
+ TestReadAggregate(schema, {keyPos});
+ }
+
+ for (ui32 keyPos = 0; keyPos < schema.size() - 1; ++keyPos) {
+ TestReadAggregate(schema, {keyPos, keyPos + 1});
+ }
+
+ for (ui32 keyPos = 0; keyPos < schema.size() - 2; ++keyPos) {
+ TestReadAggregate(schema, {keyPos, keyPos + 1, keyPos + 2});
+ }
}
+#endif
Y_UNIT_TEST(CompactionSplitGranule) {
TTestBasicRuntime runtime;