summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- ydb/core/formats/arrow/arrow_filter.h 2
-rw-r--r-- ydb/core/formats/arrow/hash/calcer.cpp 11
-rw-r--r-- ydb/core/formats/arrow/hash/calcer.h 1
-rw-r--r-- ydb/core/formats/arrow/ut/ut_hash.cpp 60
-rw-r--r-- ydb/core/formats/arrow/ut/ya.make 1
-rw-r--r-- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp 125
-rw-r--r-- ydb/core/protos/ssa.proto 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/with_appended.cpp 34
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/column_record.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 71
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h 17
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp 38
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp 25
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h 23
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp 18
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h 13
-rw-r--r-- ydb/core/tx/columnshard/hooks/abstract/abstract.h 2
-rw-r--r-- ydb/core/tx/columnshard/hooks/testing/controller.h 7
-rw-r--r-- ydb/core/tx/columnshard/splitter/batch_slice.h 9
-rw-r--r-- ydb/core/tx/columnshard/splitter/chunks.h 13
-rw-r--r-- ydb/core/tx/columnshard/splitter/rb_splitter.cpp 36
-rw-r--r-- ydb/core/tx/columnshard/splitter/rb_splitter.h 7
-rw-r--r-- ydb/core/tx/schemeshard/olap/indexes/schema.cpp 5
27 files changed, 456 insertions, 83 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index 29dc8471c54..b33b9a13707 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -46,13 +46,13 @@ private:
static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
- void Add(const bool value, const ui32 count = 1);
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}
public:
+ void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
const std::vector<bool>& BuildSimpleFilter() const;
std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp
index f5d2dc3515e..b9833216dc6 100644
--- a/ydb/core/formats/arrow/hash/calcer.cpp
+++ b/ydb/core/formats/arrow/hash/calcer.cpp
@@ -18,9 +18,9 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TSt
auto& typedScalar = static_cast<const TScalar&>(*scalar);
if constexpr (arrow::has_string_view<T>()) {
- hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size());
+ hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.value->data()), typedScalar.value->size());
} else if constexpr (arrow::has_c_type<T>()) {
- hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T));
+ hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.data()), sizeof(typedScalar.value));
} else {
static_assert(arrow::is_decimal_type<T>());
}
@@ -130,4 +130,11 @@ std::vector<std::shared_ptr<arrow::Array>> TXX64::GetColumns(const std::shared_p
return columns;
}
+// Computes a standalone 64-bit hash of a single scalar: creates a fresh stream
+// calcer constructed with 0 (presumably the seed -- confirm against
+// TStreamStringHashCalcer), feeds the scalar through AppendField and returns
+// the finished value.
+ui64 TXX64::CalcHash(const std::shared_ptr<arrow::Scalar>& scalar) {
+ NXX64::TStreamStringHashCalcer calcer(0);
+ calcer.Start();
+ AppendField(scalar, calcer);
+ return calcer.Finish();
+}
+
}
diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h
index 511c8c40187..d7f49e2ef1e 100644
--- a/ydb/core/formats/arrow/hash/calcer.h
+++ b/ydb/core/formats/arrow/hash/calcer.h
@@ -31,6 +31,7 @@ public:
static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
+ static ui64 CalcHash(const std::shared_ptr<arrow::Scalar>& scalar);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;
};
diff --git a/ydb/core/formats/arrow/ut/ut_hash.cpp b/ydb/core/formats/arrow/ut/ut_hash.cpp
new file mode 100644
index 00000000000..3255d430352
--- /dev/null
+++ b/ydb/core/formats/arrow/ut/ut_hash.cpp
@@ -0,0 +1,60 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/hash/xx_hash.h>
+#include <ydb/core/formats/arrow/hash/calcer.h>
+
+// Determinism tests for TXX64::AppendField over arrow scalars: equal inputs fed
+// into independent stream calcers must yield equal hashes.
+Y_UNIT_TEST_SUITE(Hash) {
+
+    using namespace NKikimr::NArrow;
+
+    // Two equal string scalars hashed by separate calcers give the same value.
+    Y_UNIT_TEST(ScalarBinaryHash) {
+        std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::StringScalar>("abcde");
+        std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::StringScalar>("abcde");
+        NHash::NXX64::TStreamStringHashCalcer calcer1(0);
+        calcer1.Start();
+        NHash::TXX64::AppendField(s1, calcer1);
+
+        NHash::NXX64::TStreamStringHashCalcer calcer2(0);
+        calcer2.Start();
+        NHash::TXX64::AppendField(s2, calcer2);
+        const ui64 hash = calcer1.Finish();
+        Cerr << hash << Endl;
+        // Report a mismatch through the test framework instead of aborting the
+        // whole test binary.
+        UNIT_ASSERT_VALUES_EQUAL(hash, calcer2.Finish());
+    }
+
+    // Two equal fixed-width (UInt32) scalars hashed by separate calcers give the same value.
+    Y_UNIT_TEST(ScalarCTypeHash) {
+        std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::UInt32Scalar>(52);
+        std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::UInt32Scalar>(52);
+        NHash::NXX64::TStreamStringHashCalcer calcer1(0);
+        calcer1.Start();
+        NHash::TXX64::AppendField(s1, calcer1);
+
+        NHash::NXX64::TStreamStringHashCalcer calcer2(0);
+        calcer2.Start();
+        NHash::TXX64::AppendField(s2, calcer2);
+        const ui64 hash = calcer1.Finish();
+        Cerr << hash << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(hash, calcer2.Finish());
+    }
+
+    // The same (string, UInt32) sequence appended to separate calcers gives the same value.
+    Y_UNIT_TEST(ScalarCompositeHash) {
+        std::shared_ptr<arrow::Scalar> s11 = std::make_shared<arrow::StringScalar>("abcde");
+        std::shared_ptr<arrow::Scalar> s12 = std::make_shared<arrow::UInt32Scalar>(52);
+        std::shared_ptr<arrow::Scalar> s21 = std::make_shared<arrow::StringScalar>("abcde");
+        std::shared_ptr<arrow::Scalar> s22 = std::make_shared<arrow::UInt32Scalar>(52);
+        NHash::NXX64::TStreamStringHashCalcer calcer1(0);
+        calcer1.Start();
+        NHash::TXX64::AppendField(s11, calcer1);
+        NHash::TXX64::AppendField(s12, calcer1);
+
+        NHash::NXX64::TStreamStringHashCalcer calcer2(0);
+        calcer2.Start();
+        NHash::TXX64::AppendField(s21, calcer2);
+        NHash::TXX64::AppendField(s22, calcer2);
+        const ui64 hash = calcer1.Finish();
+        Cerr << hash << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(hash, calcer2.Finish());
+    }
+
+
+};
diff --git a/ydb/core/formats/arrow/ut/ya.make b/ydb/core/formats/arrow/ut/ya.make
index 312e9033e7e..c4c993ec322 100644
--- a/ydb/core/formats/arrow/ut/ya.make
+++ b/ydb/core/formats/arrow/ut/ya.make
@@ -28,6 +28,7 @@ SRCS(
ut_dictionary.cpp
ut_size_calcer.cpp
ut_column_filter.cpp
+ ut_hash.cpp
)
END()
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index e38258931b1..c7fa1dae577 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1229,14 +1229,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
- Tests::NCommon::TLoggerInit(kikimr).Initialize();
+// Tests::NCommon::TLoggerInit(kikimr).Initialize();
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
- FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.1}`);
+ FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
@@ -1245,13 +1245,17 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
- FEATURES=`{"column_names" : ["resource_id", "uid"], "false_positive_probability" : 0.2}`);
+ FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
+ std::vector<TString> uids;
+ std::vector<TString> resourceIds;
+ std::vector<ui32> levels;
+
{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
@@ -1260,6 +1264,23 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);
+
+ const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
+ for (ui32 i = 0; i < count; ++i) {
+ uids.emplace_back("uid_" + ::ToString(startUid + i));
+ resourceIds.emplace_back(::ToString(startRes + i));
+ levels.emplace_back(i % 5);
+ }
+ };
+
+ filler(1000000, 300000000, 10000);
+ filler(1100000, 300100000, 10000);
+ filler(1200000, 300200000, 10000);
+ filler(1300000, 300300000, 10000);
+ filler(1400000, 300400000, 10000);
+ filler(2000000, 200000000, 70000);
+ filler(3000000, 100000000, 110000);
+
}
{
@@ -1276,7 +1297,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Cout << result << Endl;
CompareYson(result, R"([[230000u;]])");
}
+
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
TInstant start = Now();
ui32 compactionsStart = csController->GetCompactions().Val();
while (Now() - start < TDuration::Seconds(10)) {
@@ -1295,35 +1318,97 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
- WHERE resource_id = '3000008' AND level = 3 AND uid = 'uid_100000008'
+ WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
)").GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
+ Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
+ CompareYson(result, R"([[0u;]])");
+ AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0);
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 17);
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 4);
+ }
+ ui32 requestsCount = 100;
+ for (ui32 i = 0; i < requestsCount; ++i) {
+ const ui32 idx = RandomNumber<ui32>(uids.size());
+ const auto query = [](const TString& res, const TString& uid, const ui32 level) {
+ TStringBuilder sb;
+ sb << "SELECT" << Endl;
+ sb << "COUNT(*)" << Endl;
+ sb << "FROM `/Root/olapStore/olapTable`" << Endl;
+ sb << "WHERE(" << Endl;
+ sb << "resource_id = '" << res << "' AND" << Endl;
+ sb << "uid= '" << uid << "' AND" << Endl;
+ sb << "level= " << level << Endl;
+ sb << ")";
+ return sb;
+ };
+ auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << " / " << csController->GetIndexesSkippedNoData().Val() << Endl;
CompareYson(result, R"([[1u;]])");
- AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val());
- AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
}
+ // Compare in floating point: both counters are integers, so dividing one by the
+ // other truncates the ratio (to 0 whenever approved < skipped) and the check could
+ // never fail; the product form also avoids dividing by zero when nothing was skipped.
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.15 * csController->GetIndexesSkippingOnSelect().Val())
+     ("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val());
+
+ }
+
+ // Checks which UPSERT_INDEX modifications of an already existing bloom filter
+ // index are accepted by the scheme operation and which are rejected.
+ Y_UNIT_TEST(IndexesModificationError) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ TLocalHelper(kikimr).CreateTestOlapTable();
+ auto tableClient = kikimr.GetTableClient();
+
+ // Tests::NCommon::TLoggerInit(kikimr).Initialize();
+
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+
+ // Step 1: create index_uid over column "uid" with false positive probability 0.05; must succeed.
{
- const i64 before = csController->GetIndexesSkippingOnSelect().Val();
- auto it = tableClient.StreamExecuteScanQuery(R"(
- --!syntax_v1
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
- SELECT
- COUNT(*)
- FROM `/Root/olapStore/olapTable`
- WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
- )").GetValueSync();
+ // Step 2: upsert the same index name with a different column set; the request must not succeed.
+ {
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
+ }
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- AFL_VERIFY(before != csController->GetIndexesSkippingOnSelect().Val());
- Cout << result << Endl;
- CompareYson(result, R"([[0u;]])");
- AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
+ // Step 3: same column set, probability 0.005; the test expects a rejection.
+ // NOTE(review): the reason is not visible here -- presumably 0.005 is outside the accepted range; confirm.
+ {
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
+ }
+
+ // Step 4: same column set, probability changed to 0.01; this modification must succeed.
+ {
+ auto alterQuery = TStringBuilder() <<
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
+
}
Y_UNIT_TEST(PushdownFilter) {
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
index 92e5738a3ac..0b48c433242 100644
--- a/ydb/core/protos/ssa.proto
+++ b/ydb/core/protos/ssa.proto
@@ -41,7 +41,7 @@ message TProgram {
}
message TBloomFilterChecker {
- repeated uint32 HashValues = 1;
+ repeated uint64 HashValues = 1;
}
message TOlapIndexChecker {
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 6dfe9d7e1e2..d7fbdf27d50 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -298,9 +298,10 @@ struct Schema : NIceDb::Schema {
struct Blob: Column<5, NScheme::NTypeIds::String> {};
struct Offset: Column<6, NScheme::NTypeIds::Uint32> {};
struct Size: Column<7, NScheme::NTypeIds::Uint32> {};
+ struct RecordsCount: Column<8, NScheme::NTypeIds::Uint32> {};
using TKey = TableKey<PathId, PortionId, IndexId, ChunkIdx>;
- using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size>;
+ using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount>;
};
using TTables = SchemaTables<
@@ -618,14 +619,17 @@ class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
+ const ui32 RecordsCount;
public:
TIndexChunk BuildIndexChunk() const {
- return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), BlobRange);
+ return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, BlobRange);
}
template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
- : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) {
+ : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
+ , RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
+ {
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Blob>();
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 36b099471fb..2ff9533af29 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -98,26 +98,40 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
}
std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
- const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const {
+ const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const {
Y_ABORT_UNLESS(batch->num_rows());
auto resultSchema = context.SchemaVersions.GetSchema(snapshot);
- std::vector<TPortionInfoWithBlobs> out;
std::shared_ptr<NOlap::TSerializationStats> stats = std::make_shared<NOlap::TSerializationStats>();
if (granuleMeta) {
stats = granuleMeta->BuildSerializationStats(resultSchema);
}
auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats);
- TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings);
+ std::vector<TPortionInfoWithBlobs> out;
+ {
+ std::vector<TBatchSerializedSlice> pages = TRBSplitLimiter::BuildSimpleSlices(batch, SplitSettings, context.Counters.SplitterCounters, schema);
+ std::vector<TGeneralSerializedSlice> generalPages;
+ for (auto&& i : pages) {
+ std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns = i.GetPortionChunks();
+ resultSchema->GetIndexInfo().AppendIndexes(portionColumns);
+ generalPages.emplace_back(portionColumns, schema, context.Counters.SplitterCounters, SplitSettings);
+ }
+
+ TSimilarSlicer slicer(SplitSettings.GetExpectedPortionSize());
+ auto packs = slicer.Split(generalPages);
- std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs;
- std::shared_ptr<arrow::RecordBatch> portionBatch;
- while (limiter.Next(chunkByBlobs, portionBatch)) {
- TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator());
- infoWithBlob.GetPortionInfo().AddMetadata(*resultSchema, portionBatch, SaverContext.GetTierName());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString());
- out.emplace_back(std::move(infoWithBlob));
+ ui32 recordIdx = 0;
+ for (auto&& i : packs) {
+ TGeneralSerializedSlice slice(std::move(i));
+ auto b = batch->Slice(recordIdx, slice.GetRecordsCount());
+ std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs();
+ out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, pathId, snapshot, SaverContext.GetStorageOperator()));
+ NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
+ NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
+ out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
+ recordIdx += slice.GetRecordsCount();
+ }
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index 3917a035227..a56197774c4 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -20,12 +20,14 @@ class TIndexChunk {
private:
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY(ui32, ChunkIdx, 0);
+ YDB_READONLY(ui32, RecordsCount, 0);
YDB_READONLY_DEF(TBlobRange, BlobRange);
public:
- TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const TBlobRange& blobRange)
+ TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const ui32 recordsCount, const TBlobRange& blobRange)
: IndexId(indexId)
, ChunkIdx(chunkIdx)
+ , RecordsCount(recordsCount)
, BlobRange(blobRange) {
}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index fdf35132ff1..3deb26a0a73 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -235,6 +235,77 @@ void TPortionInfo::SaveToDatabase(IDbWrapper& db) const {
}
}
+// Splits the portion into pages: consecutive row ranges whose borders are chunk
+// borders for every column and every index of the portion at the same time.
+// Each page lists the column records and index chunks covering its rows and
+// the number of rows in it.
+std::vector<NKikimr::NOlap::TPortionInfo::TPage> TPortionInfo::BuildPages() const {
+ std::vector<TPage> pages;
+ // One chunk of an entity: either a column record or an index chunk (exactly
+ // one of the two pointers is set) together with the rows it covers.
+ struct TPart {
+ public:
+ const TColumnRecord* Record = nullptr;
+ const TIndexChunk* Index = nullptr;
+ const ui32 RecordsCount;
+ TPart(const TColumnRecord* record, const ui32 recordsCount)
+ : Record(record)
+ , RecordsCount(recordsCount) {
+
+ }
+ TPart(const TIndexChunk* record, const ui32 recordsCount)
+ : Index(record)
+ , RecordsCount(recordsCount) {
+
+ }
+ };
+ // Entity id (column id or index id) -> its chunks in iteration order.
+ std::map<ui32, std::deque<TPart>> entities;
+ // Cumulative row offset -> number of entities that have a chunk ending exactly there.
+ std::map<ui32, ui32> currentCursor;
+ ui32 currentSize = 0;
+ ui32 currentId = 0;
+ // The running offset restarts whenever the entity id changes.
+ // NOTE(review): this assumes chunks of one entity are stored contiguously and in row order -- confirm.
+ for (auto&& i : Records) {
+ if (currentId != i.GetColumnId()) {
+ currentSize = 0;
+ currentId = i.GetColumnId();
+ }
+ currentSize += i.GetMeta().GetNumRowsVerified();
+ ++currentCursor[currentSize];
+ entities[i.GetColumnId()].emplace_back(&i, i.GetMeta().GetNumRowsVerified());
+ }
+ for (auto&& i : Indexes) {
+ if (currentId != i.GetIndexId()) {
+ currentSize = 0;
+ currentId = i.GetIndexId();
+ }
+ currentSize += i.GetRecordsCount();
+ ++currentCursor[currentSize];
+ entities[i.GetIndexId()].emplace_back(&i, i.GetRecordsCount());
+ }
+ const ui32 entitiesCount = entities.size();
+ // Row offset at which the previously emitted page ended.
+ ui32 predCount = 0;
+ for (auto&& i : currentCursor) {
+ // Only offsets where every entity has a chunk border can close a page.
+ if (i.second != entitiesCount) {
+ continue;
+ }
+ std::vector<const TColumnRecord*> records;
+ std::vector<const TIndexChunk*> indexes;
+ for (auto&& c : entities) {
+ // Take chunks from the head of the queue until they cover the page's rows.
+ ui32 readyCount = 0;
+ while (readyCount < i.first - predCount && c.second.size()) {
+ if (c.second.front().Record) {
+ records.emplace_back(c.second.front().Record);
+ } else {
+ AFL_VERIFY(c.second.front().Index);
+ indexes.emplace_back(c.second.front().Index);
+ }
+ readyCount += c.second.front().RecordsCount;
+ c.second.pop_front();
+ }
+ // The consumed chunks must cover the page exactly, without overshooting.
+ AFL_VERIFY(readyCount == i.first - predCount)("ready", readyCount)("cursor", i.first)("pred_cursor", predCount);
+ }
+ pages.emplace_back(std::move(records), std::move(indexes), i.first - predCount);
+ predCount = i.first;
+ }
+ // Every chunk must have been assigned to some page.
+ for (auto&& i : entities) {
+ AFL_VERIFY(i.second.empty());
+ }
+ return pages;
+}
+
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_ABORT_UNLESS(!Blobs.empty());
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 225c1381c79..ffca573d8cc 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -30,6 +30,23 @@ private:
YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes);
public:
+ // A group of column records and index chunks that cover the same RecordsCount
+ // rows of the portion. Holds non-owning pointers to the chunks.
+ // NOTE(review): the pointers presumably refer to elements of the portion's own
+ // Records/Indexes vectors, so a page must not outlive or survive changes to them -- confirm.
+ class TPage {
+ private:
+ YDB_READONLY_DEF(std::vector<const TColumnRecord*>, Records);
+ YDB_READONLY_DEF(std::vector<const TIndexChunk*>, Indexes);
+ YDB_READONLY(ui32, RecordsCount, 0);
+ public:
+ // Takes ownership of both pointer lists; recordsCount is the number of rows in the page.
+ TPage(std::vector<const TColumnRecord*>&& records, std::vector<const TIndexChunk*>&& indexes, const ui32 recordsCount)
+ : Records(std::move(records))
+ , Indexes(std::move(indexes))
+ , RecordsCount(recordsCount)
+ {
+
+ }
+ };
+
+ std::vector<TPage> BuildPages() const;
+
std::vector<TColumnRecord> Records;
const std::vector<TColumnRecord>& GetRecords() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index e0118a290b0..6bc4ddf410b 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -121,6 +121,7 @@ bool TPortionDataSource::DoStartFetchingIndexes(const std::shared_ptr<IDataSourc
}
if (!readAction->GetExpectedBlobsSize()) {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed({});
return false;
}
@@ -136,24 +137,37 @@ void TPortionDataSource::DoAbort() {
+// Runs the index checker over the portion page by page and turns the per-page
+// verdicts into a row filter: a page the checker rejects is marked as filtered
+// out, a page it accepts is kept.
void TPortionDataSource::DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) {
+ // NOTE(review): indexBlobs is declared outside the page loop and never cleared,
+ // so the check for each page also sees blobs collected for earlier pages --
+ // confirm this is intended rather than a per-page map.
THashMap<ui32, std::vector<TString>> indexBlobs;
std::set<ui32> indexIds = indexChecker->GetIndexIds();
- for (auto&& i : Portion->GetIndexes()) {
- if (!indexIds.contains(i.GetIndexId())) {
- continue;
+// NActors::TLogContextGuard gLog = NActors::TLogContextBuilder::Build()("records_count", GetRecordsCount())("portion_id", Portion->GetAddress().DebugString());
+ std::vector<TPortionInfo::TPage> pages = Portion->BuildPages();
+ // Per-row verdicts are appended page by page.
+ NArrow::TColumnFilter constructor = NArrow::TColumnFilter::BuildAllowFilter();
+ for (auto&& p : pages) {
+ // Collect blobs only for the indexes the checker asks for.
+ for (auto&& i : p.GetIndexes()) {
+ if (!indexIds.contains(i->GetIndexId())) {
+ continue;
+ }
+ indexBlobs[i->GetIndexId()].emplace_back(StageData->ExtractBlob(i->GetBlobRange()));
}
- indexBlobs[i.GetIndexId()].emplace_back(StageData->ExtractBlob(i.GetBlobRange()));
- }
- for (auto&& i : indexIds) {
- if (!indexBlobs.contains(i)) {
- return;
+ // If any required index has no data, give up without adding any filter.
+ for (auto&& i : indexIds) {
+ if (!indexBlobs.contains(i)) {
+ return;
+ }
+ }
+ if (indexChecker->Check(indexBlobs)) {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true);
+ constructor.Add(true, p.GetRecordsCount());
+ } else {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false);
+ constructor.Add(false, p.GetRecordsCount());
}
}
- if (!indexChecker->Check(indexBlobs)) {
- NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false);
+ // The pages together must cover every row of the portion.
+ AFL_VERIFY(constructor.Size() == Portion->GetRecordsCount());
+ // All pages rejected -> deny filter; all accepted -> no filter needed;
+ // otherwise apply the mixed per-page filter.
+ if (constructor.IsTotalDenyFilter()) {
StageData->AddFilter(NArrow::TColumnFilter::BuildDenyFilter());
+ } else if (constructor.IsTotalAllowFilter()) {
+ return;
} else {
- NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true);
+ StageData->AddFilter(constructor);
}
- return;
}
bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& /*columns*/) {
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h
index 04d6646c9b6..17bb40a2b96 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h
@@ -78,7 +78,9 @@ private:
static inline auto Registrator = TFactory::TRegistrator<TOrIndexChecker>(GetClassNameStatic());
protected:
virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobsByIndexId) const override {
+// ui32 idx = 0;
for (auto&& i : Checkers) {
+// NActors::TLogContextGuard gLog = NActors::TLogContextBuilder::Build()("branch", idx++);
if (i->Check(blobsByIndexId)) {
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
index c64d9b11a29..4debc30257d 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
@@ -9,20 +9,25 @@
namespace NKikimr::NOlap::NIndexes {
+// Registers this chunk in the portion as a TIndexChunk built from its entity id,
+// chunk index, records count and the given blob range.
void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const {
- portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), bRange));
+ portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), RecordsCount, bRange));
}
+// Builds the serialized index chunk for indexId from the chunks of the columns
+// listed in ColumnIds; `data` maps an entity id to its chunks and must be non-empty.
std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
+ AFL_VERIFY(data.size());
std::vector<TChunkedColumnReader> columnReaders;
+ // Every indexed column must be present in `data`.
for (auto&& i : ColumnIds) {
auto it = data.find(i);
AFL_VERIFY(it != data.end());
columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i));
}
+ // Rows covered by the index = total rows in the chunks of the first entity in `data`.
+ // NOTE(review): assumes every entity in `data` covers the same number of rows -- confirm.
+ ui32 recordsCount = 0;
+ for (auto&& i : data.begin()->second) {
+ recordsCount += i->GetRecordsCountVerified();
+ }
TChunkedBatchReader reader(std::move(columnReaders));
std::shared_ptr<arrow::RecordBatch> indexBatch = DoBuildIndexImpl(reader);
const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch);
- return std::make_shared<TPortionIndexChunk>(indexId, indexData);
+ return std::make_shared<TPortionIndexChunk>(indexId, recordsCount, indexData);
}
bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) {
@@ -37,4 +42,20 @@ TIndexByColumns::TIndexByColumns(const ui32 indexId, const std::set<ui32>& colum
Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults());
}
+// Verifies that newMeta is a TIndexByColumns built over exactly the same set of
+// column ids as this index. Returns Success when the sets match, otherwise a
+// Fail status describing the first difference found.
+NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const {
+    const auto* candidate = dynamic_cast<const TIndexByColumns*>(&newMeta);
+    if (candidate == nullptr) {
+        return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
+    }
+    const auto& candidateIds = candidate->ColumnIds;
+    if (ColumnIds.size() != candidateIds.size()) {
+        return TConclusionStatus::Fail("columns count is different");
+    }
+    // Sizes are equal here, so it is enough to look for a candidate id that this index lacks.
+    for (const auto& columnId : candidateIds) {
+        const bool known = ColumnIds.contains(columnId);
+        if (!known) {
+            return TConclusionStatus::Fail("columns set is different or column was recreated in database");
+        }
+    }
+    return TConclusionStatus::Success();
+}
+
} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
index d80da770d1b..28d8825cb10 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
@@ -5,6 +5,7 @@
#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
+#include <ydb/library/conclusion/status.h>
#include <library/cpp/object_factory/object_factory.h>
@@ -31,6 +32,7 @@ protected:
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const = 0;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;
+ virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const = 0;
public:
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
@@ -43,6 +45,16 @@ public:
}
+ TConclusionStatus CheckModificationCompatibility(const std::shared_ptr<IIndexMeta>& newMeta) const {
+ if (!newMeta) {
+ return TConclusionStatus::Fail("new meta cannot be absent");
+ }
+ if (newMeta->GetClassName() != GetClassName()) {
+ return TConclusionStatus::Fail("new meta have to be same index class (" + GetClassName() + "), but new class name: " + newMeta->GetClassName());
+ }
+ return DoCheckModificationCompatibility(*newMeta);
+ }
+
virtual ~IIndexMeta() = default;
std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
@@ -83,6 +95,7 @@ public:
class TPortionIndexChunk: public IPortionDataChunk {
private:
using TBase = IPortionDataChunk;
+ const ui32 RecordsCount;
const TString Data;
protected:
virtual const TString& DoGetData() const override {
@@ -98,7 +111,7 @@ protected:
return false;
}
virtual std::optional<ui32> DoGetRecordsCount() const override {
- return {};
+ return RecordsCount;
}
virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override {
return nullptr;
@@ -108,8 +121,9 @@ protected:
}
virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override;
public:
- TPortionIndexChunk(const ui32 entityId, const TString& data)
- : TBase(entityId, 0)
+ TPortionIndexChunk(const ui32 entityId, const ui32 recordsCount, const TString& data)
+ : TBase(entityId, 0)
+ , RecordsCount(recordsCount)
, Data(data)
{
}
@@ -126,6 +140,9 @@ protected:
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const override final;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) override;
+
+ TConclusionStatus CheckSameColumnsForModification(const IIndexMeta& newMeta) const;
+
public:
TIndexByColumns() = default;
TIndexByColumns(const ui32 indexId, const std::set<ui32>& columnIds);
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h
index 6fa589899a1..f50566a0ff4 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h
@@ -12,7 +12,7 @@ protected:
virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobs) const override final {
auto it = blobs.find(IndexId);
AFL_VERIFY(it != blobs.end());
- return DoCheckImpl(std::move(it->second));
+ return DoCheckImpl(it->second);
}
virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0;
virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0;
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
index ac368bf8a46..368589b11c3 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp
@@ -27,6 +27,7 @@ bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
}
}
if (found) {
+// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId());
return true;
}
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp
index f24100e81d3..3c52518d0b8 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp
@@ -11,18 +11,23 @@
namespace NKikimr::NOlap::NIndexes {
std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
- std::vector<bool> flags;
- flags.resize(BitsCount, false);
+ std::set<ui64> hashes;
for (ui32 i = 0; i < HashesCount; ++i) {
NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i);
- for (; reader.IsCorrect(); reader.ReadNext()) {
+ for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) {
hashCalcer.Start();
for (auto&& i : reader) {
NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer);
}
- flags[hashCalcer.Finish() % BitsCount] = true;
+ const ui64 h = hashCalcer.Finish();
+ hashes.emplace(h);
}
}
+ const ui32 bitsCount = hashes.size() / std::log(2);
+ std::vector<bool> flags(bitsCount, false);
+ for (auto&& i : hashes) {
+ flags[i % flags.size()] = true;
+ }
arrow::BooleanBuilder builder;
auto res = builder.Reserve(flags.size());
@@ -30,7 +35,7 @@ std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBa
std::shared_ptr<arrow::BooleanArray> out;
NArrow::TStatusValidator::Validate(builder.Finish(&out));
- return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out});
+ return arrow::RecordBatch::Make(ResultSchema, bitsCount, {out});
}
void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
@@ -58,7 +63,8 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF
for (auto&& i : foundColumns) {
NArrow::NHash::TXX64::AppendField(i.second, calcer);
}
- hashes.emplace(calcer.Finish());
+ const ui64 hash = calcer.Finish();
+ hashes.emplace(hash);
}
branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h
index bc4db867d60..b077c6cfc6f 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h
@@ -10,20 +10,25 @@ public:
private:
using TBase = TIndexByColumns;
std::shared_ptr<arrow::Schema> ResultSchema;
- const ui64 RowsCountExpectation = 10000;
double FalsePositiveProbability = 0.1;
ui32 HashesCount = 0;
- ui32 BitsCount = 0;
static inline auto Registrator = TFactory::TRegistrator<TBloomIndexMeta>(GetClassNameStatic());
void Initialize() {
AFL_VERIFY(!ResultSchema);
std::vector<std::shared_ptr<arrow::Field>> fields = {std::make_shared<arrow::Field>("", arrow::TypeTraits<arrow::BooleanType>::type_singleton())};
ResultSchema = std::make_shared<arrow::Schema>(fields);
- AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability > 0.01);
+ AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability >= 0.01);
HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2);
- BitsCount = RowsCountExpectation * HashesCount / std::log(2);
}
protected:
+ virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override {
+ const auto* bMeta = dynamic_cast<const TBloomIndexMeta*>(&newMeta);
+ if (!bMeta) {
+ return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
+ }
+ AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability >= 0.01);
+ return TBase::CheckSameColumnsForModification(newMeta);
+ }
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;
virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 8a3530fc731..e9b9144cacb 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -82,7 +82,7 @@ public:
bool OnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
return DoOnStartCompaction(changes);
}
- virtual void OnIndexSelectProcessed(const bool /*result*/) {
+ virtual void OnIndexSelectProcessed(const std::optional<bool> /*result*/) {
}
virtual EOptimizerCompactionWeightControl GetCompactionControl() const {
return EOptimizerCompactionWeightControl::Force;
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index a6680395472..7fc824508e9 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -10,6 +10,7 @@ private:
YDB_READONLY(TAtomicCounter, Indexations, 0);
YDB_READONLY(TAtomicCounter, IndexesSkippingOnSelect, 0);
YDB_READONLY(TAtomicCounter, IndexesApprovedOnSelect, 0);
+ YDB_READONLY(TAtomicCounter, IndexesSkippedNoData, 0);
YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero());
YDB_ACCESSOR(std::optional<TDuration>, PeriodicWakeupActivationPeriod, std::nullopt);
YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt);
@@ -40,8 +41,10 @@ protected:
}
public:
- virtual void OnIndexSelectProcessed(const bool result) override {
- if (result) {
+ virtual void OnIndexSelectProcessed(const std::optional<bool> result) override {
+ if (!result) {
+ IndexesSkippedNoData.Inc();
+ } else if (*result) {
IndexesApprovedOnSelect.Inc();
} else {
IndexesSkippingOnSelect.Inc();
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index d86db7ae8ca..a97a1e1770e 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -108,6 +108,15 @@ protected:
}
public:
+
+ std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> GetPortionChunks() const {
+ std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> result;
+ for (auto&& i : Data) {
+ AFL_VERIFY(result.emplace(i.GetEntityId(), i.GetChunks()).second);
+ }
+ return result;
+ }
+
std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const {
std::vector<std::shared_ptr<arrow::Array>> pkColumns;
for (auto&& i : pkSchema->fields()) {
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 1623bd2914d..73f88f15e5a 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -139,6 +139,12 @@ public:
: Chunks(chunks)
, Loader(loader)
{
+ Start();
+ }
+
+ void Start() {
+ CurrentChunkIndex = 0;
+ CurrentRecordIndex = 0;
if (Chunks.size()) {
CurrentChunk = Loader->ApplyVerifiedColumn(Chunks.front()->GetData());
}
@@ -191,6 +197,13 @@ public:
return IsCorrectFlag;
}
+ void Start() {
+ IsCorrectFlag = true;
+ for (auto&& i : Columns) {
+ i.Start();
+ }
+ }
+
bool ReadNext() {
std::optional<bool> result;
for (auto&& i : Columns) {
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
index 5fae273e44c..698a9e6f3e4 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
@@ -10,20 +10,7 @@ TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters
, Settings(settings)
{
Y_ABORT_UNLESS(Batch->num_rows());
- std::vector<TBatchSerializedSlice> slices;
- auto stats = schemaInfo->GetBatchSerializationStats(Batch);
- ui32 recordsCount = Settings.GetMinRecordsCount();
- if (stats) {
- const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMinBlobSize()).value_or(recordsCount);
- const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMaxPortionSize()).value_or(recordsCount);
- recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize));
- }
- auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount);
- for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) {
- std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize());
- TBatchSerializedSlice slice(current, schemaInfo, Counters, settings);
- slices.emplace_back(std::move(slice));
- }
+ std::vector<TBatchSerializedSlice> slices = BuildSimpleSlices(Batch, Settings, Counters, schemaInfo);
auto chunks = TSimilarSlicer(Settings.GetMinBlobSize()).Split(slices);
ui32 chunkStartPosition = 0;
@@ -60,4 +47,25 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<std::shared_ptr<IPortionDataC
Slices.pop_front();
return true;
}
+
+std::vector<NKikimr::NOlap::TBatchSerializedSlice> TRBSplitLimiter::BuildSimpleSlices(const std::shared_ptr<arrow::RecordBatch>& batch, const TSplitSettings& settings,
+ const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const ISchemaDetailInfo::TPtr& schemaInfo)
+{
+ std::vector<TBatchSerializedSlice> slices;
+ auto stats = schemaInfo->GetBatchSerializationStats(batch);
+ ui32 recordsCount = settings.GetMinRecordsCount();
+ if (stats) {
+ const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(batch->num_rows(), settings.GetMinBlobSize()).value_or(recordsCount);
+ const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(batch->num_rows(), settings.GetMaxPortionSize()).value_or(recordsCount);
+ recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize));
+ }
+ auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(batch->num_rows(), recordsCount);
+ for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) {
+ std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize());
+ TBatchSerializedSlice slice(current, schemaInfo, counters, settings);
+ slices.emplace_back(std::move(slice));
+ }
+ return slices;
+}
+
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h
index 0b0356b86a5..e5d25f2dcef 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.h
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h
@@ -56,6 +56,13 @@ public:
TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters,
ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings);
+ static std::vector<TBatchSerializedSlice> BuildSimpleSlices(const std::shared_ptr<arrow::RecordBatch>& batch, const TSplitSettings& settings,
+ const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const ISchemaDetailInfo::TPtr& schemaInfo);
+
+ std::deque<TBatchSerializedSlice> ExtractSlices() {
+ return std::move(Slices);
+ }
+
bool Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch);
};
diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
index af35646873d..81aa18edff5 100644
--- a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
+++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp
@@ -26,6 +26,11 @@ bool TOlapIndexSchema::ApplyUpdate(const TOlapSchema& currentSchema, const TOlap
if (!object) {
return false;
}
+ auto conclusion = IndexMeta->CheckModificationCompatibility(object);
+ if (conclusion.IsFail()) {
+ errors.AddError("cannot modify index: " + conclusion.GetErrorMessage());
+ return false;
+ }
IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object);
return true;
}