summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <[email protected]>2024-01-25 16:43:17 +0300
committerGitHub <[email protected]>2024-01-25 16:43:17 +0300
commit17ca7a29a85f9c25d3e50f2b84374b8edae5021a (patch)
treef3ebe3de4b90e4663507d8f48a480a9c3d207105
parentf5de6170b360ed6b440d8c38256a9c55e3beb13f (diff)
KIKIMR-20877: internal indexes processing (#1276)
-rw-r--r--ydb/core/formats/arrow/hash/calcer.cpp26
-rw-r--r--ydb/core/formats/arrow/hash/calcer.h1
-rw-r--r--ydb/core/formats/arrow/program.cpp29
-rw-r--r--ydb/core/formats/arrow/program.h35
-rw-r--r--ydb/core/formats/arrow/ya.make1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp31
-rw-r--r--ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h2
-rw-r--r--ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp3
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp147
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp2
-rw-r--r--ydb/core/protos/ssa.proto20
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h41
-rw-r--r--ydb/core/tx/columnshard/common/portion.cpp2
-rw-r--r--ydb/core/tx/columnshard/common/portion.h9
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp39
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h10
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.h20
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp18
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h36
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/ya.make2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp22
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h54
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp65
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.h32
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h22
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h147
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h49
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h96
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h48
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp40
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h134
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp512
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h37
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h42
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make19
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp49
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h32
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp (renamed from ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp)34
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h30
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp67
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h (renamed from ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h)54
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make14
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/ya.make8
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp4
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h2
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.h9
-rw-r--r--ydb/core/tx/program/program.cpp110
-rw-r--r--ydb/core/tx/program/program.h28
-rw-r--r--ydb/core/tx/program/registry.cpp8
-rw-r--r--ydb/core/tx/program/registry.h8
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/schema.cpp19
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/schema.h19
-rw-r--r--ydb/core/tx/schemeshard/olap/indexes/update.h2
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_store.cpp5
-rw-r--r--ydb/library/arrow_kernels/operations.h2
-rw-r--r--ydb/library/arrow_kernels/ya.make2
-rw-r--r--ydb/library/yql/core/arrow_kernels/request/request.h2
70 files changed, 2005 insertions, 401 deletions
diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp
index 9db679c4b72..f5d2dc3515e 100644
--- a/ydb/core/formats/arrow/hash/calcer.cpp
+++ b/ydb/core/formats/arrow/hash/calcer.cpp
@@ -9,6 +9,25 @@
namespace NKikimr::NArrow::NHash {
+void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
+ AFL_VERIFY(scalar);
+ NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using T = typename TWrap::T;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+
+ auto& typedScalar = static_cast<const TScalar&>(*scalar);
+ if constexpr (arrow::has_string_view<T>()) {
+ hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size());
+ } else if constexpr (arrow::has_c_type<T>()) {
+ hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T));
+ } else {
+ static_assert(arrow::is_decimal_type<T>());
+ }
+ return true;
+ });
+}
+
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
@@ -21,12 +40,7 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int ro
if constexpr (arrow::has_string_view<T>()) {
hashCalcer.Update((const ui8*)value.data(), value.size());
} else if constexpr (arrow::has_c_type<T>()) {
- if constexpr (arrow::is_physical_integer_type<T>()) {
- hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
- } else {
- // Do not use bool or floats for sharding
- static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
- }
+ hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
} else {
static_assert(arrow::is_decimal_type<T>());
}
diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h
index 066ba0cb135..511c8c40187 100644
--- a/ydb/core/formats/arrow/hash/calcer.h
+++ b/ydb/core/formats/arrow/hash/calcer.h
@@ -30,6 +30,7 @@ public:
TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);
static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
+ static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;
};
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index 424d82dc01d..fadd6bbf76b 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -20,7 +20,7 @@ enum class AggFunctionId {
AGG_MAX = 4,
AGG_SUM = 5,
};
-struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
+struct GroupByOptions: public arrow::compute::ScalarAggregateOptions {
struct Assign {
AggFunctionId function = AggFunctionId::AGG_UNSPECIFIED;
std::string result_column;
@@ -43,6 +43,7 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <ydb/library/yql/core/arrow_kernels/request/request.h>
namespace NKikimr::NSsa {
@@ -605,6 +606,32 @@ IStepFunction<TAssign>::TPtr TAssign::GetFunction(arrow::compute::ExecContext* c
return std::make_shared<TSimpleFunction>(ctx);
}
+TString TAssign::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ if (Operation != EOperation::Unspecified) {
+ sb << "op=" << Operation << ";";
+ }
+ if (YqlOperationId) {
+ sb << "yql_op=" << (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId << ";";
+ }
+ if (Arguments.size()) {
+ sb << "arguments=[";
+ for (auto&& i : Arguments) {
+ sb << i.DebugString() << ";";
+ }
+ sb << "];";
+ }
+ if (Constant) {
+ sb << "const=" << Constant->ToString() << ";";
+ }
+ if (KernelFunction) {
+ sb << "kernel=" << KernelFunction->name() << ";";
+ }
+ sb << "column=" << Column.DebugString() << ";";
+ sb << "}";
+ return sb;
+}
IStepFunction<TAggregateAssign>::TPtr TAggregateAssign::GetFunction(arrow::compute::ExecContext* ctx) const {
if (KernelFunction) {
diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h
index 31254144fa9..000bd447b1e 100644
--- a/ydb/core/formats/arrow/program.h
+++ b/ydb/core/formats/arrow/program.h
@@ -108,6 +108,8 @@ protected:
};
class TAssign {
+private:
+ YDB_ACCESSOR_DEF(std::optional<ui32>, YqlOperationId);
public:
using TOperationType = EOperation;
@@ -237,12 +239,7 @@ public:
const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); }
IStepFunction<TAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const;
- TString DebugString() const {
- return TStringBuilder() <<
- "{op=" << Operation << ";column=" << Column.DebugString() << ";" << (Constant ? "const=" + Constant->ToString() + ";" : "NO;")
- << (KernelFunction ? ("kernel=" + KernelFunction->name() + ";") : "NO;")
- << "}";
- }
+ TString DebugString() const;
private:
const TColumnInfo Column;
EOperation Operation{EOperation::Unspecified};
@@ -325,13 +322,26 @@ public:
TString DebugString() const {
TStringBuilder sb;
sb << "{";
- sb << "assignes=[";
- for (auto&& i : Assignes) {
- sb << i.DebugString() << ";";
+ if (Assignes.size()) {
+ sb << "assignes=[";
+ for (auto&& i : Assignes) {
+ sb << i.DebugString() << ";";
+ }
+ sb << "];";
+ }
+ if (Filters.size()) {
+ sb << "filters=[";
+ for (auto&& i : Filters) {
+ sb << i.DebugString() << ";";
+ }
+ sb << "];";
+ }
+ if (GroupBy.size()) {
+ sb << "group_by_count=" << GroupBy.size() << "; ";
+ }
+ if (GroupByKeys.size()) {
+ sb << "group_by_keys_count=" << GroupByKeys.size() << ";";
}
- sb << "];";
- sb << "group_by_count = " << GroupBy.size() << "; ";
- sb << "group_by_keys_count=" << GroupByKeys.size() << ";";
sb << "projections=[";
for (auto&& i : Projection) {
@@ -396,6 +406,7 @@ public:
};
struct TProgram {
+public:
std::vector<std::shared_ptr<TProgramStep>> Steps;
THashMap<ui32, TColumnInfo> SourceColumns;
diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make
index 32fc6e462d1..146c5bae1e8 100644
--- a/ydb/core/formats/arrow/ya.make
+++ b/ydb/core/formats/arrow/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
ydb/library/binary_json
ydb/library/dynumber
ydb/library/services
+ ydb/library/yql/core/arrow_kernels/request
)
IF (OS_WINDOWS)
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 0f9ff8525c0..b225b898796 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -4,6 +4,9 @@
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/tx/datashard/range_ops.h>
+#include <ydb/core/tx/program/program.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h>
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
@@ -45,8 +48,7 @@ TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpRead
}
-std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task)
-{
+std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) {
const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
std::vector<std::shared_ptr<arrow::Field>> columns;
std::vector<std::shared_ptr<arrow::Array>> data;
@@ -917,11 +919,34 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
if (tableInfo->TableKind == ETableKind::Olap) {
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
+ auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
+
olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
- auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
olapProgram->SetParametersSchema(schema);
olapProgram->SetParameters(parameters);
+
+ if (!!stageInfo.Meta.ColumnTableInfoPtr) {
+ std::shared_ptr<NSchemeShard::TOlapSchema> olapSchema = std::make_shared<NSchemeShard::TOlapSchema>();
+ olapSchema->ParseFromLocalDB(stageInfo.Meta.ColumnTableInfoPtr->Description.GetSchema());
+ if (olapSchema->GetIndexes().GetIndexes().size()) {
+ NOlap::TProgramContainer container;
+ NOlap::TSchemaResolverColumnsOnly resolver(olapSchema);
+ TString error;
+ YQL_ENSURE(container.Init(resolver, *olapProgram, error), "" << error);
+ auto data = NOlap::NIndexes::NRequest::TDataForIndexesCheckers::Build(container);
+ if (data) {
+ for (auto&& [indexId, i] : olapSchema->GetIndexes().GetIndexes()) {
+ AFL_VERIFY(!!i.GetIndexMeta());
+ i.GetIndexMeta()->FillIndexCheckers(data, *olapSchema);
+ }
+ auto checker = data->GetCoverChecker();
+ if (!!checker) {
+ checker.SerializeToProto(*olapProgram->MutableIndexChecker());
+ }
+ }
+ }
+ }
} else {
YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty());
}
diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h
index 753eade06d8..267829a1a5f 100644
--- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h
+++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h
@@ -1,5 +1,5 @@
#include "abstract.h"
-#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
namespace NKikimr::NKqp {
diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
index 0eecc157fd7..746c34ea4a4 100644
--- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
@@ -674,6 +674,7 @@ TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& opera
}
const auto kernel = ctx.AddYqlKernelBinaryFunc(op, *leftColumn.Type, *rightColumn.Type, type);
+ cmpFunc->SetYqlOperationId((ui32)op);
cmpFunc->SetFunctionType(TProgram::YQL_KERNEL);
cmpFunc->SetKernelIdx(kernel.first);
cmpFunc->AddArguments()->SetId(leftColumn.Id);
@@ -706,8 +707,10 @@ const TTypedColumn BuildLogicalProgram(const TExprNode::TChildrenType& args, con
const auto idx = ctx.GetKernelRequestBuilder().AddBinaryOp(function, block, block, block);
logicalFunc->SetKernelIdx(idx);
logicalFunc->SetFunctionType(TProgram::YQL_KERNEL);
+ logicalFunc->SetYqlOperationId((ui32)function);
} else {
logicalFunc->SetFunctionType(function);
+ logicalFunc->SetId((ui32)function);
}
return {logicalOp->GetColumn().GetId(), block};
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index adff2eb29ec..e38258931b1 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -762,16 +762,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
auto client = kikimr.GetTableClient();
- // EnableDebugLogging(kikimr);
-
{
TStreamExecScanQuerySettings settings;
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
@@ -872,8 +868,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
@@ -902,8 +896,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
@@ -933,8 +925,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
@@ -1091,8 +1081,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
auto client = kikimr.GetTableClient();
TLocalHelper(kikimr).CreateTestOlapTable();
@@ -1123,8 +1111,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 3);
@@ -1175,14 +1161,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
-
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
- // EnableDebugLogging(kikimr);
-
{
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
@@ -1198,8 +1180,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[0u;]])");
}
- // EnableDebugLogging(kikimr);
-
{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
@@ -1210,8 +1190,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
}
- // EnableDebugLogging(kikimr);
-
{
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
@@ -1243,6 +1221,111 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(Indexes) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ TLocalHelper(kikimr).CreateTestOlapTable();
+ auto tableClient = kikimr.GetTableClient();
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+
+ {
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.1}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+ {
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["resource_id", "uid"], "false_positive_probability" : 0.2}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);
+ }
+
+ {
+ auto it = tableClient.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+
+ SELECT
+ COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cout << result << Endl;
+ CompareYson(result, R"([[230000u;]])");
+ }
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
+ TInstant start = Now();
+ ui32 compactionsStart = csController->GetCompactions().Val();
+ while (Now() - start < TDuration::Seconds(10)) {
+ if (compactionsStart != csController->GetCompactions().Val()) {
+ compactionsStart = csController->GetCompactions().Val();
+ start = Now();
+ }
+ Cerr << "WAIT_COMPACTION: " << csController->GetCompactions().Val() << Endl;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ {
+ auto it = tableClient.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+
+ SELECT
+ COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ WHERE resource_id = '3000008' AND level = 3 AND uid = 'uid_100000008'
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cout << result << Endl;
+ CompareYson(result, R"([[1u;]])");
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val());
+ AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
+ }
+
+ {
+ const i64 before = csController->GetIndexesSkippingOnSelect().Val();
+ auto it = tableClient.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+
+ SELECT
+ COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ AFL_VERIFY(before != csController->GetIndexesSkippingOnSelect().Val());
+ Cout << result << Endl;
+ CompareYson(result, R"([[0u;]])");
+ AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
+ }
+ }
+
Y_UNIT_TEST(PushdownFilter) {
static bool enableLog = false;
@@ -1427,7 +1510,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(kikimr).CreateTestOlapTable();
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2000);
-// EnableDebugLogging(kikimr);
auto tableClient = kikimr.GetTableClient();
auto selectQuery = TString(R"(
@@ -1919,7 +2001,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -1964,7 +2045,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -2011,7 +2091,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -2056,7 +2135,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -2101,7 +2179,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- // EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -2289,7 +2366,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
- //EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -2383,7 +2459,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
-// EnableDebugLogging(kikimr);
TClickHelper(kikimr).CreateClickBenchTable();
auto tableClient = kikimr.GetTableClient();
@@ -2422,7 +2497,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto sender = runtime->AllocateEdgeActor();
InitRoot(server, sender);
-// EnableDebugLogging(runtime);
TClickHelper(*server).CreateClickBenchTable();
@@ -3416,8 +3490,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 1000);
}
- // EnableDebugLogging(kikimr);
-
auto tableClient = kikimr.GetTableClient();
auto selectQuery = TString(R"(
SELECT PathId, Kind, TabletId, Sum(Rows) as Rows
@@ -3462,8 +3534,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000);
}
- // EnableDebugLogging(kikimr);
-
auto tableClient = kikimr.GetTableClient();
{
auto selectQuery = TString(R"(
@@ -3918,8 +3988,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000);
}
- // EnableDebugLogging(kikimr);
-
auto tableClient = kikimr.GetTableClient();
{
@@ -4068,8 +4136,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000);
}
- // EnableDebugLogging(kikimr);
-
auto tableClient = kikimr.GetTableClient();
{
@@ -4470,8 +4536,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- //EnableDebugLogging(kikimr);
-
auto tableClient = kikimr.GetTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -4668,7 +4732,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Tests::TClient client(serverSettings);
auto& runtime = *server->GetRuntime();
- EnableDebugLogging(&runtime);
auto sender = runtime.AllocateEdgeActor();
server->SetupRootStoragePools(sender);
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 47fec6626cb..aecd1b44ba2 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -5369,6 +5369,8 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
};
+
+ Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
index 69db3462922..92e5738a3ac 100644
--- a/ydb/core/protos/ssa.proto
+++ b/ydb/core/protos/ssa.proto
@@ -40,6 +40,24 @@ message TProgram {
}
}
+ message TBloomFilterChecker {
+ repeated uint32 HashValues = 1;
+ }
+
+ message TOlapIndexChecker {
+ optional uint32 IndexId = 1;
+ optional string ClassName = 2;
+
+ message TCompositeChecker {
+ repeated TOlapIndexChecker ChildrenCheckers = 1;
+ }
+
+ oneof Implementation {
+ TBloomFilterChecker BloomFilter = 40;
+ TCompositeChecker Composite = 41;
+ }
+ }
+
message TParameter {
optional string Name = 1;
}
@@ -96,6 +114,7 @@ message TProgram {
repeated TColumn Arguments = 2;
optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ];
optional uint32 KernelIdx = 4;
+ optional uint32 YqlOperationId = 5; // TKernelRequestBuilder::EBinaryOp
}
message TExternalFunction {
@@ -185,4 +204,5 @@ message TOlapProgram {
// RecordBatch deserialization require arrow::Schema, thus store it here
optional bytes ParametersSchema = 2;
optional bytes Parameters = 3;
+ optional TProgram.TOlapIndexChecker IndexChecker = 4;
}
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 36d7695ffe7..6dfe9d7e1e2 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -6,7 +6,6 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
-#include <ydb/core/tx/columnshard/engines/columns_table.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/operations/write.h>
@@ -39,6 +38,7 @@ struct Schema : NIceDb::Schema {
ColumnsTableId,
CountersTableId,
OperationsTableId,
+ IndexesTableId
};
enum class ETierTables: ui32 {
@@ -290,6 +290,19 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<StorageId, BlobId>;
};
+ struct IndexIndexes: NIceDb::Schema::Table<IndexesTableId> {
+ struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
+ struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
+ struct IndexId: Column<3, NScheme::NTypeIds::Uint32> {};
+ struct ChunkIdx: Column<4, NScheme::NTypeIds::Uint32> {};
+ struct Blob: Column<5, NScheme::NTypeIds::String> {};
+ struct Offset: Column<6, NScheme::NTypeIds::Uint32> {};
+ struct Size: Column<7, NScheme::NTypeIds::Uint32> {};
+
+ using TKey = TableKey<PathId, PortionId, IndexId, ChunkIdx>;
+ using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size>;
+ };
+
using TTables = SchemaTables<
Value,
TxInfo,
@@ -310,7 +323,8 @@ struct Schema : NIceDb::Schema {
OneToOneEvictedBlobs,
Operations,
TierBlobsDraft,
- TierBlobsToDelete
+ TierBlobsToDelete,
+ IndexIndexes
>;
//
@@ -600,4 +614,27 @@ public:
}
};
+class TIndexChunkLoadContext {
+private:
+ YDB_READONLY_DEF(TBlobRange, BlobRange);
+ TChunkAddress Address;
+public:
+ TIndexChunk BuildIndexChunk() const {
+ return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), BlobRange);
+ }
+
+ template <class TSource>
+ TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
+ : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) {
+ AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
+ TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Blob>();
+ Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
+ TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
+ BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
+ BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Offset>();
+ BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Size>();
+ AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());
+ }
+};
+
}
diff --git a/ydb/core/tx/columnshard/common/portion.cpp b/ydb/core/tx/columnshard/common/portion.cpp
index 0359f5905d0..bc4c2ea7a07 100644
--- a/ydb/core/tx/columnshard/common/portion.cpp
+++ b/ydb/core/tx/columnshard/common/portion.cpp
@@ -1,4 +1,6 @@
#include "portion.h"
+#include <ydb/core/sys_view/common/path.h>
+#include <ydb/core/sys_view/common/schema.h>
namespace NKikimr::NOlap::NPortion {
diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h
index 3b828374dc7..ae35cbcfc23 100644
--- a/ydb/core/tx/columnshard/common/portion.h
+++ b/ydb/core/tx/columnshard/common/portion.h
@@ -1,5 +1,6 @@
#pragma once
#include <util/system/types.h>
+#include <util/generic/string.h>
namespace NKikimr::NOlap::NPortion {
// NOTE: These values are persisted in LocalDB so they must be stable
@@ -12,4 +13,12 @@ enum EProduced: ui32 {
EVICTED
};
+// Names and column ids of the two special columns (plan step and tx id).
+class TSpecialColumns {
+public:
+    static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
+    static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
+    // constexpr (not plain `static const`): constexpr static data members are
+    // implicitly inline since C++17, so odr-using the ids (e.g. binding them to
+    // `const ui32&` in container calls) does not need an out-of-class definition.
+    static constexpr ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
+    static constexpr ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
+};
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index f492aacbd43..631b380e320 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -181,6 +181,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
for (auto&& p : columnChunks) {
portionColumns.emplace(p.first, p.second[i].GetChunks());
}
+ resultSchema->GetIndexInfo().AppendIndexes(portionColumns);
batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
}
TSimilarSlicer slicer(GetSplitSettings().GetExpectedPortionSize());
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index c608915528c..39482ad60fd 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -182,6 +182,14 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
return false;
}
+ if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) {
+ auto portion = GetGranulePtrVerified(pathId)->GetPortionPtr(portionId);
+ AFL_VERIFY(portion);
+ portion->AddIndex(loadContext.BuildIndexChunk());
+ })) {
+ return false;
+ };
+
for (auto&& i : Tables) {
i.second->OnAfterPortionsLoad();
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index f3446b1bde6..20d5989e939 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -88,8 +88,45 @@ bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&
callback(portion, chunkLoadContext);
- if (!rowset.Next())
+ if (!rowset.Next()) {
return false;
+ }
+ }
+ return true;
+}
+
+// Writes (inserts or updates) the IndexIndexes row of one index chunk.
+// The row key is (path id, portion id, index id, chunk idx); the stored value is
+// the serialized blob id plus offset and size of the chunk's blob range.
+// Aborts if the chunk's blob range is not valid.
+void TDbWrapper::WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) {
+    using IndexIndexes = NColumnShard::Schema::IndexIndexes;
+
+    AFL_VERIFY(row.GetBlobRange().IsValid());
+    const auto& range = row.GetBlobRange();
+
+    NIceDb::TNiceDb db(Database);
+    db.Table<IndexIndexes>()
+        .Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), row.GetChunkIdx())
+        .Update(
+            NIceDb::TUpdate<IndexIndexes::Blob>(range.BlobId.SerializeBinary()),
+            NIceDb::TUpdate<IndexIndexes::Offset>(range.Offset),
+            NIceDb::TUpdate<IndexIndexes::Size>(range.Size)
+        );
+}
+
+// Deletes the IndexIndexes row of one index chunk.
+// The key must mirror the one used by WriteIndex, including the chunk index:
+// deleting with a fixed chunk index of 0 would leave the rows of every
+// subsequent chunk of the same index behind.
+void TDbWrapper::EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) {
+    using IndexIndexes = NColumnShard::Schema::IndexIndexes;
+    NIceDb::TNiceDb db(Database);
+    db.Table<IndexIndexes>().Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), row.GetChunkIdx()).Delete();
+}
+
+bool TDbWrapper::LoadIndexes(const std::function<void(const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext&)>& callback) {
+ NIceDb::TNiceDb db(Database);
+ using IndexIndexes = NColumnShard::Schema::IndexIndexes;
+ auto rowset = db.Table<IndexIndexes>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ NOlap::TIndexChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
+ callback(rowset.GetValue<IndexIndexes::PathId>(), rowset.GetValue<IndexIndexes::PortionId>(), chunkLoadContext);
+
+ if (!rowset.Next()) {
+ return false;
+ }
}
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index a90684dbffd..743f43272b2 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -8,9 +8,11 @@ class TDatabase;
namespace NKikimr::NOlap {
class TColumnChunkLoadContext;
+class TIndexChunkLoadContext;
struct TInsertedData;
class TInsertTableAccessor;
struct TColumnRecord;
+class TIndexChunk;
struct TGranuleRecord;
class IColumnEngine;
class TPortionInfo;
@@ -33,6 +35,10 @@ public:
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;
+ virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0;
+ virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0;
+ virtual bool LoadIndexes(const std::function<void(const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext&)>& callback) = 0;
+
virtual void WriteCounter(ui32 counterId, ui64 value) = 0;
virtual bool LoadCounters(const std::function<void(ui32 id, ui64 value)>& callback) = 0;
};
@@ -57,6 +63,10 @@ public:
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;
+ virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
+ virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
+ virtual bool LoadIndexes(const std::function<void(const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext&)>& callback) override;
+
void WriteCounter(ui32 counterId, ui64 value) override;
bool LoadCounters(const std::function<void(ui32 id, ui64 value)>& callback) override;
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index 688ecdb3c20..3917a035227 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -16,6 +16,26 @@ namespace NKikimr::NOlap {
class TColumnChunkLoadContext;
struct TIndexInfo;
+// One chunk of index data: identified by (index id, chunk number) and located
+// by a blob range.
+class TIndexChunk {
+private:
+    YDB_READONLY(ui32, IndexId, 0);
+    YDB_READONLY(ui32, ChunkIdx, 0);
+    YDB_READONLY_DEF(TBlobRange, BlobRange);
+
+public:
+    TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const TBlobRange& blobRange)
+        : IndexId(indexId)
+        , ChunkIdx(chunkIdx)
+        , BlobRange(blobRange)
+    {
+    }
+
+    // Replaces the blob id of the range; offset and size stay as they are.
+    // The consistency check below is disabled in the source and kept for reference.
+    void RegisterBlobId(const TUnifiedBlobId& blobId) {
+//        AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew());
+        BlobRange.BlobId = blobId;
+    }
+};
+
struct TChunkMeta: public TSimpleChunkMeta {
private:
using TBase = TSimpleChunkMeta;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index d2fdb926ed0..fdf35132ff1 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -120,6 +120,16 @@ ui64 TPortionInfo::GetRawBytes(const std::set<ui32>& entityIds) const {
return sum;
}
+// Returns the total blob-range size of all index chunks of this portion whose
+// index id is contained in `entityIds`.
+ui64 TPortionInfo::GetIndexBytes(const std::set<ui32>& entityIds) const {
+    ui64 totalBytes = 0;
+    for (const auto& indexChunk : Indexes) {
+        if (!entityIds.contains(indexChunk.GetIndexId())) {
+            continue;
+        }
+        totalBytes += indexChunk.GetBlobRange().Size;
+    }
+    return totalBytes;
+}
+
int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
}
@@ -211,12 +221,20 @@ void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const {
for (auto& record : Records) {
db.EraseColumn(*this, record);
}
+ for (auto& record : Indexes) {
+ db.EraseIndex(*this, record);
+ }
}
+
void TPortionInfo::SaveToDatabase(IDbWrapper& db) const {
for (auto& record : Records) {
db.WriteColumn(*this, record);
}
+ for (auto& record : Indexes) {
+ db.WriteIndex(*this, record);
+ }
}
+
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_ABORT_UNLESS(!Blobs.empty());
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index a53c6734893..225c1381c79 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -1,11 +1,13 @@
#pragma once
#include "column_record.h"
#include "meta.h"
+
+#include <ydb/core/formats/arrow/special_keys.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
-#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
-#include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h>
-#include <ydb/core/formats/arrow/special_keys.h>
+
#include <ydb/library/yverify_stream/yverify_stream.h>
namespace NKikimr::NOlap {
@@ -25,6 +27,8 @@ private:
TPortionMeta Meta;
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
ui64 DeprecatedGranuleId = 0;
+ YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes);
+
public:
std::vector<TColumnRecord> Records;
@@ -37,21 +41,37 @@ public:
}
void RegisterBlobId(const TChunkAddress& address, const TUnifiedBlobId& blobId) {
- bool found = false;
for (auto it = Records.begin(); it != Records.end(); ++it) {
if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) {
it->RegisterBlobId(blobId);
- found = true;
- break;
+ return;
}
}
- AFL_VERIFY(found)("address", address.DebugString());
+ for (auto it = Indexes.begin(); it != Indexes.end(); ++it) {
+ if (it->GetIndexId() == address.GetEntityId() && it->GetChunkIdx() == address.GetChunkIdx()) {
+ it->RegisterBlobId(blobId);
+ return;
+ }
+ }
+ AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString());
}
void RemoveFromDatabase(IDbWrapper& db) const;
void SaveToDatabase(IDbWrapper& db) const;
+    // Appends an index chunk to the portion.
+    // Chunks of one index must be added in order 0, 1, 2, ...: the already stored
+    // chunks of that index are re-checked for this numbering and the new chunk
+    // must carry the next number, otherwise the process aborts.
+    void AddIndex(const TIndexChunk& chunk) {
+        const ui32 indexId = chunk.GetIndexId();
+        ui32 chunkIdx = 0;
+        for (const auto& i : Indexes) {
+            if (i.GetIndexId() != indexId) {
+                continue;
+            }
+            AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", indexId)("expected", chunkIdx)("real", i.GetChunkIdx());
+            ++chunkIdx;
+        }
+        AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", indexId)("expected", chunkIdx)("real", chunk.GetChunkIdx());
+        Indexes.emplace_back(chunk);
+    }
+
bool OlderThen(const TPortionInfo& info) const {
return RecordSnapshotMin() < info.RecordSnapshotMin();
}
@@ -356,6 +376,8 @@ public:
return result;
}
+ ui64 GetIndexBytes(const std::set<ui32>& columnIds) const;
+
ui64 GetRawBytes(const std::vector<ui32>& columnIds) const;
ui64 GetRawBytes(const std::set<ui32>& columnIds) const;
ui64 GetRawBytes() const {
diff --git a/ydb/core/tx/columnshard/engines/predicate/ya.make b/ydb/core/tx/columnshard/engines/predicate/ya.make
index 10d5ff3b45b..3de8343b0b5 100644
--- a/ydb/core/tx/columnshard/engines/predicate/ya.make
+++ b/ydb/core/tx/columnshard/engines/predicate/ya.make
@@ -13,4 +13,6 @@ PEERDIR(
ydb/core/formats/arrow
)
+YQL_LAST_ABI_VERSION()
+
END()
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
index 9b50c4cfaa1..97fef7f317e 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
@@ -2,9 +2,35 @@
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <util/string/join.h>
namespace NKikimr::NOlap::NPlainReader {
+// Collection of index ids, kept both as a vector and as a std::set.
+class TIndexesSet {
+private:
+    YDB_READONLY_DEF(std::vector<ui32>, IndexIds);
+    YDB_READONLY_DEF(std::set<ui32>, IndexIdsSet);
+
+public:
+    // Wraps a single index id.
+    TIndexesSet(const ui32& indexId)
+        : IndexIds{indexId}
+        , IndexIdsSet{indexId}
+    {
+    }
+
+    // Wraps a set of index ids; the vector copy keeps the set's iteration order.
+    TIndexesSet(const std::set<ui32>& indexIds)
+        : IndexIds(indexIds.begin(), indexIds.end())
+        , IndexIdsSet(indexIds)
+    {
+        AFL_VERIFY(IndexIds.size() == IndexIdsSet.size())("indexes", JoinSeq(",", IndexIds));
+    }
+
+    ui32 GetIndexesCount() const {
+        return IndexIds.size();
+    }
+
+    // Comma-separated list of the index ids.
+    TString DebugString() const {
+        return JoinSeq(",", IndexIds);
+    }
+};
+
class TColumnsSet {
private:
YDB_READONLY_DEF(std::set<ui32>, ColumnIds);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
index 78a5da19a25..cf07daf49f8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
@@ -27,6 +27,12 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
}
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
+ std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>();
+ std::shared_ptr<IFetchingStep> current = result;
+ if (!!IndexChecker) {
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
+ current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker));
+ }
if (!EFColumns->GetColumnsCount()) {
TColumnsSet columnsFetch = *FFColumns;
if (needSnapshots) {
@@ -36,10 +42,8 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
columnsFetch = columnsFetch + *PKColumns + *SpecColumns;
}
if (columnsFetch.GetColumnsCount()) {
- std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "simple");
- std::shared_ptr<IFetchingStep> current = result;
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "simple"));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
- return result;
} else {
return nullptr;
}
@@ -49,8 +53,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
columnsFetch = columnsFetch + *SpecColumns;
}
AFL_VERIFY(columnsFetch.GetColumnsCount());
- std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef");
- std::shared_ptr<IFetchingStep> current = result;
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"));
if (needSnapshots || FFColumns->Contains(SpecColumns)) {
current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns));
@@ -72,13 +75,10 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
}
- return result;
} else {
TColumnsSet columnsFetch = *MergeColumns + *EFColumns;
AFL_VERIFY(columnsFetch.GetColumnsCount());
- std::shared_ptr<IFetchingStep> result = std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "full");
- std::shared_ptr<IFetchingStep> current = result;
-
+ current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "full"));
current = current->AttachNext(std::make_shared<TAssemblerStep>(SpecColumns));
if (needSnapshots) {
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
@@ -100,8 +100,8 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
}
- return result;
}
+ return result->GetNextStep();
}
TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
@@ -113,6 +113,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
+ IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
index 3fe05f23d8b..a8b727a5385 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
@@ -19,6 +19,7 @@ private:
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
+ NIndexes::TIndexCheckerContainer IndexChecker;
TReadMetadata::TConstPtr ReadMetadata;
std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();
std::shared_ptr<TColumnsSet> PKFFColumns;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
index 4c29f220d34..3e3d5fd7173 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h
@@ -49,6 +49,9 @@ public:
}
std::shared_ptr<arrow::RecordBatch> GetBatch() const {
+ if (!Table) {
+ return nullptr;
+ }
return NArrow::ToBatch(Table, true);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
index 3058b64781c..0773b0c50a4 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
@@ -15,6 +15,10 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const {
bool TStepAction::DoExecute() {
while (Step) {
+ if (Source->IsEmptyData()) {
+ FinishedFlag = true;
+ return true;
+ }
if (!Step->ExecuteInplace(Source, Step)) {
return true;
}
@@ -29,11 +33,20 @@ bool TStepAction::DoExecute() {
}
bool TBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const {
- return !source->StartFetchingColumns(source, step, Columns);
+ AFL_VERIFY((!!Columns) ^ (!!Indexes));
+
+ const bool startFetchingColumns = Columns ? source->StartFetchingColumns(source, step, Columns) : false;
+ const bool startFetchingIndexes = Indexes ? source->StartFetchingIndexes(source, step, Indexes) : false;
+ return !startFetchingColumns && !startFetchingIndexes;
}
ui64 TBlobsFetchingStep::PredictRawBytes(const std::shared_ptr<IDataSource>& source) const {
- return source->GetRawBytes(Columns->GetColumnIds());
+ if (Columns) {
+ return source->GetRawBytes(Columns->GetColumnIds());
+ } else {
+ AFL_VERIFY(Indexes);
+ return source->GetIndexBytes(Indexes->GetIndexIdsSet());
+ }
}
bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
@@ -68,4 +81,9 @@ bool TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr<IDataSource>& source
return true;
}
+// Hands the stored index checker to the data source and always returns true.
+// NOTE(review): true appears to mean "continue with the next step" - see TStepAction::DoExecute.
+bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+    source->ApplyIndex(IndexChecker);
+    return true;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h
index 297863392a7..6446538535d 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h
@@ -27,7 +27,9 @@ public:
AFL_VERIFY(nextStep);
NextStep = nextStep;
nextStep->Index = Index + 1;
- nextStep->BranchName = BranchName;
+ if (!nextStep->BranchName) {
+ nextStep->BranchName = BranchName;
+ }
return nextStep;
}
@@ -99,22 +101,66 @@ public:
}
};
+// Step named "FAKE" that performs no work and always returns true.
+class TFakeStep: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+
+public:
+    TFakeStep()
+        : TBase("FAKE") {
+    }
+
+    // No-op: neither the source nor the step is touched.
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& /*source*/, const std::shared_ptr<IFetchingStep>& /*step*/) const override {
+        return true;
+    }
+};
+
+// Fetching step named "APPLY_INDEX" that keeps a copy of an index checker and
+// passes it to the data source when the step is executed.
+class TApplyIndexStep: public IFetchingStep {
+private:
+    using TBase = IFetchingStep;
+    const NIndexes::TIndexCheckerContainer IndexChecker;
+
+protected:
+    virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const override;
+
+public:
+    TApplyIndexStep(const NIndexes::TIndexCheckerContainer& indexChecker)
+        : TBase("APPLY_INDEX")
+        , IndexChecker(indexChecker) {
+    }
+};
+
class TBlobsFetchingStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
std::shared_ptr<TColumnsSet> Columns;
+ std::shared_ptr<TIndexesSet> Indexes;
protected:
virtual bool DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& step) const override;
virtual ui64 PredictRawBytes(const std::shared_ptr<IDataSource>& source) const override;
virtual TString DoDebugString() const override {
- return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
+ TStringBuilder sb;
+ if (Columns) {
+ sb << "columns=" << Columns->DebugString() << ";";
+ } else {
+ sb << "indexes=" << Indexes->DebugString() << ";";
+ }
+ return sb;
}
public:
TBlobsFetchingStep(const std::shared_ptr<TColumnsSet>& columns, const TString& nameBranch = "")
: TBase("FETCHING", nameBranch)
- , Columns(columns)
- {
+ , Columns(columns) {
AFL_VERIFY(Columns);
+ AFL_VERIFY(Columns->GetColumnsCount());
+ }
+
+ TBlobsFetchingStep(const std::shared_ptr<TIndexesSet>& indexes, const TString& nameBranch = "")
+ : TBase("FETCHING", nameBranch)
+ , Indexes(indexes) {
+ AFL_VERIFY(Indexes);
+ AFL_VERIFY(Indexes->GetIndexesCount());
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index e6a408933f3..e0118a290b0 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
namespace NKikimr::NOlap::NPlainReader {
@@ -73,10 +74,10 @@ void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds,
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)("reading_action", readingAction->GetStorageId())("columns", columnIds.size());
}
-bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr,
- const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) {
+bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName());
- Y_ABORT_UNLESS(columns->GetColumnsCount());
+ AFL_VERIFY(columns->GetColumnsCount());
+ AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter());
auto& columnIds = columns->GetColumnIds();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString());
@@ -88,6 +89,41 @@ bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSourc
StageData->AddNulls(std::move(nullBlocks));
}
+ if (!readAction->GetExpectedBlobsSize()) {
+ return false;
+ }
+
+ std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
+ auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), "");
+ NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+ return true;
+}
+
+bool TPortionDataSource::DoStartFetchingIndexes(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TIndexesSet>& indexes) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName());
+ Y_ABORT_UNLESS(indexes->GetIndexesCount());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString());
+
+ auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::" + step->GetName());
+ readAction->SetIsBackgroundProcess(false);
+ {
+ std::set<ui32> indexIds;
+ for (auto&& i : Portion->GetIndexes()) {
+ if (!indexes->GetIndexIdsSet().contains(i.GetIndexId())) {
+ continue;
+ }
+ indexIds.emplace(i.GetIndexId());
+ readAction->AddRange(i.GetBlobRange());
+ }
+ if (indexes->GetIndexIdsSet().size() != indexIds.size()) {
+ return false;
+ }
+ }
+
+ if (!readAction->GetExpectedBlobsSize()) {
+ return false;
+ }
+
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
@@ -97,6 +133,29 @@ bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSourc
void TPortionDataSource::DoAbort() {
}
+// Evaluates the index checker against the index blobs of this portion.
+// If the portion lacks any index the checker needs, nothing is applied.
+// Otherwise the test controller is notified about the outcome and, when the
+// check fails, a deny filter is added to the stage data.
+void TPortionDataSource::DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) {
+    const std::set<ui32> requiredIndexIds = indexChecker->GetIndexIds();
+
+    // Group the already fetched blobs of the required indexes by index id.
+    THashMap<ui32, std::vector<TString>> indexBlobs;
+    for (const auto& chunk : Portion->GetIndexes()) {
+        const ui32 indexId = chunk.GetIndexId();
+        if (requiredIndexIds.contains(indexId)) {
+            indexBlobs[indexId].emplace_back(StageData->ExtractBlob(chunk.GetBlobRange()));
+        }
+    }
+
+    // The checker is only consulted when every required index has data.
+    for (const ui32 indexId : requiredIndexIds) {
+        if (!indexBlobs.contains(indexId)) {
+            return;
+        }
+    }
+
+    if (indexChecker->Check(indexBlobs)) {
+        NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true);
+    } else {
+        NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false);
+        StageData->AddFilter(NArrow::TColumnFilter::BuildDenyFilter());
+    }
+}
+
bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& /*columns*/) {
if (ReadStarted) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
index 34f6a1fe041..b8765b373e7 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
@@ -46,8 +46,10 @@ protected:
}
virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) = 0;
+ virtual bool DoStartFetchingIndexes(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TIndexesSet>& indexes) = 0;
virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) = 0;
virtual void DoAbort() = 0;
+ virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
public:
void SetIsReady();
@@ -55,6 +57,10 @@ public:
return GetStageData().IsEmpty();
}
+ void ApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) {
+ return DoApplyIndex(indexMeta);
+ }
+
void AssembleColumns(const std::shared_ptr<TColumnsSet>& columns) {
if (columns->IsEmpty()) {
return;
@@ -66,6 +72,11 @@ public:
AFL_VERIFY(columns);
return DoStartFetchingColumns(sourcePtr, step, columns);
}
+
+ bool StartFetchingIndexes(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TIndexesSet>& indexes) {
+ AFL_VERIFY(indexes);
+ return DoStartFetchingIndexes(sourcePtr, step, indexes);
+ }
void InitFetchingPlan(const std::shared_ptr<IFetchingStep>& fetchingFirstStep, const std::shared_ptr<IDataSource>& sourcePtr, const bool isExclusive);
std::shared_ptr<arrow::RecordBatch> GetLastPK() const {
@@ -76,6 +87,7 @@ public:
}
virtual ui64 GetRawBytes(const std::set<ui32>& columnIds) const = 0;
+ virtual ui64 GetIndexBytes(const std::set<ui32>& indexIds) const = 0;
bool IsMergingStarted() const {
return MergingStartedFlag;
@@ -155,7 +167,9 @@ private:
const std::shared_ptr<IBlobsReadingAction>& readingAction, THashMap<TBlobRange, ui32>& nullBlocks,
const std::shared_ptr<NArrow::TColumnFilter>& filter);
+ virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) override;
virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override;
+ virtual bool DoStartFetchingIndexes(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TIndexesSet>& indexes) override;
virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override {
auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchema(Portion->GetMinSnapshot());
MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable());
@@ -173,6 +187,10 @@ public:
return Portion->GetRawBytes(columnIds);
}
+ virtual ui64 GetIndexBytes(const std::set<ui32>& columnIds) const override {
+ return Portion->GetIndexBytes(columnIds);
+ }
+
const TPortionInfo& GetPortionInfo() const {
return *Portion;
}
@@ -199,8 +217,14 @@ private:
}
- virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr,
- const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override;
+ virtual bool DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& columns) override;
+ virtual bool DoStartFetchingIndexes(const std::shared_ptr<IDataSource>& /*sourcePtr*/, const std::shared_ptr<IFetchingStep>& /*step*/, const std::shared_ptr<TIndexesSet>& /*indexes*/) override {
+ return false;
+ }
+ virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& /*indexMeta*/) override {
+ return;
+ }
+
virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) override;
virtual NJson::TJsonValue DoDebugJson() const override {
NJson::TJsonValue result = NJson::JSON_MAP;
@@ -213,6 +237,10 @@ public:
return CommittedBlob.GetBlobRange().Size;
}
+ virtual ui64 GetIndexBytes(const std::set<ui32>& /*columnIds*/) const override {
+ return 0;
+ }
+
const TCommittedBlob& GetCommitted() const {
return CommittedBlob;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 9da8c6f3d3e..6bac1a0af45 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -380,6 +380,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
return false;
}
+ for (const auto& idx : schema.GetIndexes()) {
+ NIndexes::TIndexMetaContainer meta;
+ AFL_VERIFY(meta.DeserializeFromProto(idx));
+ Indexes.emplace(meta->GetIndexId(), meta);
+ }
+
for (const auto& col : schema.GetColumns()) {
const ui32 id = col.GetId();
const TString& name = col.GetName();
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 2446df2d2a4..b4293f6078f 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -7,10 +7,12 @@
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
+#include <ydb/core/tx/columnshard/common/portion.h>
#include <ydb/core/formats/arrow/dictionary/object.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/transformer/abstract.h>
#include <ydb/core/scheme/scheme_types_proto.h>
+#include "indexes/abstract/meta.h"
namespace arrow {
class Array;
@@ -34,6 +36,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
private:
THashMap<ui32, TColumnFeatures> ColumnFeatures;
THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
+ THashMap<ui32, NIndexes::TIndexMetaContainer> Indexes;
TIndexInfo(const TString& name);
bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const;
@@ -41,14 +44,18 @@ private:
void BuildArrowSchema();
void InitializeCaches();
public:
- static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
- static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
+ static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP;
+ static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID;
static const TString STORE_INDEX_STATS_TABLE;
static const TString TABLE_INDEX_STATS_TABLE;
+ const THashMap<ui32, NIndexes::TIndexMetaContainer>& GetIndexes() const {
+ return Indexes;
+ }
+
enum class ESpecialColumn : ui32 {
- PLAN_STEP = 0xffffff00,
- TX_ID,
+ PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX,
+ TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX
};
TString DebugString() const {
@@ -110,6 +117,13 @@ public:
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const;
std::shared_ptr<TColumnLoader> GetColumnLoaderVerified(const ui32 columnId) const;
+ void AppendIndexes(std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData) const {
+ for (auto&& i : Indexes) {
+ std::shared_ptr<IPortionDataChunk> chunk = i.second->BuildIndex(i.first, originalData, *this);
+ AFL_VERIFY(originalData.emplace(i.first, std::vector<std::shared_ptr<IPortionDataChunk>>({chunk})).second);
+ }
+ }
+
/// Returns an id of the column located by name. The name should exists in the schema.
ui32 GetColumnId(const std::string& name) const;
std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
deleted file mode 100644
index e57c8222854..00000000000
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "abstract.h"
-#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
-#include <ydb/core/formats/arrow/hash/xx_hash.h>
-#include <ydb/core/formats/arrow/hash/calcer.h>
-
-namespace NKikimr::NOlap::NIndexes {
-
-} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
deleted file mode 100644
index cb2cb5665eb..00000000000
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
+++ /dev/null
@@ -1,147 +0,0 @@
-#pragma once
-
-#include <ydb/core/tx/columnshard/splitter/chunks.h>
-#include <ydb/core/tx/program/program.h>
-
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <library/cpp/object_factory/object_factory.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-#include <ydb/services/bg_tasks/abstract/interface.h>
-#include <util/generic/string.h>
-
-#include <memory>
-#include <vector>
-
-namespace NKikimr::NOlap {
-struct TIndexInfo;
-}
-
-namespace NKikimr::NSchemeShard {
-class TOlapSchema;
-class IErrorCollector;
-}
-
-namespace NKikimr::NOlap::NIndexes {
-
-class IIndexChecker {
-protected:
- virtual bool DoCheck(std::vector<TString>&& blobs) const = 0;
-public:
- virtual ~IIndexChecker() = default;
- bool Check(std::vector<TString>&& blobs) const {
- return DoCheck(std::move(blobs));
- }
-};
-
-class TIndexCheckerContainer {
-private:
- YDB_READONLY(ui32, IndexId, 0);
- YDB_READONLY_DEF(std::shared_ptr<IIndexChecker>, Object);
-public:
- TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr<IIndexChecker>& object)
- : IndexId(indexId)
- , Object(object) {
- AFL_VERIFY(IndexId);
- AFL_VERIFY(Object);
- }
-
- const IIndexChecker* operator->() const {
- return Object.get();
- }
-};
-
-class IIndexMeta {
-protected:
- virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
- virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& program) const = 0;
- virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
- virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;
-
-public:
- using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
- using TProto = NKikimrSchemeOp::TOlapIndexDescription;
-
- virtual ~IIndexMeta() = default;
-
- std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
- return DoBuildIndex(indexId, data, indexInfo);
- }
-
- std::shared_ptr<IIndexChecker> BuildIndexChecker(const TProgramContainer& program) const {
- return DoBuildIndexChecker(program);
- }
-
- bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
- return DoDeserializeFromProto(proto);
- }
-
- void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const {
- return DoSerializeToProto(proto);
- }
-
- virtual TString GetClassName() const = 0;
-};
-
-class IIndexMetaConstructor {
-protected:
- virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
- virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
- virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0;
- virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0;
-public:
- using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>;
- using TProto = NKikimrSchemeOp::TOlapIndexRequested;
-
- virtual ~IIndexMetaConstructor() = default;
-
- TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
- return DoDeserializeFromJson(jsonInfo);
- }
-
- std::shared_ptr<IIndexMeta> CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
- return DoCreateIndexMeta(currentSchema, errors);
- }
-
- TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
- return DoDeserializeFromProto(proto);
- }
-
- void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
- return DoSerializeToProto(proto);
- }
-
- virtual TString GetClassName() const = 0;
-};
-
-class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> {
-private:
- using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>;
- YDB_READONLY(ui32, IndexId, 0);
-public:
- TIndexMetaContainer() = default;
- TIndexMetaContainer(const ui32 indexId, const std::shared_ptr<IIndexMeta>& object)
- : TBase(object)
- , IndexId(indexId)
- {
- AFL_VERIFY(IndexId);
- AFL_VERIFY(Object);
- }
-
- bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
- if (!TBase::DeserializeFromProto(proto)) {
- return false;
- }
- IndexId = proto.GetId();
- return true;
- }
-
- std::optional<TIndexCheckerContainer> BuildIndexChecker(const TProgramContainer& program) const {
- auto checker = GetObjectPtr()->BuildIndexChecker(program);
- if (!checker) {
- return {};
- }
- return TIndexCheckerContainer(IndexId, checker);
- }
-};
-
-} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp
new file mode 100644
index 00000000000..6e58ded1332
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp
@@ -0,0 +1,5 @@
+#include "checker.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h
new file mode 100644
index 00000000000..6258ad29338
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h
@@ -0,0 +1,49 @@
+#pragma once
+#include <ydb/core/protos/ssa.pb.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <ydb/library/accessor/accessor.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+class IIndexChecker {
+protected:
+ virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobs) const = 0;
+ virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0;
+ virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0;
+ virtual std::set<ui32> DoGetIndexIds() const = 0;
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IIndexChecker, TString>;
+ using TProto = NKikimrSSA::TProgram::TOlapIndexChecker;
+ virtual ~IIndexChecker() = default;
+ bool Check(const THashMap<ui32, std::vector<TString>>& blobs) const {
+ return DoCheck(blobs);
+ }
+
+ bool DeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) {
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const {
+ return DoSerializeToProto(proto);
+ }
+
+ std::set<ui32> GetIndexIds() const {
+ return DoGetIndexIds();
+ }
+
+ virtual TString GetClassName() const = 0;
+};
+
+class TIndexCheckerContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexChecker> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexChecker>;
+public:
+ TIndexCheckerContainer() = default;
+ TIndexCheckerContainer(const std::shared_ptr<IIndexChecker>& object)
+ : TBase(object) {
+ AFL_VERIFY(Object);
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp
new file mode 100644
index 00000000000..c25fb8f1609
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp
@@ -0,0 +1,5 @@
+#include "composite.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h
new file mode 100644
index 00000000000..04d6646c9b6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h
@@ -0,0 +1,96 @@
+#pragma once
+#include "checker.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+class TCompositeIndexChecker: public IIndexChecker {
+protected:
+ std::vector<TIndexCheckerContainer> Checkers;
+protected:
+ virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override {
+ for (auto&& i : proto.GetComposite().GetChildrenCheckers()) {
+ TIndexCheckerContainer container;
+ AFL_VERIFY(container.DeserializeFromProto(i));
+ Checkers.emplace_back(std::move(container));
+ }
+ return true;
+ }
+ virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override {
+ for (auto&& i : Checkers) {
+ i.SerializeToProto(*proto.MutableComposite()->AddChildrenCheckers());
+ }
+ }
+ virtual std::set<ui32> DoGetIndexIds() const override {
+ std::set<ui32> result;
+ for (auto&& i : Checkers) {
+ auto ids = i->GetIndexIds();
+ result.insert(ids.begin(), ids.end());
+ }
+ return result;
+ }
+public:
+ TCompositeIndexChecker() = default;
+ TCompositeIndexChecker(const std::vector<TIndexCheckerContainer>& checkers)
+ : Checkers(checkers) {
+
+ }
+ TCompositeIndexChecker(const std::vector<std::shared_ptr<IIndexChecker>>& checkers) {
+ for (auto&& i : checkers) {
+ Checkers.emplace_back(i);
+ }
+ }
+};
+
+class TAndIndexChecker: public TCompositeIndexChecker {
+private:
+ using TBase = TCompositeIndexChecker;
+public:
+ static TString GetClassNameStatic() {
+ return "AND_FILTERS";
+ }
+private:
+ static inline auto Registrator = TFactory::TRegistrator<TAndIndexChecker>(GetClassNameStatic());
+protected:
+ virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobsByIndexId) const override {
+ for (auto&& i : Checkers) {
+ if (!i->Check(blobsByIndexId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+public:
+ using TBase::TBase;
+
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+class TOrIndexChecker: public TCompositeIndexChecker {
+private:
+ using TBase = TCompositeIndexChecker;
+public:
+ static TString GetClassNameStatic() {
+ return "OR_FILTERS";
+ }
+private:
+ static inline auto Registrator = TFactory::TRegistrator<TOrIndexChecker>(GetClassNameStatic());
+protected:
+ virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobsByIndexId) const override {
+ for (auto&& i : Checkers) {
+ if (i->Check(blobsByIndexId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+public:
+ using TBase::TBase;
+
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp
new file mode 100644
index 00000000000..a93507bec06
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp
@@ -0,0 +1,5 @@
+#include "constructor.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h
new file mode 100644
index 00000000000..d8c820cffa2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h
@@ -0,0 +1,48 @@
+#pragma once
+#include "meta.h"
+
+#include <ydb/library/conclusion/status.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/schemeshard/olap/common/common.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NSchemeShard {
+class TOlapSchema;
+}
+
+namespace NKikimr::NOlap::NIndexes {
+
+class IIndexMetaConstructor {
+protected:
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
+ virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0;
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0;
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>;
+ using TProto = NKikimrSchemeOp::TOlapIndexRequested;
+
+ virtual ~IIndexMetaConstructor() = default;
+
+ TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+ return DoDeserializeFromJson(jsonInfo);
+ }
+
+ std::shared_ptr<IIndexMeta> CreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
+ return DoCreateIndexMeta(indexId, currentSchema, errors);
+ }
+
+ TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
+ return DoSerializeToProto(proto);
+ }
+
+ virtual TString GetClassName() const = 0;
+};
+
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
new file mode 100644
index 00000000000..c64d9b11a29
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
@@ -0,0 +1,40 @@
+#include "meta.h"
+#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/formats/arrow/hash/xx_hash.h>
+#include <ydb/core/formats/arrow/hash/calcer.h>
+#include <ydb/core/formats/arrow/serializer/full.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const {
+ portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), bRange));
+}
+
+std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
+ std::vector<TChunkedColumnReader> columnReaders;
+ for (auto&& i : ColumnIds) {
+ auto it = data.find(i);
+ AFL_VERIFY(it != data.end());
+ columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i));
+ }
+ TChunkedBatchReader reader(std::move(columnReaders));
+ std::shared_ptr<arrow::RecordBatch> indexBatch = DoBuildIndexImpl(reader);
+ const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch);
+ return std::make_shared<TPortionIndexChunk>(indexId, indexData);
+}
+
+bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) {
+ Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
+ return true;
+}
+
+TIndexByColumns::TIndexByColumns(const ui32 indexId, const std::set<ui32>& columnIds)
+ : TBase(indexId)
+ , ColumnIds(columnIds)
+{
+ Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
+}
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
new file mode 100644
index 00000000000..d80da770d1b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
@@ -0,0 +1,134 @@
+#pragma once
+#include "checker.h"
+#include "program.h"
+
+#include <ydb/core/tx/columnshard/splitter/chunks.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NYql::NNodes {
+class TExprBase;
+}
+
+namespace NKikimr::NOlap {
+struct TIndexInfo;
+class TProgramContainer;
+}
+
+namespace NKikimr::NSchemeShard {
+class TOlapSchema;
+}
+
+namespace NKikimr::NOlap::NIndexes {
+
+class IIndexMeta {
+private:
+ YDB_READONLY(ui32, IndexId, 0);
+protected:
+ virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
+ virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const = 0;
+ virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;
+
+public:
+ using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
+ using TProto = NKikimrSchemeOp::TOlapIndexDescription;
+
+ IIndexMeta() = default;
+ IIndexMeta(const ui32 indexId)
+ : IndexId(indexId)
+ {
+
+ }
+
+ virtual ~IIndexMeta() = default;
+
+ std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
+ return DoBuildIndex(indexId, data, indexInfo);
+ }
+
+ void FillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
+ return DoFillIndexCheckers(info, schema);
+ }
+
+ bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
+ IndexId = proto.GetId();
+ AFL_VERIFY(IndexId);
+ return DoDeserializeFromProto(proto);
+ }
+
+ void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const {
+ AFL_VERIFY(IndexId);
+ proto.SetId(IndexId);
+ return DoSerializeToProto(proto);
+ }
+
+ virtual TString GetClassName() const = 0;
+};
+
+class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>;
+public:
+ TIndexMetaContainer() = default;
+ TIndexMetaContainer(const std::shared_ptr<IIndexMeta>& object)
+ : TBase(object)
+ {
+ AFL_VERIFY(Object);
+ }
+};
+
+class TPortionIndexChunk: public IPortionDataChunk {
+private:
+ using TBase = IPortionDataChunk;
+ const TString Data;
+protected:
+ virtual const TString& DoGetData() const override {
+ return Data;
+ }
+ virtual TString DoDebugString() const override {
+ return "";
+ }
+ virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override {
+ return {};
+ }
+ virtual bool DoIsSplittable() const override {
+ return false;
+ }
+ virtual std::optional<ui32> DoGetRecordsCount() const override {
+ return {};
+ }
+ virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
+ return nullptr;
+ }
+ virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
+ return nullptr;
+ }
+ virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override;
+public:
+ TPortionIndexChunk(const ui32 entityId, const TString& data)
+ : TBase(entityId, 0)
+ , Data(data)
+ {
+ }
+
+};
+
+class TIndexByColumns: public IIndexMeta {
+private:
+ using TBase = IIndexMeta;
+ std::shared_ptr<NArrow::NSerialization::ISerializer> Serializer;
+protected:
+ std::set<ui32> ColumnIds;
+ virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const = 0;
+
+ virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const override final;
+ virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) override;
+public:
+ TIndexByColumns() = default;
+ TIndexByColumns(const ui32 indexId, const std::set<ui32>& columnIds);
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp
new file mode 100644
index 00000000000..d16f5fcfb33
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp
@@ -0,0 +1,512 @@
+#include "program.h"
+#include "composite.h"
+#include <ydb/library/yql/core/arrow_kernels/request/request.h>
+
+namespace NKikimr::NOlap::NIndexes::NRequest {
+
+class IRequestNode {
+protected:
+ TString Name;
+ std::vector<std::shared_ptr<IRequestNode>> Children;
+ IRequestNode* Parent = nullptr;
+ virtual bool DoCollapse() = 0;
+
+ virtual NJson::TJsonValue DoSerializeToJson() const = 0;
+ virtual std::shared_ptr<IRequestNode> DoCopy() const = 0;
+
+public:
+ template <class T>
+ T* FindFirst() const {
+ for (auto&& c : Children) {
+ if (auto* result = c->As<T>()) {
+ return result;
+ }
+ }
+ return nullptr;
+ }
+
+    /// Builds a deep copy of the subtree rooted at this node: the node itself is produced by DoCopy(),
+    /// every child is copied recursively. The result is detached (no parent) and gets a fresh name
+    /// from GetNextId().
+    std::shared_ptr<IRequestNode> Copy() const {
+        auto selfCopy = DoCopy();
+        // Verify before the first dereference: DoCopy() may return nullptr (TRootNode does).
+        AFL_VERIFY(selfCopy);
+        selfCopy->Parent = nullptr;
+        selfCopy->Name = GetNextId(Name);
+        // A DoCopy() built on a copy constructor (TPackAnd) carries over the source's child pointers;
+        // drop them so they are neither duplicated nor re-parented to the copy by the loop below.
+        selfCopy->Children.clear();
+        for (auto&& i : Children) {
+            selfCopy->Children.emplace_back(i->Copy());
+        }
+        for (auto&& i : selfCopy->Children) {
+            i->Parent = selfCopy.get();
+        }
+        return selfCopy;
+    }
+
+ const TString& GetName() const {
+ return Name;
+ }
+ const std::vector<std::shared_ptr<IRequestNode>>& GetChildren() const {
+ return Children;
+ }
+
+ static TString GetNextId(const TString& originalName) {
+ static TAtomic Counter = 0;
+ TStringBuf sb(originalName.data(), originalName.size());
+ TStringBuf left;
+ TStringBuf right;
+ if (sb.TrySplit('$', left, right)) {
+ return TString(left.data(), left.size()) + "$" + ::ToString(AtomicIncrement(Counter));
+ } else {
+ return originalName + "$" + ::ToString(AtomicIncrement(Counter));
+ }
+ }
+
+ IRequestNode(const TString& name)
+ : Name(name) {
+
+ }
+
+ IRequestNode(const std::string& name)
+ : Name(name.data(), name.size()) {
+
+ }
+
+ IRequestNode(const char* name)
+ : Name(name) {
+
+ }
+
+ virtual ~IRequestNode() = default;
+
+ template <class T>
+ bool Is() const {
+ return dynamic_cast<const T*>(this);
+ }
+
+ template <class T>
+ T* As() {
+ return dynamic_cast<T*>(this);
+ }
+
+ void RemoveChildren(const TString& name) { // Detaches the child whose node name equals `name`; verifies that exactly one child was removed.
+ auto nameCopy = name; // own copy of the name: callers pass a child's GetNodeName() reference (see DoCollapse), presumably so the comparison never reads through a node being erased
+ const auto pred = [nameCopy](const std::shared_ptr<IRequestNode>& child) {
+ if (child->GetNodeName() == nameCopy) {
+ child->Parent = nullptr; // the removed child no longer points back at this node
+ return true;
+ } else {
+ return false;
+ }
+ };
+ const ui32 sizeBefore = Children.size();
+ Children.erase(std::remove_if(Children.begin(), Children.end(), pred), Children.end());
+ AFL_VERIFY(sizeBefore == Children.size() + 1); // zero or multiple matches are treated as a logic error
+ }
+
+ const TString& GetNodeName() const {
+ return Name;
+ }
+
+ virtual bool Collapse() {
+ for (auto&& i : Children) {
+ if (i->Collapse()) {
+ return true;
+ }
+ }
+ if (DoCollapse()) {
+ return true;
+ }
+ return false;
+ }
+
+ void Attach(const std::vector<std::shared_ptr<IRequestNode>>& children) {
+ auto copy = children;
+ for (auto&& c : copy) {
+ Attach(c);
+ }
+ }
+
+ void Attach(const std::shared_ptr<IRequestNode>& children) { // Moves one node under this node, detaching it from its previous parent first.
+ auto copy = children; // keep an owning reference while the previous parent drops its pointer to the node
+ if (copy->Parent) {
+ copy->Parent->RemoveChildren(copy->GetNodeName());
+ }
+ copy->Parent = this;
+ for (auto&& i : Children) {
+ AFL_VERIFY(i->GetName() != copy->GetName()); // sibling names must stay unique: RemoveChildren/Exchange look children up by name
+ }
+ Children.emplace_back(copy);
+ }
+
+ void Exchange(const TString& name, const std::shared_ptr<IRequestNode>& children) {
+ auto copy = children;
+ for (auto&& i : Children) {
+ if (i->GetName() == name) {
+ i = copy;
+ i->Parent = this;
+ return;
+ }
+ }
+ AFL_VERIFY(false);
+ }
+
+ NJson::TJsonValue SerializeToJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(Name, DoSerializeToJson());
+ if (Children.size()) {
+ auto& childrenJson = result.InsertValue("children", NJson::JSON_ARRAY);
+ for (auto&& i : Children) {
+ childrenJson.AppendValue(i->SerializeToJson());
+ }
+ }
+ return result;
+ }
+};
+
+class TConstantNode: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, Constant);
+protected:
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "const");
+ result.InsertValue("const", Constant->ToString());
+ return result;
+ }
+ virtual bool DoCollapse() override {
+ return false;
+ }
+ virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+ return std::make_shared<TConstantNode>(GetName(), Constant);
+ }
+public:
+ TConstantNode(const std::string& name, const std::shared_ptr<arrow::Scalar>& constant)
+ : TBase(name)
+ , Constant(constant) {
+ }
+};
+
+class TRootNode: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+protected:
+ virtual bool DoCollapse() override {
+ return false;
+ }
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "ROOT");
+ return result;
+ }
+
+ virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+ return nullptr;
+ }
+public:
+ TRootNode()
+ : TBase("ROOT") {
+
+ }
+};
+
+class TOriginalColumn: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+ YDB_READONLY_DEF(TString, ColumnName);
+protected:
+ virtual bool DoCollapse() override {
+ return false;
+ }
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "column");
+ result.InsertValue("column_name", ColumnName);
+ return result;
+ }
+    /// Creates a node that refers to the same table column.
+    /// ColumnName is passed rather than GetName(): the node name already carries the "$<counter>"
+    /// suffix appended by GetNextId() in the constructor, so using it would leave the copy with a
+    /// column name that does not match the original column.
+    virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+        return std::make_shared<TOriginalColumn>(ColumnName);
+    }
+public:
+ TOriginalColumn(const std::string& columnName)
+ : TBase(GetNextId(TString(columnName.data(), columnName.size())))
+ , ColumnName(columnName.data(), columnName.size()) {
+
+ }
+};
+
+class TPackAnd: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+ THashMap<TString, std::shared_ptr<arrow::Scalar>> Conditions;
+ bool IsEmptyFlag = false;
+protected:
+ virtual bool DoCollapse() override {
+ return false;
+ }
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "pack_and");
+ if (IsEmptyFlag) {
+ result.InsertValue("empty", true);
+ }
+ auto& arrJson = result.InsertValue("conditions", NJson::JSON_ARRAY);
+ for (auto&& i : Conditions) {
+ auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
+ jsonCondition.InsertValue(i.first, i.second->ToString());
+ }
+ return result;
+ }
+ virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+ return std::make_shared<TPackAnd>(*this);
+ }
+public:
+ TPackAnd(const TPackAnd&) = default;
+ TPackAnd(const TString& cName, const std::shared_ptr<arrow::Scalar>& value)
+ : TBase(GetNextId("PackAnd")) {
+ AddCondition(cName, value);
+ }
+
+ const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
+ return Conditions;
+ }
+
+ bool IsEmpty() const {
+ return IsEmptyFlag;
+ }
+ void AddCondition(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) {
+ AFL_VERIFY(value);
+ auto it = Conditions.find(cName);
+ if (it == Conditions.end()) {
+ Conditions.emplace(cName, value);
+ } else if (it->second->Equals(*value)) {
+ return;
+ } else {
+ IsEmptyFlag = true;
+ }
+ }
+ void Merge(const TPackAnd& add) {
+ for (auto&& i : add.Conditions) {
+ AddCondition(i.first, i.second);
+ }
+ }
+};
+
+// Request node for a binary operation (NYql::TKernelRequestBuilder::EBinaryOp) applied to its children.
+// DoCollapse() performs one rewrite step of the request tree per call.
+class TOperationNode: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+ NYql::TKernelRequestBuilder::EBinaryOp Operation;
+protected:
+ // Serializes only the node kind and the operation name.
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "operation");
+ result.InsertValue("operation", ::ToString(Operation));
+ return result;
+ }
+
+ // Tries the rewrites below in order and applies the first one that matches.
+ // Returns true when a rewrite was applied, false when none matched.
+ // NOTE(review): several rewrites ask Parent to remove/exchange this node; presumably the node
+ // may be destroyed by that call, which is why each of them returns right afterwards - confirm.
+ virtual bool DoCollapse() override {
+ // Coalesce(x, constant): the first argument is attached to the parent and this node is removed from it.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Coalesce) {
+ AFL_VERIFY(Children.size() == 2);
+ AFL_VERIFY(Children[1]->Is<TConstantNode>());
+ Parent->Attach(Children[0]);
+ Parent->RemoveChildren(GetNodeName());
+ return true;
+ }
+ // Equals(original column, constant): replaced in the parent by a TPackAnd with that single condition.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Equals && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) {
+ Parent->Exchange(GetNodeName(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetColumnName(), Children[1]->As<TConstantNode>()->GetConstant()));
+ return true;
+ }
+ // And directly under And: children are handed to the parent and this node is removed.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
+ if (Parent->Is<TOperationNode>() && Parent->As<TOperationNode>()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
+ Parent->Attach(Children);
+ Parent->RemoveChildren(GetNodeName());
+ return true;
+ }
+ }
+ // Or directly under Or: same flattening as for And.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) {
+ if (Parent->Is<TOperationNode>() && Parent->As<TOperationNode>()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) {
+ Parent->Attach(Children);
+ Parent->RemoveChildren(GetNodeName());
+ return true;
+ }
+ }
+ // And with several TPackAnd children: every further pack is merged into the first one found and removed.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
+ // Iterate over a copy because RemoveChildren is called on this node inside the loop.
+ auto copy = Children;
+ TPackAnd* baseSet = nullptr;
+ bool changed = false;
+ for (auto&& c : copy) {
+ if (c->Is<TPackAnd>()) {
+ if (baseSet) {
+ baseSet->Merge(*c->As<TPackAnd>());
+ RemoveChildren(c->GetNodeName());
+ changed = true;
+ } else {
+ baseSet = c->As<TPackAnd>();
+ }
+ }
+ }
+ if (changed) {
+ return true;
+ }
+ }
+
+ // And with a single child: the child must be a TPackAnd and takes this node's place in the parent.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And && Children.size() == 1) {
+ AFL_VERIFY(Children.front()->Is<TPackAnd>());
+ Parent->Exchange(GetNodeName(), Children.front());
+ return true;
+ }
+
+ // And with an Or child: replaced in the parent by an Or node that has one And node per child
+ // of that Or; each And holds copies of the remaining children plus a copy of the Or child.
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
+ std::vector<std::shared_ptr<IRequestNode>> newNodes;
+ // NOTE(review): cNames is never used.
+ std::set<TString> cNames;
+ for (auto&& i : Children) {
+ if (i->Is<TOperationNode>() && i->As<TOperationNode>()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) {
+ // Keep the Or node in a local before it is removed from Children; `i` is not used after the removal.
+ auto orNode = i;
+ RemoveChildren(i->GetNodeName());
+ auto copy = orNode->GetChildren();
+ // Children taken after the removal, i.e. without the Or node.
+ auto copyChildren = Children;
+ for (auto&& orNodeChildren : copy) {
+ std::vector<std::shared_ptr<IRequestNode>> producedChildren;
+ for (auto&& c : copyChildren) {
+ producedChildren.emplace_back(c->Copy());
+ }
+ producedChildren.emplace_back(orNodeChildren->Copy());
+ newNodes.emplace_back(std::make_shared<TOperationNode>(GetNextId(Name), NYql::TKernelRequestBuilder::EBinaryOp::And, producedChildren));
+ }
+ Parent->Exchange(GetNodeName(), std::make_shared<TOperationNode>(GetNextId(orNode->GetName()), NYql::TKernelRequestBuilder::EBinaryOp::Or, newNodes));
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ // Creates a node with the same name and operation and an empty argument list.
+ // NOTE(review): children are not passed here; presumably the caller of DoCopy copies them - confirm.
+ virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+ std::vector<std::shared_ptr<IRequestNode>> children;
+ return std::make_shared<TOperationNode>(GetName(), Operation, children);
+ }
+public:
+ NYql::TKernelRequestBuilder::EBinaryOp GetOperation() const {
+ return Operation;
+ }
+
+ // Creates the node and attaches every element of `args` to it, in order.
+ TOperationNode(const std::string& name, const NYql::TKernelRequestBuilder::EBinaryOp& operation, const std::vector<std::shared_ptr<IRequestNode>>& args)
+ : TBase(name)
+ , Operation(operation) {
+ for (auto&& i : args) {
+ Attach(i);
+ }
+ }
+};
+
+// Accumulates program assignments into a tree of request nodes.
+// Nodes holds the nodes that have not been consumed as an argument yet, keyed by name.
+class TNormalForm {
+private:
+    std::map<std::string, std::shared_ptr<IRequestNode>> Nodes;
+public:
+    TNormalForm() = default;
+
+    // Converts one assignment into a node whose children are its arguments.
+    // Returns false when the assignment is neither a constant nor a YQL operation.
+    bool Add(const NSsa::TAssign& assign) {
+        std::vector<std::shared_ptr<IRequestNode>> operands;
+        for (auto&& argument : assign.GetArguments()) {
+            if (!argument.IsGenerated()) {
+                operands.emplace_back(std::make_shared<TOriginalColumn>(argument.GetColumnName()));
+                continue;
+            }
+            // A generated argument must have been produced by an earlier assignment.
+            auto producer = Nodes.find(argument.GetColumnName());
+            AFL_VERIFY(producer != Nodes.end());
+            operands.emplace_back(producer->second);
+        }
+        // Consumed nodes stop being candidates for the root.
+        for (auto&& operand : operands) {
+            Nodes.erase(operand->GetNodeName());
+        }
+
+        if (assign.IsConstant()) {
+            AFL_VERIFY(operands.size() == 0);
+            Nodes.emplace(assign.GetName(), std::make_shared<TConstantNode>(assign.GetName(), assign.GetConstant()));
+            return true;
+        }
+        if (!!assign.GetYqlOperationId()) {
+            const auto operation = static_cast<NYql::TKernelRequestBuilder::EBinaryOp>(*assign.GetYqlOperationId());
+            Nodes.emplace(assign.GetName(), std::make_shared<TOperationNode>(assign.GetName(), operation, operands));
+            return true;
+        }
+        return false;
+    }
+
+    // Wraps the single remaining node into a TRootNode; nullptr when nothing was added.
+    std::shared_ptr<TRootNode> GetRootNode() {
+        if (Nodes.empty()) {
+            return nullptr;
+        }
+        AFL_VERIFY(Nodes.size() == 1);
+        auto root = std::make_shared<TRootNode>();
+        root->Attach(Nodes.begin()->second);
+        return root;
+    }
+};
+
+// Builds the description used to construct index checkers from the first step of `program`.
+// The assignments of that step are turned into a node tree, the tree is collapsed until no
+// rewrite applies, and the result is read either as one pack of equality conditions or as an
+// Or over such packs (one branch per alternative).
+// Returns nullptr when the program cannot be represented this way.
+std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TProgramContainer& program) {
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", program.DebugString());
+    if (program.GetSteps().empty()) {
+        // front() on an empty container is undefined behaviour; there is nothing to analyze anyway.
+        return nullptr;
+    }
+    auto fStep = program.GetSteps().front();
+    TNormalForm nForm;
+    for (auto&& s : fStep->GetAssignes()) {
+        if (!nForm.Add(s)) {
+            return nullptr;
+        }
+    }
+    auto rootNode = nForm.GetRootNode();
+    if (!rootNode) {
+        return nullptr;
+    }
+    // Apply rewrites until the tree stops changing.
+    while (rootNode->Collapse()) {
+    }
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("collapsed_program", rootNode->SerializeToJson());
+    if (rootNode->GetChildren().size() != 1) {
+        return nullptr;
+    }
+    std::shared_ptr<TDataForIndexesCheckers> result = std::make_shared<TDataForIndexesCheckers>();
+    if (auto* orNode = rootNode->GetChildren().front()->As<TOperationNode>()) {
+        if (orNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::Or) {
+            for (auto&& i : orNode->GetChildren()) {
+                if (auto* andPackNode = i->As<TPackAnd>()) {
+                    result->AddBranch(andPackNode->GetEquals());
+                } else if (auto* operationNode = i->As<TOperationNode>()) {
+                    if (operationNode->GetOperation() != NYql::TKernelRequestBuilder::EBinaryOp::And) {
+                        // An alternative of the Or that is not a conjunction has no equality pack.
+                        // Skipping it silently would make the resulting Or checker reject data
+                        // that matches this alternative, so the whole program is left unindexed.
+                        return nullptr;
+                    }
+                    TPackAnd* pack = operationNode->FindFirst<TPackAnd>();
+                    if (!pack) {
+                        return nullptr;
+                    }
+                    result->AddBranch(pack->GetEquals());
+                } else {
+                    return nullptr;
+                }
+            }
+        }
+    } else if (auto* andPackNode = rootNode->GetChildren().front()->As<TPackAnd>()) {
+        result->AddBranch(andPackNode->GetEquals());
+    } else {
+        return nullptr;
+    }
+    return result;
+}
+
+// Combines the per-branch checkers into one checker for the whole request.
+// An empty container is returned when there are no branches or when any branch has no checker.
+TIndexCheckerContainer TDataForIndexesCheckers::GetCoverChecker() const {
+    std::vector<std::shared_ptr<IIndexChecker>> branchCheckers;
+    for (auto&& branch : Branches) {
+        auto checker = branch->GetAndChecker();
+        if (!checker) {
+            return TIndexCheckerContainer();
+        }
+        branchCheckers.emplace_back(checker);
+    }
+    if (branchCheckers.empty()) {
+        return TIndexCheckerContainer();
+    }
+    if (branchCheckers.size() == 1) {
+        // A single branch needs no Or wrapper.
+        return branchCheckers.front();
+    }
+    return TIndexCheckerContainer(std::make_shared<TOrIndexChecker>(branchCheckers));
+}
+
+// Returns a TAndIndexChecker over all checkers of this branch, or nullptr when the branch has none.
+std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexChecker> TBranchCoverage::GetAndChecker() const {
+    if (!Indexes.empty()) {
+        return std::make_shared<TAndIndexChecker>(Indexes);
+    }
+    return nullptr;
+}
+
+} // namespace NKikimr::NOlap::NIndexes::NRequest \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h
new file mode 100644
index 00000000000..898c4210b03
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h
@@ -0,0 +1,37 @@
+#pragma once
+#include <ydb/core/tx/program/program.h>
+
+namespace NKikimr::NOlap::NIndexes::NRequest {
+
+// One branch of the request: a set of "column == constant" conditions plus the index
+// checkers registered for it.
+class TBranchCoverage {
+private:
+    // Column name -> constant the column is compared with.
+    THashMap<TString, std::shared_ptr<arrow::Scalar>> Equals;
+    YDB_ACCESSOR_DEF(std::vector<std::shared_ptr<IIndexChecker>>, Indexes);
+public:
+    TBranchCoverage(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equals)
+        : Equals(equals) {
+    }
+
+    const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
+        return Equals;
+    }
+
+    // Checker built from Indexes.
+    std::shared_ptr<IIndexChecker> GetAndChecker() const;
+};
+
+// Set of request branches for which index checkers can be built.
+class TDataForIndexesCheckers {
+private:
+    YDB_READONLY_DEF(std::vector<std::shared_ptr<TBranchCoverage>>, Branches);
+public:
+    // Appends a new branch described by its equality conditions.
+    void AddBranch(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equalsData) {
+        auto branch = std::make_shared<TBranchCoverage>(equalsData);
+        Branches.emplace_back(std::move(branch));
+    }
+
+    // Analyzes the program; may return nullptr.
+    static std::shared_ptr<TDataForIndexesCheckers> Build(const TProgramContainer& program);
+
+    // Single checker covering all branches.
+    TIndexCheckerContainer GetCoverChecker() const;
+};
+
+} // namespace NKikimr::NOlap::NIndexes::NRequest \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp
new file mode 100644
index 00000000000..3c80059ff05
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp
@@ -0,0 +1,5 @@
+#include "simple.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h
new file mode 100644
index 00000000000..6fa589899a1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h
@@ -0,0 +1,42 @@
+#pragma once
+#include "checker.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+// Base class for checkers bound to exactly one index, identified by IndexId.
+// It takes care of the index id (lookup of the blobs, proto serialization) and delegates the
+// rest to the *Impl methods of the derived class.
+class TSimpleIndexChecker: public IIndexChecker {
+private:
+    YDB_READONLY(ui32, IndexId, 0);
+protected:
+    // Checks the blobs stored for this checker's index.
+    virtual bool DoCheckImpl(const std::vector<TString>& blobs) const = 0;
+
+    // Picks the blobs of IndexId out of `blobs`; the entry is required to exist.
+    virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobs) const override final {
+        auto it = blobs.find(IndexId);
+        AFL_VERIFY(it != blobs.end());
+        // `blobs` is const, so the entry can only be passed by reference; std::move would be a no-op here.
+        return DoCheckImpl(it->second);
+    }
+    virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0;
+    virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0;
+
+    // Reads the index id (must be non-zero) and then the derived part.
+    virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override final {
+        IndexId = proto.GetIndexId();
+        AFL_VERIFY(IndexId);
+        return DoDeserializeFromProtoImpl(proto);
+    }
+    // Writes the index id (must be non-zero) and then the derived part.
+    virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override final {
+        AFL_VERIFY(IndexId);
+        proto.SetIndexId(IndexId);
+        DoSerializeToProtoImpl(proto);
+    }
+    virtual std::set<ui32> DoGetIndexIds() const override final {
+        return {IndexId};
+    }
+public:
+    TSimpleIndexChecker() = default;
+    TSimpleIndexChecker(const ui32 indexId)
+        : IndexId(indexId)
+    {
+
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make
new file mode 100644
index 00000000000..f5babe94975
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+SRCS(
+ constructor.cpp
+ meta.cpp
+ checker.cpp
+ program.cpp
+ GLOBAL composite.cpp
+ simple.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/formats/arrow
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
new file mode 100644
index 00000000000..ac368bf8a46
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
@@ -0,0 +1,49 @@
+#include "checker.h"
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/common/validation.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+// Stores every hash value in the BloomFilter section of the proto.
+// The section is only touched when there is at least one hash value.
+void TBloomFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const {
+    for (auto&& hashValue : HashValues) {
+        auto* bloomProto = proto.MutableBloomFilter();
+        bloomProto->AddHashValues(hashValue);
+    }
+}
+
+// Returns true when at least one blob has the bit set for every hash value of this checker.
+// Each blob must deserialize into a record batch with a single boolean column.
+bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
+    for (auto&& blob : blobs) {
+        auto batch = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(blob));
+        AFL_VERIFY(batch);
+        AFL_VERIFY(batch->schema()->num_fields() == 1);
+        AFL_VERIFY(batch->schema()->field(0)->type()->id() == arrow::Type::BOOL);
+        auto& bits = static_cast<const arrow::BooleanArray&>(*batch->column(0));
+        bool missingBit = false;
+        for (auto&& hashValue : HashValues) {
+            // The bit position is the hash value reduced modulo the filter length.
+            if (!bits.Value(hashValue % bits.length())) {
+                missingBit = true;
+                break;
+            }
+        }
+        if (!missingBit) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Loads the hash values from the BloomFilter section of the proto.
+// Fails when the section is absent or no hash value is known after loading.
+bool TBloomFilterChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) {
+    if (!proto.HasBloomFilter()) {
+        return false;
+    }
+    for (auto&& hashValue : proto.GetBloomFilter().GetHashValues()) {
+        HashValues.emplace(hashValue);
+    }
+    return !HashValues.empty();
+}
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h
new file mode 100644
index 00000000000..92ecf9534d2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h
@@ -0,0 +1,32 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h>
+namespace NKikimr::NOlap::NIndexes {
+
+// Checker for one bloom filter index: holds the hash values that must all be present in the filter.
+class TBloomFilterChecker: public TSimpleIndexChecker {
+public:
+    // Name under which the class is registered in the factory.
+    static TString GetClassNameStatic() {
+        return "BLOOM_FILTER";
+    }
+private:
+    using TBase = TSimpleIndexChecker;
+    std::set<ui64> HashValues;
+    static inline auto Registrator = TFactory::TRegistrator<TBloomFilterChecker>(GetClassNameStatic());
+protected:
+    virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override;
+    virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override;
+
+    virtual bool DoCheckImpl(const std::vector<TString>& blobs) const override;
+public:
+    TBloomFilterChecker() = default;
+    // Takes ownership of `hashes`.
+    TBloomFilterChecker(const ui32 indexId, std::set<ui64>&& hashes)
+        : TBase(indexId)
+        , HashValues(std::move(hashes)) {
+    }
+    virtual TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp
index 62b8d5ddc25..4ea787eb7d9 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp
@@ -1,35 +1,11 @@
-#include "bloom.h"
-#include <ydb/core/formats/arrow/hash/xx_hash.h>
-#include <ydb/core/formats/arrow/hash/calcer.h>
+#include "constructor.h"
+#include "meta.h"
+
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
-#include <ydb/core/tx/schemeshard/olap/common/common.h>
namespace NKikimr::NOlap::NIndexes {
-std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
- std::vector<bool> flags;
- flags.resize(BitsCount, false);
- for (ui32 i = 0; i < HashesCount; ++i) {
- NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i);
- for (; reader.IsCorrect(); reader.ReadNext()) {
- hashCalcer.Start();
- for (auto&& i : reader) {
- NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer);
- }
- flags[hashCalcer.Finish() % BitsCount] = true;
- }
- }
-
- arrow::BooleanBuilder builder;
- auto res = builder.Reserve(flags.size());
- NArrow::TStatusValidator::Validate(builder.AppendValues(flags));
- std::shared_ptr<arrow::BooleanArray> out;
- NArrow::TStatusValidator::Validate(builder.Finish(&out));
-
- return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out});
-}
-
-std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
+std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
std::set<ui32> columnIds;
for (auto&& i : ColumnNames) {
auto* columnInfo = currentSchema.GetColumns().GetByName(i);
@@ -39,7 +15,7 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::Do
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
- return std::make_shared<TBloomIndexMeta>(columnIds, FalsePositiveProbability);
+ return std::make_shared<TBloomIndexMeta>(indexId, columnIds, FalsePositiveProbability);
}
NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h
new file mode 100644
index 00000000000..0aeb4dd9809
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
+namespace NKikimr::NOlap::NIndexes {
+
+// Constructor of bloom filter index metadata: keeps the requested column names and the
+// false positive probability (0.1 by default).
+class TBloomIndexConstructor: public IIndexMetaConstructor {
+public:
+    // Name under which the class is registered in the factory.
+    static TString GetClassNameStatic() {
+        return "BLOOM_FILTER";
+    }
+private:
+    std::set<TString> ColumnNames;
+    double FalsePositiveProbability = 0.1;
+    static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic());
+protected:
+    virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;
+
+    virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;
+
+    virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override;
+    virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override;
+
+public:
+    TBloomIndexConstructor() = default;
+
+    virtual TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp
new file mode 100644
index 00000000000..f24100e81d3
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp
@@ -0,0 +1,67 @@
+#include "meta.h"
+#include "checker.h"
+#include <ydb/core/formats/arrow/hash/xx_hash.h>
+#include <ydb/core/formats/arrow/hash/calcer.h>
+#include <ydb/core/tx/program/program.h>
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+// Builds the bloom filter for the rows provided by `reader`.
+// For every row, HashesCount hashes (seeds 0, 3, 6, ...) are computed over all columns of the
+// reader and the corresponding bits are set. The result is a record batch with a single boolean
+// column of BitsCount values.
+std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
+    std::vector<bool> flags;
+    flags.resize(BitsCount, false);
+    // The reader can be walked only once, so rows form the outer loop and all hashes are
+    // computed for the current row. With the hash loop outside, the reader would already be
+    // exhausted for every seed but the first one and their bits would never be set.
+    for (; reader.IsCorrect(); reader.ReadNext()) {
+        for (ui32 hashIdx = 0; hashIdx < HashesCount; ++hashIdx) {
+            NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * hashIdx);
+            hashCalcer.Start();
+            for (auto&& column : reader) {
+                NArrow::NHash::TXX64::AppendField(column.GetCurrentChunk(), column.GetCurrentRecordIndex(), hashCalcer);
+            }
+            flags[hashCalcer.Finish() % BitsCount] = true;
+        }
+    }
+
+    arrow::BooleanBuilder builder;
+    // Allocation failures are reported through the status, so it must not be dropped.
+    NArrow::TStatusValidator::Validate(builder.Reserve(flags.size()));
+    NArrow::TStatusValidator::Validate(builder.AppendValues(flags));
+    std::shared_ptr<arrow::BooleanArray> out;
+    NArrow::TStatusValidator::Validate(builder.Finish(&out));
+
+    return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out});
+}
+
+// Adds a TBloomFilterChecker to every branch that has an equality condition for each column of
+// this index. The hashes are computed over the constants of those conditions, ordered by column id.
+// Stops processing altogether when an index column is missing in `schema`.
+void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
+    for (auto&& branch : info->GetBranches()) {
+        std::map<ui32, std::shared_ptr<arrow::Scalar>> constantsByColumn;
+        for (auto&& columnId : ColumnIds) {
+            auto column = schema.GetColumns().GetById(columnId);
+            if (!column) {
+                AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect index column")("id", columnId);
+                return;
+            }
+            auto equalIt = branch->GetEquals().find(column->GetName());
+            if (equalIt == branch->GetEquals().end()) {
+                break;
+            }
+            constantsByColumn.emplace(columnId, equalIt->second);
+        }
+        // The branch is usable only when every index column has its constant.
+        if (constantsByColumn.size() == ColumnIds.size()) {
+            std::set<ui64> hashes;
+            for (ui32 hashIdx = 0; hashIdx < HashesCount; ++hashIdx) {
+                NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(3 * hashIdx);
+                calcer.Start();
+                for (auto&& columnConstant : constantsByColumn) {
+                    NArrow::NHash::TXX64::AppendField(columnConstant.second, calcer);
+                }
+                hashes.emplace(calcer.Finish());
+            }
+            branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
+        }
+    }
+}
+
+} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h
index 15ed36ff3e5..bc4db867d60 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h
@@ -1,45 +1,7 @@
#pragma once
-#include "abstract.h"
-
-#include <ydb/core/tx/columnshard/splitter/chunks.h>
-#include <ydb/core/tx/program/program.h>
-
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <library/cpp/object_factory/object_factory.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-#include <ydb/services/bg_tasks/abstract/interface.h>
-#include <util/generic/string.h>
-
-#include <memory>
-#include <vector>
-
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h>
namespace NKikimr::NOlap::NIndexes {
-class TBloomIndexConstructor: public IIndexMetaConstructor {
-public:
- static TString GetClassNameStatic() {
- return "BLOOM_FILTER";
- }
-private:
- std::set<TString> ColumnNames;
- double FalsePositiveProbability = 0.1;
- static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic());
-protected:
- virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;
-
- virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;
-
- virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override;
- virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override;
-
-public:
- TBloomIndexConstructor() = default;
-
- virtual TString GetClassName() const override {
- return GetClassNameStatic();
- }
-};
-
class TBloomIndexMeta: public TIndexByColumns {
public:
static TString GetClassNameStatic() {
@@ -54,18 +16,20 @@ private:
ui32 BitsCount = 0;
static inline auto Registrator = TFactory::TRegistrator<TBloomIndexMeta>(GetClassNameStatic());
void Initialize() {
+ AFL_VERIFY(!ResultSchema);
+ std::vector<std::shared_ptr<arrow::Field>> fields = {std::make_shared<arrow::Field>("", arrow::TypeTraits<arrow::BooleanType>::type_singleton())};
+ ResultSchema = std::make_shared<arrow::Schema>(fields);
AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability > 0.01);
HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2);
- BitsCount = RowsCountExpectation * HashesCount / (std::log(2));
+ BitsCount = RowsCountExpectation * HashesCount / std::log(2);
}
protected:
- virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& /*program*/) const override {
- return nullptr;
- }
+ virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;
virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
+ AFL_VERIFY(TBase::DoDeserializeFromProto(proto));
AFL_VERIFY(proto.HasBloomFilter());
auto& bFilter = proto.GetBloomFilter();
FalsePositiveProbability = bFilter.GetFalsePositiveProbability();
@@ -85,8 +49,8 @@ protected:
public:
TBloomIndexMeta() = default;
- TBloomIndexMeta(const std::set<ui32>& columnIds, const double fpProbability)
- : TBase(columnIds)
+ TBloomIndexMeta(const ui32 indexId, const std::set<ui32>& columnIds, const double fpProbability)
+ : TBase(indexId, columnIds)
, FalsePositiveProbability(fpProbability) {
Initialize();
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make
new file mode 100644
index 00000000000..e333fcd3ef9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ GLOBAL constructor.cpp
+ GLOBAL meta.cpp
+ GLOBAL checker.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/formats/arrow
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make
index 138eb4d5e12..1bf1e550c89 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make
@@ -1,12 +1,8 @@
LIBRARY()
-SRCS(
- abstract.cpp
-)
-
PEERDIR(
- ydb/core/protos
- ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/scheme/indexes/abstract
+ ydb/core/tx/columnshard/engines/scheme/indexes/bloom
)
END()
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index ee10d7674b5..e54f8248f3d 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -12,7 +12,7 @@ using namespace NOlap;
namespace {
-class TTestInsertTableDB : public IDbWrapper {
+class TTestInsertTableDB: public IDbWrapper {
public:
void Insert(const TInsertedData&) override {}
void Commit(const TInsertedData&) override {}
@@ -22,8 +22,7 @@ public:
void EraseAborted(const TInsertedData&) override {}
bool Load(TInsertTableAccessor&,
- const TInstant&) override
- {
+ const TInstant&) override {
return true;
}
@@ -31,6 +30,10 @@ public:
void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {}
bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; }
+ virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
+ virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
+ virtual bool LoadIndexes(const std::function<void(const ui64 /*pathId*/, const ui64 /*portionId*/, const TIndexChunkLoadContext&)>& /*callback*/) override { return true; }
+
void WriteCounter(ui32, ui64) override {}
bool LoadCounters(const std::function<void(ui32 id, ui64 value)>&) override { return true; }
};
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 229100ecab5..8f1bdf36fc4 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -137,6 +137,10 @@ public:
return true;
}
+ virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
+ virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
+ virtual bool LoadIndexes(const std::function<void(const ui64 /*pathId*/, const ui64 /*portionId*/, const TIndexChunkLoadContext&)>& /*callback*/) override { return true; }
+
void WriteCounter(ui32 counterId, ui64 value) override {
auto& counters = Indices[0].Counters;
counters[counterId] = value;
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 6b5f70674c8..8a3530fc731 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -82,6 +82,8 @@ public:
bool OnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
return DoOnStartCompaction(changes);
}
+ // Hook receiving the boolean result of an index select; the default implementation does nothing.
+ virtual void OnIndexSelectProcessed(const bool /*result*/) {
+ }
virtual EOptimizerCompactionWeightControl GetCompactionControl() const {
return EOptimizerCompactionWeightControl::Force;
}
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index eed2aa61ce6..a6680395472 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -8,6 +8,8 @@ private:
YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0);
YDB_READONLY(TAtomicCounter, Compactions, 0);
YDB_READONLY(TAtomicCounter, Indexations, 0);
+ YDB_READONLY(TAtomicCounter, IndexesSkippingOnSelect, 0);
+ YDB_READONLY(TAtomicCounter, IndexesApprovedOnSelect, 0);
YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero());
YDB_ACCESSOR(std::optional<TDuration>, PeriodicWakeupActivationPeriod, std::nullopt);
YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt);
@@ -38,6 +40,13 @@ protected:
}
public:
+    // Counts the reported results: `true` goes to IndexesApprovedOnSelect, `false` to IndexesSkippingOnSelect.
+    virtual void OnIndexSelectProcessed(const bool result) override {
+        if (!result) {
+            IndexesSkippingOnSelect.Inc();
+            return;
+        }
+        IndexesApprovedOnSelect.Inc();
+    }
void SetCompactionControl(const EOptimizerCompactionWeightControl value) {
CompactionControl = value;
}
diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp
index 6ee25067bdd..34f40d6578f 100644
--- a/ydb/core/tx/program/program.cpp
+++ b/ydb/core/tx/program/program.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/engines/filter.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_scalar.h>
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
#include <google/protobuf/text_format.h>
namespace NKikimr::NOlap {
@@ -115,7 +116,11 @@ TAssign TProgramBuilder::MakeFunction(const NSsa::TColumnInfo& name,
Error = TStringBuilder() << "Unknown kernel for " << name.GetColumnName() << ";kernel_idx=" << func.GetKernelIdx();
return TAssign(name, EOperation::Unspecified, std::move(arguments));
}
- return TAssign(name, kernelFunction, std::move(arguments), nullptr);
+ TAssign result(name, kernelFunction, std::move(arguments), nullptr);
+ if (func.HasYqlOperationId()) {
+ result.SetYqlOperationId(func.GetYqlOperationId());
+ }
+ return result;
}
switch (func.GetId()) {
@@ -241,6 +246,7 @@ TAssign TProgramBuilder::MakeFunction(const NSsa::TColumnInfo& name,
case TId::FUNC_UNSPECIFIED:
break;
}
+
return TAssign(name, EOperation::Unspecified, std::move(arguments));
}
@@ -430,6 +436,26 @@ bool TProgramBuilder::ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA:
return true;
}
+
+}
+
+// Returns the name of the column with the given id.
+// An unknown id fails verification when `required` is set and yields an empty string otherwise.
+TString TSchemaResolverColumnsOnly::GetColumnName(ui32 id, bool required /*= true*/) const {
+    auto* column = Schema->GetColumns().GetById(id);
+    AFL_VERIFY(!required || !!column);
+    if (!column) {
+        return "";
+    }
+    return column->GetName();
+}
+
+// Returns the id of the column with the given name, or an empty optional when there is no such column.
+std::optional<ui32> TSchemaResolverColumnsOnly::GetColumnIdOptional(const TString& name) const {
+    auto* column = Schema->GetColumns().GetByName(name);
+    if (column) {
+        return column->GetId();
+    }
+    return std::nullopt;
}
const THashMap<ui32, NSsa::TColumnInfo>& TProgramContainer::GetSourceColumns() const {
@@ -450,11 +476,60 @@ std::set<std::string> TProgramContainer::GetEarlyFilterColumns() const {
return Default<std::set<std::string>>();
}
+bool TProgramContainer::Init(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& programProto, TString& error) {
+ ProgramProto = programProto;
+ if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
+ TString out;
+ ::google::protobuf::TextFormat::PrintToString(programProto, &out);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out);
+ }
+
+ if (programProto.HasKernels()) {
+ KernelsRegistry.Parse(programProto.GetKernels());
+ }
+
+ if (!ParseProgram(columnResolver, programProto, error)) {
+ if (!error) {
+ error = TStringBuilder() << "Wrong olap program";
+ }
+ return false;
+ }
+
+ return true;
+}
+
+bool TProgramContainer::Init(const IColumnResolver& columnResolver, const NKikimrSSA::TOlapProgram& olapProgramProto, TString& error) {
+ NKikimrSSA::TProgram programProto;
+ if (!programProto.ParseFromString(olapProgramProto.GetProgram())) {
+ error = TStringBuilder() << "Can't parse TProgram";
+ return false;
+ }
+
+ if (olapProgramProto.HasParameters()) {
+ Y_ABORT_UNLESS(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema.");
+
+ auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema());
+ ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema);
+ }
+
+ ProgramProto = programProto;
+
+ if (!Init(columnResolver, ProgramProto, error)) {
+ return false;
+ }
+ if (olapProgramProto.HasIndexChecker()) {
+ if (!IndexChecker.DeserializeFromProto(olapProgramProto.GetIndexChecker())) {
+ AFL_VERIFY_DEBUG(false);
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "cannot_parse_index_checker")("data", olapProgramProto.GetIndexChecker().DebugString());
+ }
+ }
+ return true;
+}
+
bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error) {
Y_ABORT_UNLESS(serializedProgram);
Y_ABORT_UNLESS(!OverrideProcessingColumnsVector);
- NKikimrSSA::TProgram programProto;
NKikimrSSA::TOlapProgram olapProgramProto;
switch (programType) {
@@ -464,42 +539,13 @@ bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchem
return false;
}
- if (!programProto.ParseFromString(olapProgramProto.GetProgram())) {
- error = TStringBuilder() << "Can't parse TProgram";
- return false;
- }
-
break;
default:
error = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType;
return false;
}
- ProgramProto = programProto;
- if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
- TString out;
- ::google::protobuf::TextFormat::PrintToString(programProto, &out);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out);
- }
-
- if (olapProgramProto.HasParameters()) {
- Y_ABORT_UNLESS(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema.");
-
- auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema());
- ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema);
- }
-
- if (programProto.HasKernels()) {
- KernelsRegistry.Parse(programProto.GetKernels());
- }
-
- if (!ParseProgram(columnResolver, programProto, error)) {
- if (!error) {
- error = TStringBuilder() << "Wrong olap program";
- }
- return false;
- }
- return true;
+ return Init(columnResolver, olapProgramProto, error);
}
bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error) {
diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h
index b595fc1f610..dae9ae4d94a 100644
--- a/ydb/core/tx/program/program.h
+++ b/ydb/core/tx/program/program.h
@@ -7,7 +7,12 @@
#include <ydb/core/formats/arrow/custom_registry.h>
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h>
+#include <ydb/core/tx/columnshard/common/portion.h>
+namespace NKikimr::NSchemeShard {
+class TOlapSchema;
+}
namespace NKikimr::NOlap {
class IColumnResolver {
@@ -19,6 +24,26 @@ public:
virtual NSsa::TColumnInfo GetDefaultColumn() const = 0;
};
+class TSchemaResolverColumnsOnly: public IColumnResolver {
+private:
+ std::shared_ptr<NSchemeShard::TOlapSchema> Schema;
+public:
+ TSchemaResolverColumnsOnly(const std::shared_ptr<NSchemeShard::TOlapSchema>& schema)
+ : Schema(schema) {
+ AFL_VERIFY(Schema);
+ }
+
+ virtual TString GetColumnName(ui32 id, bool required = true) const override;
+ virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const override;
+ virtual const NTable::TScheme::TTableSchema& GetSchema() const override {
+ AFL_VERIFY(false);
+ return Default<NTable::TScheme::TTableSchema>();
+ }
+ virtual NSsa::TColumnInfo GetDefaultColumn() const override {
+ return NSsa::TColumnInfo::Original((ui32)NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX, NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP);
+ }
+};
+
class TProgramContainer {
private:
NKikimrSSA::TProgram ProgramProto;
@@ -27,6 +52,7 @@ private:
TKernelsRegistry KernelsRegistry;
std::optional<std::set<std::string>> OverrideProcessingColumnsSet;
std::optional<std::vector<TString>> OverrideProcessingColumnsVector;
+ YDB_READONLY_DEF(NIndexes::TIndexCheckerContainer, IndexChecker);
public:
TString ProtoDebugString() const {
return ProgramProto.DebugString();
@@ -53,6 +79,8 @@ public:
}
bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error);
+ bool Init(const IColumnResolver& columnResolver, const NKikimrSSA::TOlapProgram& olapProgramProto, TString& error);
+ bool Init(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& programProto, TString& error);
const std::vector<std::shared_ptr<NSsa::TProgramStep>>& GetSteps() const {
if (!Program) {
diff --git a/ydb/core/tx/program/registry.cpp b/ydb/core/tx/program/registry.cpp
index de2bd4cde4e..f8ba71e37d7 100644
--- a/ydb/core/tx/program/registry.cpp
+++ b/ydb/core/tx/program/registry.cpp
@@ -31,4 +31,12 @@ bool TKernelsRegistry::Parse(const TString& serialized) {
}
return true;
}
+
+NKikimr::NSsa::TFunctionPtr TKernelsRegistry::GetFunction(const size_t index) const {
+ if (index < Functions.size()) {
+ return Functions[index];
+ }
+ return nullptr;
+}
+
}
diff --git a/ydb/core/tx/program/registry.h b/ydb/core/tx/program/registry.h
index ba24f343555..bc4f3a99e63 100644
--- a/ydb/core/tx/program/registry.h
+++ b/ydb/core/tx/program/registry.h
@@ -14,13 +14,7 @@ private:
public:
bool Parse(const TString& serialized);
-
- NSsa::TFunctionPtr GetFunction(const size_t index) const {
- if (index <= Functions.size()) {
- return Functions[index];
- }
- return nullptr;
- }
+ NSsa::TFunctionPtr GetFunction(const size_t index) const;
};
}
diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
index e715b134962..af35646873d 100644
--- a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
+++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
@@ -15,6 +15,21 @@ void TOlapIndexSchema::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDes
AFL_VERIFY(IndexMeta.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString());
}
+bool TOlapIndexSchema::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) {
+ AFL_VERIFY(upsert.GetName() == GetName());
+ AFL_VERIFY(!!upsert.GetIndexConstructor());
+ if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) {
+ errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName());
+ return false;
+ }
+ auto object = upsert.GetIndexConstructor()->CreateIndexMeta(GetId(), currentSchema, errors);
+ if (!object) {
+ return false;
+ }
+ IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object);
+ return true;
+}
+
bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) {
for (auto&& index : schemaUpdate.GetUpsertIndexes()) {
auto* currentIndex = MutableByName(index.GetName());
@@ -23,11 +38,11 @@ bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, cons
return false;
}
} else {
- auto meta = index.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors);
+ const ui32 id = nextEntityId++;
+ auto meta = index.GetIndexConstructor()->CreateIndexMeta(id, currentSchema, errors);
if (!meta) {
return false;
}
- const ui32 id = nextEntityId++;
TOlapIndexSchema newIndex(id, index.GetName(), meta);
Y_ABORT_UNLESS(IndexesByName.emplace(index.GetName(), id).second);
Y_ABORT_UNLESS(Indexes.emplace(id, std::move(newIndex)).second);
diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.h b/ydb/core/tx/schemeshard/olap/indexes/schema.h
index d5aaa8573a2..630016fe96a 100644
--- a/ydb/core/tx/schemeshard/olap/indexes/schema.h
+++ b/ydb/core/tx/schemeshard/olap/indexes/schema.h
@@ -3,12 +3,14 @@
namespace NKikimr::NSchemeShard {
+class TOlapSchema;
+
class TOlapIndexSchema {
private:
using TBase = TOlapIndexUpsert;
YDB_READONLY(ui32, Id, Max<ui32>());
YDB_READONLY_DEF(TString, Name);
- NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta> IndexMeta;
+ YDB_READONLY_DEF(NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>, IndexMeta);
public:
TOlapIndexSchema() = default;
@@ -20,20 +22,7 @@ public:
}
- bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) {
- AFL_VERIFY(upsert.GetName() == GetName());
- AFL_VERIFY(!!upsert.GetIndexConstructor());
- if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) {
- errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName());
- return false;
- }
- auto object = upsert.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors);
- if (!object) {
- return false;
- }
- IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object);
- return true;
- }
+ bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors);
void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const;
void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema);
diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.h b/ydb/core/tx/schemeshard/olap/indexes/update.h
index 0f1db75b0b8..f6d0f88fa31 100644
--- a/ydb/core/tx/schemeshard/olap/indexes/update.h
+++ b/ydb/core/tx/schemeshard/olap/indexes/update.h
@@ -3,7 +3,7 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/schemeshard/olap/common/common.h>
#include <ydb/library/accessor/accessor.h>
-#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
namespace NKikimr::NSchemeShard {
diff --git a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
index a50f1193b1a..776b8d2737e 100644
--- a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
@@ -7,11 +7,10 @@
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/mind/hive/hive.h>
-namespace {
-
using namespace NKikimr;
-using namespace NSchemeShard;
+using namespace NKikimr::NSchemeShard;
+namespace {
void ApplySharding(TTxId txId, TPathId pathId, TOlapStoreInfo::TPtr storeInfo,
const TChannelsBindings& channelsBindings,
diff --git a/ydb/library/arrow_kernels/operations.h b/ydb/library/arrow_kernels/operations.h
index f9dafc4b50b..bfe891274e0 100644
--- a/ydb/library/arrow_kernels/operations.h
+++ b/ydb/library/arrow_kernels/operations.h
@@ -1,3 +1,5 @@
+#pragma once
+
namespace NKikimr::NKernels {
enum class EOperation {
diff --git a/ydb/library/arrow_kernels/ya.make b/ydb/library/arrow_kernels/ya.make
index 00b387e8b17..f10d21e53b1 100644
--- a/ydb/library/arrow_kernels/ya.make
+++ b/ydb/library/arrow_kernels/ya.make
@@ -8,6 +8,8 @@ PEERDIR(
contrib/libs/apache/arrow
)
+GENERATE_ENUM_SERIALIZATION(operations.h)
+
SRCS(
func_cast.cpp
ut_common.cpp
diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h
index c6398b45910..8a5efa8e118 100644
--- a/ydb/library/yql/core/arrow_kernels/request/request.h
+++ b/ydb/library/yql/core/arrow_kernels/request/request.h
@@ -16,7 +16,7 @@ public:
};
enum class EBinaryOp {
- And,
+ And = 0,
Or,
Xor,