diff options
author | Artem Zuikov <[email protected]> | 2022-05-23 03:37:07 +0300 |
---|---|---|
committer | Artem Zuikov <[email protected]> | 2022-05-23 03:37:07 +0300 |
commit | 36b979155c2920bedad7f9aaa0e6c1503ca32e1f (patch) | |
tree | a80ec094f7dc6ab860a57b3a74ed609fd5ca08a7 | |
parent | d17840111fdb9918731661ca8bb6100cea295471 (diff) |
KIKIMR-14822: SSA improvements
ref:4d8bf98afc4ca447411112b192cc9599ab35f9e5
-rw-r--r-- | ydb/core/protos/ssa.proto | 21 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__read.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_common.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_common.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_columnshard_read_write.cpp | 256 |
6 files changed, 284 insertions, 35 deletions
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index eebf965f9ae..0b6774cf17c 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -98,12 +98,29 @@ message TProgram { message TAggregateAssignment { enum EAggregateFunction { AGG_UNSPECIFIED = 0; - // TODO + AGG_ANY = 1; + AGG_COUNT = 2; + //AGG_MIN = 3; + //AGG_MAX = 4; + //AGG_SUM = 5; + //AGG_AVG = 6; + //AGG_VAR = 7; + //AGG_COVAR = 8; + //AGG_STDDEV = 9; + //AGG_CORR = 10; + //AGG_ARG_MIN = 11; + //AGG_ARG_MAX = 12; + //AGG_COUNT_DISTINCT = 13; + //AGG_QUANTILES = 14; + //AGG_TOP_COUNT = 15; + //AGG_TOP_SUM = 16; } message TAggregateFunction { optional uint32 Id = 1; // EAggregateFunction repeated TColumn Arguments = 2; + optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV + // TODO: Parameters, i.e. N for topK(N)(arg) } optional TColumn Column = 1; @@ -147,4 +164,4 @@ message TOlapProgram { // RecordBatch deserialization require arrow::Schema, thus store it here optional bytes ParametersSchema = 2; optional bytes Parameters = 3; -}
\ No newline at end of file +} diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 1db2290a4ef..8b9c7313731 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -204,8 +204,10 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema); } - read.AddProgram(columnResolver, program); - + if (!read.AddProgram(columnResolver, program)) { + ErrorDescription = TStringBuilder() << "Wrong olap program"; + return false; + } return true; } diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp index 5b663700440..0662e5115f0 100644 --- a/ydb/core/tx/columnshard/columnshard_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_common.cpp @@ -305,7 +305,7 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r TPredicate(EOperation::Less, rightBorder, NArrow::MakeArrowSchema(rightColumns), toInclusive)); } -void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) +bool TReadDescription::AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) { using TId = NKikimrSSA::TProgram::TCommand; @@ -325,8 +325,10 @@ void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const N step = std::make_shared<NArrow::TProgramStep>(); break; case TId::kGroupBy: + // TODO + return false; // not implemented case TId::LINE_NOT_SET: - Y_VERIFY(false); // not implemented + Y_VERIFY(false); break; } } @@ -337,6 +339,7 @@ void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const N } ProgramSourceColumns = std::move(info.Sources); + return true; } } diff --git a/ydb/core/tx/columnshard/columnshard_common.h b/ydb/core/tx/columnshard/columnshard_common.h index cf11921a839..bbec34cbbd8 100644 --- a/ydb/core/tx/columnshard/columnshard_common.h +++ b/ydb/core/tx/columnshard/columnshard_common.h @@ -48,7 +48,7 @@ struct TReadDescription { ui64 PlanStep = 0; ui64 TxId = 0; - void AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); + bool AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); }; } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 07b19a4ab99..2fff0b14416 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -60,9 +60,8 @@ public: auto ready = IndexedData.GetReadyResults(Max<i64>()); size_t next = 1; for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { - TString data = NArrow::SerializeBatchNoCompression(it->ResultBatch); bool lastOne = Finished() && (next == ready.size()); - SendResult(ctx, data, lastOne); + SendResult(ctx, it->ResultBatch, lastOne); } DieFinished(ctx); @@ -81,11 +80,16 @@ public: SendResult(ctx, {}, true, status); } - void SendResult(const TActorContext& ctx, TString data, bool finished = false, + void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false, NKikimrTxColumnShard::EResultStatus status = NKikimrTxColumnShard::EResultStatus::SUCCESS) { auto chunkEvent = std::make_unique<TEvColumnShard::TEvReadResult>(*Result); auto& proto = Proto(chunkEvent.get()); + TString data; + if (batch) { + data = NArrow::SerializeBatchNoCompression(batch); + } + if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { Y_VERIFY(!data.empty()); } @@ -98,7 +102,7 @@ public: auto metadata = proto.MutableMeta(); metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW); - metadata->SetSchema(GetSerializedSchema()); + metadata->SetSchema(GetSerializedSchema(batch)); if (finished) { auto stats = ReadMetadata->ReadStats; auto* proto = metadata->MutableReadStats(); @@ -221,12 +225,19 @@ private: ui32 ReturnedBatchNo; mutable TString SerializedSchema; - TString GetSerializedSchema() const { - if (!SerializedSchema.empty()) { + TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const { + Y_VERIFY(ReadMetadata->ResultSchema); + + // TODO: make real ResultSchema with SSA effects + if (ReadMetadata->ResultSchema->Equals(batch->schema())) { + if (!SerializedSchema.empty()) { + return SerializedSchema; + } + SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema); return SerializedSchema; } - SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema); - return SerializedSchema; + + return NArrow::SerializeSchema(*batch->schema()); } }; diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index f7467e6e301..da93bf78de8 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -135,7 +135,7 @@ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, PlanSchemaTx(runtime, sender, snap); } -void TestWriteImpl(const TVector<std::pair<TString, TTypeId>>& ydbSchema) { +void TestWrite(const TVector<std::pair<TString, TTypeId>>& ydbSchema) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -278,8 +278,8 @@ void TestWriteReadDup() { } } -void TestWriteReadImpl(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema(), - TString codec = "") { +void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema(), + TString codec = "") { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -856,7 +856,62 @@ void TestCompactionInGranuleImpl(bool reboots) { } } -void TestReadWithProgramImpl() +using TAssignment = NKikimrSSA::TProgram::TAssignment; +using TAggAssignment = NKikimrSSA::TProgram::TAggregateAssignment; + +// SELECT timestamp FROM t WHERE timestamp <op> saved_at +static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssignment::FUNC_CMP_EQUAL) { + NKikimrSSA::TProgram ssa; + + std::vector<ui32> columnIds = {1, 9}; + ui32 tmpColumnId = 100; + + auto* line1 = ssa.AddCommand(); + auto* l1_assign = line1->MutableAssign(); + l1_assign->MutableColumn()->SetId(tmpColumnId); + auto* l1_func = l1_assign->MutableFunction(); + l1_func->SetId(compareId); + l1_func->AddArguments()->SetId(columnIds[0]); + l1_func->AddArguments()->SetId(columnIds[1]); + + auto* line2 = ssa.AddCommand(); + line2->MutableFilter()->MutablePredicate()->SetId(tmpColumnId); + + auto* line3 = ssa.AddCommand(); + line3->MutableProjection()->AddColumns()->SetId(columnIds[0]); + return ssa; +} + +// SELECT some(timestamp), some(saved_at) FROM t +NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY) { + NKikimrSSA::TProgram ssa; + + std::vector<ui32> columnIds = {1, 9}; + ui32 tmpColumnId = 100; + + auto* line1 = ssa.AddCommand(); + auto* groupBy = line1->MutableGroupBy(); + // + auto* l1_agg1 = groupBy->AddAggregates(); + l1_agg1->MutableColumn()->SetId(tmpColumnId); + auto* l1_agg1_f = l1_agg1->MutableFunction(); + l1_agg1_f->SetId(aggId); + l1_agg1_f->AddArguments()->SetId(columnIds[0]); + // + auto* l1_agg2 = groupBy->AddAggregates(); + l1_agg2->MutableColumn()->SetId(tmpColumnId + 1); + auto* l1_agg2_f = l1_agg2->MutableFunction(); + l1_agg2_f->SetId(aggId); + l1_agg2_f->AddArguments()->SetId(columnIds[1]); + + auto* line2 = ssa.AddCommand(); + auto* proj = line2->MutableProjection(); + proj->AddColumns()->SetId(tmpColumnId); + proj->AddColumns()->SetId(tmpColumnId + 1); + return ssa; +} + +void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -869,15 +924,53 @@ void TestReadWithProgramImpl() runtime.DispatchEvents(options); ui64 metaShard = TTestTxConfig::TxTablet1; + ui64 writeId = 0; ui64 tableId = 1; + ui64 planStep = 100; + ui64 txId = 100; SetupSchema(runtime, sender, tableId); + + { // write some data + bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); + UNIT_ASSERT(ok); + + ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + PlanCommit(runtime, sender, planStep, txId); + } + + ui32 numWrong = 1; + std::vector<TString> programs; + programs.push_back("XXXYYYZZZ"); + + { + NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_EQUAL); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + { - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId); + NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_NOT_EQUAL); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + + for (auto& programText : programs) { + auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); auto& readProto = Proto(readEvent); readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM); - readProto.SetOlapProgram("XXXYYYZZZ"); + readProto.SetOlapProgram(programText); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); @@ -895,12 +988,116 @@ void TestReadWithProgramImpl() UNIT_ASSERT_EQUAL(resRead.GetData(), ""); } + ui32 i = 0; + for (auto& programText : programs) { + auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); + auto& readProto = Proto(readEvent); + + readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); + readProto.SetOlapProgram(programText); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); + + TAutoPtr<IEventHandle> handle; + auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(result); + + auto& resRead = Proto(result); + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); + if (i < numWrong) { + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); + UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); + UNIT_ASSERT_EQUAL(resRead.GetData(), ""); + } else { + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); + UNIT_ASSERT(resRead.GetData().size() > 0); + + auto& meta = resRead.GetMeta(); + auto& schema = meta.GetSchema(); + + TVector<TString> readData; + readData.push_back(resRead.GetData()); + + switch (i) { + case 1: + UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"})); + UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true)); + break; + case 2: + UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"}, 0)); + break; + default: + break; + } + } + ++i; + } +} + +void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + ui64 metaShard = TTestTxConfig::TxTablet1; + ui64 writeId = 0; + ui64 tableId = 1; + ui64 planStep = 100; + ui64 txId = 100; + + SetupSchema(runtime, sender, tableId); + + { // write some data + bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); + UNIT_ASSERT(ok); + + ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + PlanCommit(runtime, sender, planStep, txId); + } + + // TODO: write some into index + + std::vector<TString> programs; + + { + NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_ANY); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + { - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId); + NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_COUNT); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + + ui32 i = 0; + for (auto& programText : programs) { + auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); auto& readProto = Proto(readEvent); readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); - readProto.SetOlapProgram("XXXYYYZZZ"); + readProto.SetOlapProgram(programText); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); @@ -911,10 +1108,25 @@ void TestReadWithProgramImpl() auto& resRead = Proto(result); UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); +#if 0 // TODO + { + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); + UNIT_ASSERT(resRead.GetData().size() > 0); + + auto& meta = resRead.GetMeta(); + //auto& schema = meta.GetSchema(); + + TVector<TString> readData; + readData.push_back(resRead.GetData()); + + UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1)); + } +#else UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData(), ""); +#endif + ++i; } } @@ -922,11 +1134,11 @@ void TestReadWithProgramImpl() Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_UNIT_TEST(Write) { - TestWriteImpl(TTestSchema::YdbSchema()); + TestWrite(TTestSchema::YdbSchema()); } Y_UNIT_TEST(WriteExoticTypes) { - TestWriteImpl(TTestSchema::YdbExoticSchema()); + TestWrite(TTestSchema::YdbExoticSchema()); } Y_UNIT_TEST(WriteReadDuplicate) { @@ -934,23 +1146,23 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } Y_UNIT_TEST(WriteRead) { - TestWriteReadImpl(false); + TestWriteRead(false); } Y_UNIT_TEST(WriteReadExoticTypes) { - TestWriteReadImpl(false, TTestSchema::YdbExoticSchema()); + TestWriteRead(false, TTestSchema::YdbExoticSchema()); } Y_UNIT_TEST(RebootWriteRead) { - TestWriteReadImpl(true); + TestWriteRead(true); } Y_UNIT_TEST(WriteReadNoCompression) { - TestWriteReadImpl(true, TTestSchema::YdbSchema(), "none"); + TestWriteRead(true, TTestSchema::YdbSchema(), "none"); } Y_UNIT_TEST(WriteReadZSTD) { - TestWriteReadImpl(true, TTestSchema::YdbSchema(), "zstd"); + TestWriteRead(true, TTestSchema::YdbSchema(), "zstd"); } Y_UNIT_TEST(CompactionInGranule) { @@ -961,8 +1173,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TestCompactionInGranuleImpl(true); } - Y_UNIT_TEST(TestReadWithProgram) { - TestReadWithProgramImpl(); + Y_UNIT_TEST(ReadWithProgram) { + TestReadWithProgram(); + } + + Y_UNIT_TEST(ReadAggregate) { + TestReadAggregate(); } Y_UNIT_TEST(CompactionSplitGranule) { |