aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-12-08 14:39:43 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-12-08 14:39:43 +0300
commitb73d9229efb225a963105ec0623ff6fb7a3880e8 (patch)
tree8582b2062f399aa043b1e9f7c9377e5184079a32
parent2e77ddea288b1279cb28699cdb446e659d97380d (diff)
downloadydb-b73d9229efb225a963105ec0623ff6fb7a3880e8.tar.gz
Add final projection to SSA program.
Fix SSA Projection command apply to work correctly. Add unit test for final Projection order. Add final projection to SSA program. Fix SSA Projection command apply to work correctly.
-rw-r--r--ydb/core/formats/program.cpp18
-rw-r--r--ydb/core/formats/program.h2
-rw-r--r--ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp84
-rw-r--r--ydb/core/kqp/query_compiler/kqp_olap_compiler.h2
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp37
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp55
-rw-r--r--ydb/core/tx/columnshard/columnshard_common.cpp7
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_read_write.cpp24
8 files changed, 150 insertions, 79 deletions
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index aa023ea76a7..f1a2cea3535 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -678,21 +678,15 @@ arrow::Status TProgramStep::ApplyProjection(TDatumBatch& batch) const {
if (Projection.empty()) {
return arrow::Status::OK();
}
- std::unordered_set<std::string_view> projSet;
- for (auto& str: Projection) {
- projSet.insert(str);
- }
std::vector<std::shared_ptr<arrow::Field>> newFields;
std::vector<arrow::Datum> newDatums;
- for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) {
- auto& cur_field_name = batch.Schema->field(i)->name();
- if (projSet.contains(cur_field_name)) {
- newFields.push_back(batch.Schema->field(i));
- if (!newFields.back()) {
- return arrow::Status::Invalid("Wrong projection.");
- }
- newDatums.push_back(batch.Datums[i]);
+ for (size_t i = 0; i < Projection.size(); ++i) {
+ int schemaFieldIndex = batch.Schema->GetFieldIndex(Projection[i]);
+ if (schemaFieldIndex == -1) {
+ return arrow::Status::Invalid("Could not find column " + Projection[i] + " in record batch schema.");
}
+ newFields.push_back(batch.Schema->field(schemaFieldIndex));
+ newDatums.push_back(batch.Datums[schemaFieldIndex]);
}
batch.Schema = std::make_shared<arrow::Schema>(newFields);
batch.Datums = std::move(newDatums);
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index 8567f0ca130..590025a4fb3 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -192,7 +192,7 @@ struct TProgramStep {
};
bool Empty() const {
- return Assignes.empty() && Filters.empty() && Projection.empty();
+ return Assignes.empty() && Filters.empty() && Projection.empty() && GroupBy.empty() && GroupByKeys.empty();
}
arrow::Status Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const;
diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
index 99fcb501b73..a0915341cd9 100644
--- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
@@ -27,10 +27,11 @@ struct TAggColInfo {
class TKqpOlapCompileContext {
public:
TKqpOlapCompileContext(const TCoArgument& row, const TKikimrTableMetadata& tableMeta,
- NKqpProto::TKqpPhyOpReadOlapRanges& readProto)
+ NKqpProto::TKqpPhyOpReadOlapRanges& readProto, const std::vector<std::string>& resultColNames)
: Row(row)
, MaxColumnId(0)
, ReadProto(readProto)
+ , ResultColNames(resultColNames)
{
for (const auto& [_, columnMeta] : tableMeta.Columns) {
YQL_ENSURE(ReadColumns.emplace(columnMeta.Name, columnMeta.Id).second);
@@ -40,11 +41,16 @@ public:
Program.SetVersion(OLAP_PROGRAM_VERSION);
}
- ui32 GetColumnId(const TStringBuf& name) const {
- auto column = ReadColumns.FindPtr(name);
- YQL_ENSURE(column);
+ ui32 GetColumnId(const std::string& name) const {
+ auto columnIt = ReadColumns.find(name);
+ if (columnIt == ReadColumns.end()) {
+ auto resColNameIt = std::find(ResultColNames.begin(), ResultColNames.end(), name);
+ YQL_ENSURE(resColNameIt != ResultColNames.end());
- return *column;
+ columnIt = KqpAggColNameToId.find(*resColNameIt);
+ YQL_ENSURE(columnIt != KqpAggColNameToId.end());
+ }
+ return columnIt->second;
}
ui32 NewColumnId() {
@@ -91,14 +97,28 @@ public:
return AggFuncTypesMap.at(funcName);
}
+ void MapKqpAggColNameToId(const std::string& colName, ui32 id) {
+ KqpAggColNameToId.emplace(colName, id);
+ }
+
+ std::vector<std::string> GetResultColNames() {
+ return ResultColNames;
+ }
+
+ bool IsEmptyProgram() {
+ return Program.GetCommand().empty();
+ }
+
private:
static std::unordered_map<std::string, EAggFunctionType> AggFuncTypesMap;
TCoArgument Row;
- TMap<TString, ui32> ReadColumns;
+ std::unordered_map<std::string, ui32> ReadColumns;
ui32 MaxColumnId;
TProgram Program;
NKqpProto::TKqpPhyOpReadOlapRanges& ReadProto;
+ const std::vector<std::string>& ResultColNames;
+ std::unordered_map<std::string, ui32> KqpAggColNameToId;
};
std::unordered_map<std::string, EAggFunctionType> TKqpOlapCompileContext::AggFuncTypesMap = {
@@ -232,7 +252,7 @@ ui64 GetOrCreateColumnId(const TExprBase& node, TKqpOlapCompileContext& ctx) {
}
if (auto maybeAtom = node.Maybe<TCoAtom>()) {
- return ctx.GetColumnId(maybeAtom.Cast().Value());
+ return ctx.GetColumnId(maybeAtom.Cast().StringValue());
}
if (auto maybeParameter = node.Maybe<TCoParameter>()) {
@@ -393,6 +413,7 @@ std::vector<TAggColInfo> CollectAggregationInfos(const TKqpOlapAgg& aggNode, TKq
colInfo.AggColId = ctx.NewColumnId();
colInfo.BaseColName = aggKqp.Column().StringValue().c_str();
colInfo.Operation = aggKqp.Type().StringValue();
+ ctx.MapKqpAggColNameToId(colInfo.AggColName, colInfo.AggColId);
auto opType = aggKqp.Type().StringValue();
if (opType != "count" || (opType == "count" && colInfo.BaseColName != "*")) {
@@ -406,26 +427,12 @@ std::vector<TAggColInfo> CollectAggregationInfos(const TKqpOlapAgg& aggNode, TKq
void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) {
std::vector<TAggColInfo> aggColInfos = CollectAggregationInfos(aggNode, ctx);
auto* groupBy = ctx.CreateGroupBy();
- auto* projection = ctx.CreateProjection();
-
- for (auto keyCol : aggNode.KeyColumns()) {
- auto aggKeyCol = groupBy->AddKeyColumns();
- auto keyColName = keyCol.StringValue();
- auto aggKeyColId = GetOrCreateColumnId(keyCol, ctx);
- aggKeyCol->SetId(aggKeyColId);
-
- auto* projCol = projection->AddColumns();
- projCol->SetId(aggKeyColId);
- }
for (auto aggColInfo : aggColInfos) {
auto* agg = groupBy->AddAggregates();
auto* aggCol = agg->MutableColumn();
aggCol->SetId(aggColInfo.AggColId);
- auto* projCol = projection->AddColumns();
- projCol->SetId(aggColInfo.AggColId);
-
auto* aggFunc = agg->MutableFunction();
aggFunc->SetId(ctx.GetAggFuncType(aggColInfo.Operation));
@@ -433,17 +440,22 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx)
aggFunc->AddArguments()->SetId(aggColInfo.BaseColId);
}
}
-}
-void CompileProjection(const TKqpOlapExtractMembers& extractMembers, TKqpOlapCompileContext& ctx) {
- if (extractMembers.Members().Size() == 0) {
- // Case for single COUNT(*), no columns are extracted
- return;
+ for (auto keyCol : aggNode.KeyColumns()) {
+ auto aggKeyCol = groupBy->AddKeyColumns();
+ auto keyColName = keyCol.StringValue();
+ auto aggKeyColId = GetOrCreateColumnId(keyCol, ctx);
+ aggKeyCol->SetId(aggKeyColId);
}
+}
+
+void CompileFinalProjection(TKqpOlapCompileContext& ctx) {
+ auto resultColNames = ctx.GetResultColNames();
+ YQL_ENSURE(!resultColNames.empty());
+
auto* projection = ctx.CreateProjection();
- for (auto col : extractMembers.Members()) {
- auto colName = col.StringValue();
- auto colId = GetOrCreateColumnId(col, ctx);
+ for (auto colName : resultColNames) {
+ auto colId = ctx.GetColumnId(colName);
auto* projCol = projection->AddColumns();
projCol->SetId(colId);
@@ -459,14 +471,10 @@ void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) {
CompileOlapProgramImpl(maybeOlapOperation.Cast().Input(), ctx);
if (auto maybeFilter = operation.Maybe<TKqpOlapFilter>()) {
CompileFilter(maybeFilter.Cast(), ctx);
- return;
} else if (auto maybeAgg = operation.Maybe<TKqpOlapAgg>()) {
CompileAggregates(maybeAgg.Cast(), ctx);
- return;
- } else if (auto maybeExtractMembers = operation.Maybe<TKqpOlapExtractMembers>()) {
- CompileProjection(maybeExtractMembers.Cast(), ctx);
- return;
}
+ return;
}
YQL_ENSURE(operation.Maybe<TCallable>(), "Unexpected OLAP operation node type: " << operation.Ref().Type());
@@ -476,13 +484,17 @@ void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) {
} // namespace
void CompileOlapProgram(const TCoLambda& lambda, const TKikimrTableMetadata& tableMeta,
- NKqpProto::TKqpPhyOpReadOlapRanges& readProto)
+ NKqpProto::TKqpPhyOpReadOlapRanges& readProto, const std::vector<std::string>& resultColNames)
{
YQL_ENSURE(lambda.Args().Size() == 1);
- TKqpOlapCompileContext ctx(lambda.Args().Arg(0), tableMeta, readProto);
+ TKqpOlapCompileContext ctx(lambda.Args().Arg(0), tableMeta, readProto, resultColNames);
CompileOlapProgramImpl(lambda.Body(), ctx);
+ if (!ctx.IsEmptyProgram()) {
+ CompileFinalProjection(ctx);
+ }
+
ctx.SerializeToProto();
}
diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.h b/ydb/core/kqp/query_compiler/kqp_olap_compiler.h
index 17ca3d0b3ca..dff2d703386 100644
--- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.h
+++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.h
@@ -9,7 +9,7 @@ namespace NKikimr {
namespace NKqp {
void CompileOlapProgram(const NYql::NNodes::TCoLambda& lambda, const NYql::TKikimrTableMetadata& tableMeta,
- NKqpProto::TKqpPhyOpReadOlapRanges& readProto);
+ NKqpProto::TKqpPhyOpReadOlapRanges& readProto, const std::vector<std::string>& resultColNames);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index e3b3aeee9c2..6a365a94331 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -330,10 +330,27 @@ void FillLookup(const TKqpLookupTable& lookup, NKqpProto::TKqpPhyOpLookup& looku
}
}
-void FillOlapProgram(const TCoLambda& process, const TKikimrTableMetadata& tableMeta,
- NKqpProto::TKqpPhyOpReadOlapRanges& readProto)
+std::vector<std::string> GetResultColumnNames(const NKikimr::NMiniKQL::TType* resultType) {
+ YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct
+ || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple);
+
+ auto* resultStructType = static_cast<const NKikimr::NMiniKQL::TStructType*>(resultType);
+ ui32 resultColsCount = resultStructType->GetMembersCount();
+
+ std::vector<std::string> resultColNames;
+ resultColNames.reserve(resultColsCount);
+
+ for (ui32 i = 0; i < resultColsCount; ++i) {
+ resultColNames.emplace_back(resultStructType->GetMemberName(i));
+ }
+ return resultColNames;
+}
+
+void FillOlapProgram(const TCoLambda& process, const NKikimr::NMiniKQL::TType* miniKqlResultType,
+ const TKikimrTableMetadata& tableMeta, NKqpProto::TKqpPhyOpReadOlapRanges& readProto)
{
- CompileOlapProgram(process, tableMeta, readProto);
+ auto resultColNames = GetResultColumnNames(miniKqlResultType);
+ CompileOlapProgram(process, tableMeta, readProto, resultColNames);
}
class TKqpQueryCompiler : public IKqpQueryCompiler {
@@ -527,8 +544,9 @@ private:
FillTable(readTableRanges.Table(), *tableOp.MutableTable());
FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true);
FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange());
- FillOlapProgram(readTableRanges.Process(), *tableMeta, *tableOp.MutableReadOlapRange());
- FillResultType(readTableRanges.Process().Ref().GetTypeAnn(), *tableOp.MutableReadOlapRange());
+ auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn());
+ FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange());
+ FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
} else if (node.Maybe<TCoSort>()) {
hasSort = true;
} else if (node.Maybe<TCoMapJoinCore>()) {
@@ -828,12 +846,17 @@ private:
YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
}
- void FillResultType(const TTypeAnnotationNode* resultType, NKqpProto::TKqpPhyOpReadOlapRanges& opProto)
+ void FillResultType(NKikimr::NMiniKQL::TType* miniKqlResultType, NKqpProto::TKqpPhyOpReadOlapRanges& opProto)
+ {
+ ExportTypeToProto(miniKqlResultType, *opProto.MutableResultType());
+ }
+
+ NKikimr::NMiniKQL::TType* GetMKqlResultType(const TTypeAnnotationNode* resultType)
{
YQL_ENSURE(resultType->GetKind() == NYql::ETypeAnnotationKind::Flow, "Unexpected type: " << NYql::FormatType(resultType));
TProgramBuilder pgmBuilder(TypeEnv, FuncRegistry);
const auto resultItemType = resultType->Cast<TFlowExprType>()->GetItemType();
- ExportTypeToProto(CompileType(pgmBuilder, *resultItemType), *opProto.MutableResultType());
+ return CompileType(pgmBuilder, *resultItemType);
}
private:
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index d4e4030ec23..471791d7d6d 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -396,7 +396,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
}
- void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes) {
+ void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes,
+ const std::string& readNodeType)
+ {
TStreamExecScanQuerySettings scanSettings;
scanSettings.Explain(true);
auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
@@ -412,6 +414,23 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
TStringBuilder() << planNode << " was not pushed down. Query: " << query);
}
+
+ if (!readNodeType.empty()) {
+ NJson::TJsonValue planJson;
+ NJson::ReadJsonTree(*planRes.PlanJson, &planJson, true);
+ auto readNode = FindPlanNodeByKv(planJson, "Node Type", readNodeType.c_str());
+ UNIT_ASSERT(readNode.IsDefined());
+
+ auto& operators = readNode.GetMapSafe().at("Operators").GetArraySafe();
+ for (auto& op : operators) {
+ if (op.GetMapSafe().at("Name") == "TableFullScan") {
+ auto ssaProgram = op.GetMapSafe().at("SsaProgram");
+ UNIT_ASSERT(ssaProgram.IsDefined());
+ UNIT_ASSERT(FindPlanNodes(ssaProgram, "Projection").size());
+ break;
+ }
+ }
+ }
}
Y_UNIT_TEST(SimpleQueryOlap) {
@@ -1220,7 +1239,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
}
}
@@ -1264,7 +1283,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[[0];4600u];[[1];4600u];[[2];4600u];[[3];4600u];[[4];4600u]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
}
}
@@ -1307,7 +1326,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
}
}
@@ -1417,6 +1436,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString ExpectedReply;
std::vector<std::string> ExpectedPlanOptions;
bool Pushdown = true;
+ std::string ExpectedReadNodeType;
TExpectedLimitChecker LimitChecker;
TExpectedRecordChecker RecordChecker;
public:
@@ -1470,6 +1490,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
const std::vector<std::string>& GetExpectedPlanOptions() const {
return ExpectedPlanOptions;
}
+
+ TAggregationTestCase& SetExpectedReadNodeType(const std::string& value) {
+ ExpectedReadNodeType = value;
+ return *this;
+ }
+
+ const std::string& GetExpectedReadNodeType() const {
+ return ExpectedReadNodeType;
+ }
};
void TestAggregationsBase(const std::vector<TAggregationTestCase>& cases) {
@@ -1502,7 +1531,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, i.GetExpectedReply());
}
}
- CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions());
+ CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
}
@@ -1825,6 +1854,22 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregations({ testCase });
}
+ Y_UNIT_TEST(Aggregation_ProjectionOrder) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ resource_id, level, count(*) as c
+ FROM `/Root/olapStore/olapTable`
+ GROUP BY resource_id, level
+ ORDER BY c, resource_id DESC LIMIT 3
+ )")
+ .SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .SetExpectedReadNodeType("TableFullScan");
+
+ TestAggregations({ testCase });
+ }
+
Y_UNIT_TEST(StatsSysView) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp
index 5e1e0fe3766..f4c74b4d5ca 100644
--- a/ydb/core/tx/columnshard/columnshard_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_common.cpp
@@ -322,12 +322,6 @@ bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikim
return false;
}
- // It adds implicit projection with aggregates and keys. Remove non aggregated columns.
- step.Projection.reserve(groupBy.KeyColumnsSize() + groupBy.AggregatesSize());
- for (auto& col : groupBy.GetKeyColumns()) {
- step.Projection.push_back(info.GetName(col));
- }
-
step.GroupBy.reserve(groupBy.AggregatesSize());
step.GroupByKeys.reserve(groupBy.KeyColumnsSize());
for (auto& agg : groupBy.GetAggregates()) {
@@ -339,7 +333,6 @@ bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikim
return false;
}
step.GroupBy.push_back(std::move(func));
- step.Projection.push_back(columnName);
}
for (auto& key : groupBy.GetKeyColumns()) {
step.GroupByKeys.push_back(info.GetName(key));
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 573b1003855..408efc1422e 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -261,16 +261,19 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) {
bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& colNames, size_t rowsCount) {
UNIT_ASSERT(batch);
- for (auto& col : colNames) {
- if (!batch->GetColumnByName(col)) {
- Cerr << "schema: " << batch->schema()->ToString() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_columns(), colNames.size());
+ UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), rowsCount);
+ UNIT_ASSERT(batch->ValidateFull().ok());
+
+ for (size_t i = 0; i < colNames.size(); ++i) {
+ auto batchColName = batch->schema()->field(i)->name();
+ if (batchColName != colNames[i]) {
+ Cerr << "Incorrect order of columns. Expected: `" << colNames[i] << "` got: `" << batchColName << "`.\n";
+ Cerr << "Batch schema: " << batch->schema()->ToString() << "\n";
return false;
}
}
- UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_columns(), colNames.size());
- UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), rowsCount);
- UNIT_ASSERT(batch->ValidateFull().ok());
return true;
}
@@ -1068,11 +1071,11 @@ void TestCompactionInGranuleImpl(bool reboots,
using TAssignment = NKikimrSSA::TProgram::TAssignment;
using TAggAssignment = NKikimrSSA::TProgram::TAggregateAssignment;
-// SELECT timestamp FROM t WHERE timestamp <op> saved_at
+// SELECT level, timestamp FROM t WHERE timestamp <op> saved_at
static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssignment::FUNC_CMP_EQUAL) {
NKikimrSSA::TProgram ssa;
- std::vector<ui32> columnIds = {1, 9};
+ std::vector<ui32> columnIds = {1, 9, 5};
ui32 tmpColumnId = 100;
auto* line1 = ssa.AddCommand();
@@ -1087,6 +1090,7 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig
line2->MutableFilter()->MutablePredicate()->SetId(tmpColumnId);
auto* line3 = ssa.AddCommand();
+ line3->MutableProjection()->AddColumns()->SetId(columnIds[2]);
line3->MutableProjection()->AddColumns()->SetId(columnIds[0]);
return ssa;
}
@@ -1321,11 +1325,11 @@ void TestReadWithProgram(const TestTableDescription& table = {})
switch (i) {
case 1:
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"}));
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}));
UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true));
break;
case 2:
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"}, 0));
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}, 0));
break;
default:
break;