summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Artem Zuikov <[email protected]> 2022-05-23 03:37:07 +0300
committer Artem Zuikov <[email protected]> 2022-05-23 03:37:07 +0300
commit 36b979155c2920bedad7f9aaa0e6c1503ca32e1f (patch)
tree a80ec094f7dc6ab860a57b3a74ed609fd5ca08a7
parent d17840111fdb9918731661ca8bb6100cea295471 (diff)
KIKIMR-14822: SSA improvements
ref:4d8bf98afc4ca447411112b192cc9599ab35f9e5
-rw-r--r-- ydb/core/protos/ssa.proto 21
-rw-r--r-- ydb/core/tx/columnshard/columnshard__read.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard_common.cpp 7
-rw-r--r-- ydb/core/tx/columnshard/columnshard_common.h 2
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 27
-rw-r--r-- ydb/core/tx/columnshard/ut_columnshard_read_write.cpp 256
6 files changed, 284 insertions, 35 deletions
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
index eebf965f9ae..0b6774cf17c 100644
--- a/ydb/core/protos/ssa.proto
+++ b/ydb/core/protos/ssa.proto
@@ -98,12 +98,29 @@ message TProgram {
message TAggregateAssignment {
enum EAggregateFunction {
AGG_UNSPECIFIED = 0;
- // TODO
+ AGG_ANY = 1;
+ AGG_COUNT = 2;
+ //AGG_MIN = 3;
+ //AGG_MAX = 4;
+ //AGG_SUM = 5;
+ //AGG_AVG = 6;
+ //AGG_VAR = 7;
+ //AGG_COVAR = 8;
+ //AGG_STDDEV = 9;
+ //AGG_CORR = 10;
+ //AGG_ARG_MIN = 11;
+ //AGG_ARG_MAX = 12;
+ //AGG_COUNT_DISTINCT = 13;
+ //AGG_QUANTILES = 14;
+ //AGG_TOP_COUNT = 15;
+ //AGG_TOP_SUM = 16;
}
message TAggregateFunction {
optional uint32 Id = 1; // EAggregateFunction
repeated TColumn Arguments = 2;
+ optional string Variant = 3; // e.g. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV
+ // TODO: Parameters, e.g. N for topK(N)(arg)
}
optional TColumn Column = 1;
@@ -147,4 +164,4 @@ message TOlapProgram {
// RecordBatch deserialization requires arrow::Schema, thus store it here
optional bytes ParametersSchema = 2;
optional bytes Parameters = 3;
-}
\ No newline at end of file
+}
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 1db2290a4ef..8b9c7313731 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -204,8 +204,10 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP
read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema);
}
- read.AddProgram(columnResolver, program);
-
+ if (!read.AddProgram(columnResolver, program)) {
+ ErrorDescription = TStringBuilder() << "Wrong olap program";
+ return false;
+ }
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp
index 5b663700440..0662e5115f0 100644
--- a/ydb/core/tx/columnshard/columnshard_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_common.cpp
@@ -305,7 +305,7 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r
TPredicate(EOperation::Less, rightBorder, NArrow::MakeArrowSchema(rightColumns), toInclusive));
}
-void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program)
+bool TReadDescription::AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program)
{
using TId = NKikimrSSA::TProgram::TCommand;
@@ -325,8 +325,10 @@ void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const N
step = std::make_shared<NArrow::TProgramStep>();
break;
case TId::kGroupBy:
+ // TODO
+ return false; // not implemented
case TId::LINE_NOT_SET:
- Y_VERIFY(false); // not implemented
+ Y_VERIFY(false);
break;
}
}
@@ -337,6 +339,7 @@ void TReadDescription::AddProgram(const IColumnResolver& columnResolver, const N
}
ProgramSourceColumns = std::move(info.Sources);
+ return true;
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_common.h b/ydb/core/tx/columnshard/columnshard_common.h
index cf11921a839..bbec34cbbd8 100644
--- a/ydb/core/tx/columnshard/columnshard_common.h
+++ b/ydb/core/tx/columnshard/columnshard_common.h
@@ -48,7 +48,7 @@ struct TReadDescription {
ui64 PlanStep = 0;
ui64 TxId = 0;
- void AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
+ bool AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
};
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 07b19a4ab99..2fff0b14416 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -60,9 +60,8 @@ public:
auto ready = IndexedData.GetReadyResults(Max<i64>());
size_t next = 1;
for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
- TString data = NArrow::SerializeBatchNoCompression(it->ResultBatch);
bool lastOne = Finished() && (next == ready.size());
- SendResult(ctx, data, lastOne);
+ SendResult(ctx, it->ResultBatch, lastOne);
}
DieFinished(ctx);
@@ -81,11 +80,16 @@ public:
SendResult(ctx, {}, true, status);
}
- void SendResult(const TActorContext& ctx, TString data, bool finished = false,
+ void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false,
NKikimrTxColumnShard::EResultStatus status = NKikimrTxColumnShard::EResultStatus::SUCCESS) {
auto chunkEvent = std::make_unique<TEvColumnShard::TEvReadResult>(*Result);
auto& proto = Proto(chunkEvent.get());
+ TString data;
+ if (batch) {
+ data = NArrow::SerializeBatchNoCompression(batch);
+ }
+
if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
Y_VERIFY(!data.empty());
}
@@ -98,7 +102,7 @@ public:
auto metadata = proto.MutableMeta();
metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW);
- metadata->SetSchema(GetSerializedSchema());
+ metadata->SetSchema(GetSerializedSchema(batch));
if (finished) {
auto stats = ReadMetadata->ReadStats;
auto* proto = metadata->MutableReadStats();
@@ -221,12 +225,19 @@ private:
ui32 ReturnedBatchNo;
mutable TString SerializedSchema;
- TString GetSerializedSchema() const {
- if (!SerializedSchema.empty()) {
+ TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ Y_VERIFY(ReadMetadata->ResultSchema);
+
+ // TODO: make real ResultSchema with SSA effects
+ if (ReadMetadata->ResultSchema->Equals(batch->schema())) {
+ if (!SerializedSchema.empty()) {
+ return SerializedSchema;
+ }
+ SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema);
return SerializedSchema;
}
- SerializedSchema = NArrow::SerializeSchema(*ReadMetadata->ResultSchema);
- return SerializedSchema;
+
+ return NArrow::SerializeSchema(*batch->schema());
}
};
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index f7467e6e301..da93bf78de8 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -135,7 +135,7 @@ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
PlanSchemaTx(runtime, sender, snap);
}
-void TestWriteImpl(const TVector<std::pair<TString, TTypeId>>& ydbSchema) {
+void TestWrite(const TVector<std::pair<TString, TTypeId>>& ydbSchema) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -278,8 +278,8 @@ void TestWriteReadDup() {
}
}
-void TestWriteReadImpl(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema(),
- TString codec = "") {
+void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema(),
+ TString codec = "") {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -856,7 +856,62 @@ void TestCompactionInGranuleImpl(bool reboots) {
}
}
-void TestReadWithProgramImpl()
+using TAssignment = NKikimrSSA::TProgram::TAssignment;
+using TAggAssignment = NKikimrSSA::TProgram::TAggregateAssignment;
+
+// SELECT timestamp FROM t WHERE timestamp <op> saved_at
+static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssignment::FUNC_CMP_EQUAL) {
+ NKikimrSSA::TProgram ssa;
+
+ std::vector<ui32> columnIds = {1, 9};
+ ui32 tmpColumnId = 100;
+
+ auto* line1 = ssa.AddCommand();
+ auto* l1_assign = line1->MutableAssign();
+ l1_assign->MutableColumn()->SetId(tmpColumnId);
+ auto* l1_func = l1_assign->MutableFunction();
+ l1_func->SetId(compareId);
+ l1_func->AddArguments()->SetId(columnIds[0]);
+ l1_func->AddArguments()->SetId(columnIds[1]);
+
+ auto* line2 = ssa.AddCommand();
+ line2->MutableFilter()->MutablePredicate()->SetId(tmpColumnId);
+
+ auto* line3 = ssa.AddCommand();
+ line3->MutableProjection()->AddColumns()->SetId(columnIds[0]);
+ return ssa;
+}
+
+// SELECT <agg>(timestamp), <agg>(saved_at) FROM t
+NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY) {
+ NKikimrSSA::TProgram ssa;
+
+ std::vector<ui32> columnIds = {1, 9};
+ ui32 tmpColumnId = 100;
+
+ auto* line1 = ssa.AddCommand();
+ auto* groupBy = line1->MutableGroupBy();
+ //
+ auto* l1_agg1 = groupBy->AddAggregates();
+ l1_agg1->MutableColumn()->SetId(tmpColumnId);
+ auto* l1_agg1_f = l1_agg1->MutableFunction();
+ l1_agg1_f->SetId(aggId);
+ l1_agg1_f->AddArguments()->SetId(columnIds[0]);
+ //
+ auto* l1_agg2 = groupBy->AddAggregates();
+ l1_agg2->MutableColumn()->SetId(tmpColumnId + 1);
+ auto* l1_agg2_f = l1_agg2->MutableFunction();
+ l1_agg2_f->SetId(aggId);
+ l1_agg2_f->AddArguments()->SetId(columnIds[1]);
+
+ auto* line2 = ssa.AddCommand();
+ auto* proj = line2->MutableProjection();
+ proj->AddColumns()->SetId(tmpColumnId);
+ proj->AddColumns()->SetId(tmpColumnId + 1);
+ return ssa;
+}
+
+void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema())
{
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -869,15 +924,53 @@ void TestReadWithProgramImpl()
runtime.DispatchEvents(options);
ui64 metaShard = TTestTxConfig::TxTablet1;
+ ui64 writeId = 0;
ui64 tableId = 1;
+ ui64 planStep = 100;
+ ui64 txId = 100;
SetupSchema(runtime, sender, tableId);
+
+ { // write some data
+ bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
+ UNIT_ASSERT(ok);
+
+ ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ PlanCommit(runtime, sender, planStep, txId);
+ }
+
+ ui32 numWrong = 1;
+ std::vector<TString> programs;
+ programs.push_back("XXXYYYZZZ");
+
+ {
+ NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_EQUAL);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
{
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId);
+ NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_NOT_EQUAL);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
+ for (auto& programText : programs) {
+ auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
auto& readProto = Proto(readEvent);
readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM);
- readProto.SetOlapProgram("XXXYYYZZZ");
+ readProto.SetOlapProgram(programText);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
@@ -895,12 +988,116 @@ void TestReadWithProgramImpl()
UNIT_ASSERT_EQUAL(resRead.GetData(), "");
}
+ ui32 i = 0;
+ for (auto& programText : programs) {
+ auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
+ auto& readProto = Proto(readEvent);
+
+ readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
+ readProto.SetOlapProgram(programText);
+
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
+
+ TAutoPtr<IEventHandle> handle;
+ auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(result);
+
+ auto& resRead = Proto(result);
+ UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+ if (i < numWrong) {
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+ UNIT_ASSERT_EQUAL(resRead.GetData(), "");
+ } else {
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+ UNIT_ASSERT(resRead.GetData().size() > 0);
+
+ auto& meta = resRead.GetMeta();
+ auto& schema = meta.GetSchema();
+
+ TVector<TString> readData;
+ readData.push_back(resRead.GetData());
+
+ switch (i) {
+ case 1:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"}));
+ UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true));
+ break;
+ case 2:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"timestamp"}, 0));
+ break;
+ default:
+ break;
+ }
+ }
+ ++i;
+ }
+}
+
+void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ ui64 metaShard = TTestTxConfig::TxTablet1;
+ ui64 writeId = 0;
+ ui64 tableId = 1;
+ ui64 planStep = 100;
+ ui64 txId = 100;
+
+ SetupSchema(runtime, sender, tableId);
+
+ { // write some data
+ bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
+ UNIT_ASSERT(ok);
+
+ ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ PlanCommit(runtime, sender, planStep, txId);
+ }
+
+ // TODO: write some data into index
+
+ std::vector<TString> programs;
+
+ {
+ NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_ANY);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
{
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId);
+ NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_COUNT);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
+ ui32 i = 0;
+ for (auto& programText : programs) {
+ auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
auto& readProto = Proto(readEvent);
readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
- readProto.SetOlapProgram("XXXYYYZZZ");
+ readProto.SetOlapProgram(programText);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
@@ -911,10 +1108,25 @@ void TestReadWithProgramImpl()
auto& resRead = Proto(result);
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+#if 0 // TODO
+ {
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+ UNIT_ASSERT(resRead.GetData().size() > 0);
+
+ auto& meta = resRead.GetMeta();
+ //auto& schema = meta.GetSchema();
+
+ TVector<TString> readData;
+ readData.push_back(resRead.GetData());
+
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
+ }
+#else
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData(), "");
+#endif
+ ++i;
}
}
@@ -922,11 +1134,11 @@ void TestReadWithProgramImpl()
Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(Write) {
- TestWriteImpl(TTestSchema::YdbSchema());
+ TestWrite(TTestSchema::YdbSchema());
}
Y_UNIT_TEST(WriteExoticTypes) {
- TestWriteImpl(TTestSchema::YdbExoticSchema());
+ TestWrite(TTestSchema::YdbExoticSchema());
}
Y_UNIT_TEST(WriteReadDuplicate) {
@@ -934,23 +1146,23 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
Y_UNIT_TEST(WriteRead) {
- TestWriteReadImpl(false);
+ TestWriteRead(false);
}
Y_UNIT_TEST(WriteReadExoticTypes) {
- TestWriteReadImpl(false, TTestSchema::YdbExoticSchema());
+ TestWriteRead(false, TTestSchema::YdbExoticSchema());
}
Y_UNIT_TEST(RebootWriteRead) {
- TestWriteReadImpl(true);
+ TestWriteRead(true);
}
Y_UNIT_TEST(WriteReadNoCompression) {
- TestWriteReadImpl(true, TTestSchema::YdbSchema(), "none");
+ TestWriteRead(true, TTestSchema::YdbSchema(), "none");
}
Y_UNIT_TEST(WriteReadZSTD) {
- TestWriteReadImpl(true, TTestSchema::YdbSchema(), "zstd");
+ TestWriteRead(true, TTestSchema::YdbSchema(), "zstd");
}
Y_UNIT_TEST(CompactionInGranule) {
@@ -961,8 +1173,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestCompactionInGranuleImpl(true);
}
- Y_UNIT_TEST(TestReadWithProgram) {
- TestReadWithProgramImpl();
+ Y_UNIT_TEST(ReadWithProgram) {
+ TestReadWithProgram();
+ }
+
+ Y_UNIT_TEST(ReadAggregate) {
+ TestReadAggregate();
}
Y_UNIT_TEST(CompactionSplitGranule) {