aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-05-30 21:11:23 +0300
committerchertus <azuikov@ydb.tech>2023-05-30 21:11:23 +0300
commitcf7573c049542f9f4ed80fb0a1ab37de29dcd6dc (patch)
treeec6fb42c71c35f908b972f82a13d5916651a0129
parente012e39f33e7acf4091d11d5f73061ac0c06d7d6 (diff)
downloadydb-cf7573c049542f9f4ed80fb0a1ab37de29dcd6dc.tar.gz
enable composite marks in standalone ColumnTables by default
Предполагается следующий алгоритм миграции: 1. В транке включаем всем standalone аналитическим табличкам CompositeMarks 2. В 21.1 / 21.2 портируем протобуф и киляем таблетку, которая на страрте увидела флаг CompositeMarks 3. Для CloudLogging (таблички в ColumnStore-ах) накатываем CompositeMarks через версию: раскатываем 23.3 c поддержкой; включаем флажок в 23.4 4. Для внутренних логов при необходимости раскатываем версию из ветки с включенными композитными засечками
-rw-r--r--ydb/core/formats/arrow/replace_key.h6
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp28
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/tx_columnshard.proto6
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h30
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp23
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h3
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h15
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp53
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h9
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp3
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp142
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp3
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp6
17 files changed, 246 insertions, 100 deletions
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index 74f7974c1f3..d14edcc8f8e 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -107,6 +107,12 @@ public:
return *(*Columns)[i];
}
+ std::shared_ptr<arrow::Array> ColumnPtr(int i) const {
+ Y_VERIFY_DEBUG(Columns);
+ Y_VERIFY_DEBUG((size_t)i < Columns->size());
+ return (*Columns)[i];
+ }
+
TReplaceKeyTemplate<const TArrayVec*> ToRaw() const {
if constexpr (IsOwning) {
return TReplaceKeyTemplate<const TArrayVec*>(Columns.get(), Position);
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 2779cd83b40..ce224790a20 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1763,7 +1763,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(AggregationCountPushdown) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
// EnableDebugLogging(kikimr);
@@ -1808,7 +1808,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(AggregationCountGroupByPushdown) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
// EnableDebugLogging(kikimr);
@@ -1855,7 +1855,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
// EnableDebugLogging(kikimr);
@@ -1900,7 +1900,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(CountAllNoPushdown) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
// EnableDebugLogging(kikimr);
@@ -2080,7 +2080,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
void TestAggregationsBase(const std::vector<TAggregationTestCase>& cases) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
//EnableDebugLogging(kikimr);
@@ -2174,7 +2174,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
// EnableDebugLogging(kikimr);
@@ -2270,7 +2270,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
EnableDebugLogging(kikimr);
@@ -4331,7 +4331,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -4387,7 +4387,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -4441,7 +4441,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -4775,7 +4775,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(Olap_InsertFails) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
EnableDebugLogging(kikimr);
@@ -4795,7 +4795,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(OlapRead_FailsOnDataQuery) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
EnableDebugLogging(kikimr);
@@ -4821,7 +4821,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(OlapRead_UsesScanOnJoin) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
EnableDebugLogging(kikimr);
@@ -4844,7 +4844,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST(OlapRead_UsesScanOnJoinWithDataShardTable) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
TKikimrRunner kikimr(settings);
EnableDebugLogging(kikimr);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index e8f91a7f319..a57a48658fc 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -808,6 +808,7 @@ message TFeatureFlags {
optional bool EnableForceImmediateEffectsExecution = 94 [default = false];
optional bool EnableTopicSplitMerge = 95 [default = false];
optional bool EnableChangefeedDynamoDBStreamsFormat = 96 [default = false];
+ optional bool ForceColumnTablesCompositeMarks = 97 [default = false];
}
message THttpProxyConfig {
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 569c2885382..37abe8a8497 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -255,6 +255,10 @@ message TSchemaSeqNo {
optional uint64 Round = 2;
}
+message TIndexGranuleMeta {
+ optional uint32 MarkSize = 1; // Composite key mark (granule border) size: count of first PK elements in mark
+}
+
message TIndexPortionMeta {
oneof Produced {
bool IsInserted = 1;
@@ -263,7 +267,7 @@ message TIndexPortionMeta {
bool IsEvicted = 4;
}
optional string TierName = 5;
- optional bytes IndexKeyBorders = 6; // arrow::RecordBatch with first and last IndexKey rows
+ optional bytes PrimaryKeyBorders = 6; // arrow::RecordBatch with first and last ReplaceKey rows
}
message TIndexColumnMeta {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 7a213130a0c..e141a7be443 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -48,6 +48,7 @@ public:
FEATURE_FLAG_SETTER(EnableForceImmediateEffectsExecution)
FEATURE_FLAG_SETTER(EnableTopicSplitMerge)
FEATURE_FLAG_SETTER(EnableChangefeedDynamoDBStreamsFormat)
+ FEATURE_FLAG_SETTER(ForceColumnTablesCompositeMarks)
#undef FEATURE_FLAG_SETTER
};
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 29ead9b7053..b025ee9903d 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -211,9 +211,10 @@ struct Schema : NIceDb::Schema {
struct Granule : Column<4, NScheme::NTypeIds::Uint64> {}; // FK: {Index, Granule} -> TIndexColumns
struct PlanStep : Column<5, NScheme::NTypeIds::Uint64> {};
struct TxId : Column<6, NScheme::NTypeIds::Uint64> {};
+ struct Metadata : Column<7, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TIndexGranuleMeta
using TKey = TableKey<Index, PathId, IndexKey>;
- using TColumns = TableColumns<Index, PathId, IndexKey, Granule, PlanStep, TxId>;
+ using TColumns = TableColumns<Index, PathId, IndexKey, Granule, PlanStep, TxId, Metadata>;
};
struct IndexColumns : NIceDb::Schema::Table<ColumnsTableId> {
@@ -486,7 +487,7 @@ struct Schema : NIceDb::Schema {
ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>();
indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId);
}
-
+
TString error;
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error);
Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
@@ -521,10 +522,20 @@ struct Schema : NIceDb::Schema {
static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
const TGranuleRecord& row) {
+ TString metaStr;
+ const auto& indexInfo = engine.GetIndexInfo();
+ if (indexInfo.IsCompositeIndexKey()) {
+ NKikimrTxColumnShard::TIndexGranuleMeta meta;
+ Y_VERIFY(indexInfo.GetIndexKey());
+ meta.SetMarkSize(indexInfo.GetIndexKey()->num_fields());
+ Y_VERIFY(meta.SerializeToString(&metaStr));
+ }
+
db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update(
NIceDb::TUpdate<IndexGranules::Granule>(row.Granule),
NIceDb::TUpdate<IndexGranules::PlanStep>(row.GetCreatedAt().GetPlanStep()),
- NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId())
+ NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId()),
+ NIceDb::TUpdate<IndexGranules::Metadata>(metaStr)
);
}
@@ -545,8 +556,19 @@ struct Schema : NIceDb::Schema {
ui64 granule = rowset.GetValue<IndexGranules::Granule>();
ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>();
ui64 txId = rowset.GetValue<IndexGranules::TxId>();
+ TString metaStr = rowset.GetValue<IndexGranules::Metadata>();
+
+ std::optional<ui32> markNumKeys;
+ if (metaStr.size()) {
+ NKikimrTxColumnShard::TIndexGranuleMeta meta;
+ Y_VERIFY(meta.ParseFromString(metaStr));
+ if (meta.HasMarkSize()) {
+ markNumKeys = meta.GetMarkSize();
+ }
+ }
- callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId), engine.DeserializeMark(indexKey)));
+ callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId),
+ engine.DeserializeMark(indexKey, markNumKeys)));
if (!rowset.Next())
return false;
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index 430cbea2aa3..4a49cc67e5b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -11,7 +11,7 @@ TString TMark::SerializeScalar(const NArrow::TReplaceKey& key, const std::shared
}
NArrow::TReplaceKey TMark::DeserializeScalar(const TString& key, const std::shared_ptr<arrow::Schema>& schema) {
- Y_VERIFY(schema->num_fields() == 1);
+ Y_VERIFY(schema->num_fields() > 0);
return NArrow::TReplaceKey::FromScalar(DeserializeKeyScalar(key, schema->field(0)->type()));
}
@@ -69,6 +69,27 @@ NArrow::TReplaceKey TMark::MinBorder(const std::shared_ptr<arrow::Schema>& schem
}
}
+NArrow::TReplaceKey TMark::ExtendBorder(const NArrow::TReplaceKey& key,
+ const std::shared_ptr<arrow::Schema>& schema) {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ i32 numFields = schema->num_fields();
+ columns.reserve(numFields);
+ for (i32 i = 0; i < numFields; ++i) {
+ const auto& field = schema->field(i);
+ if (i < key.Size()) {
+ columns.emplace_back(key.ColumnPtr(i));
+ Y_VERIFY(columns.back()->type()->Equals(field->type()));
+ } else {
+ auto scalar = MinScalar(field->type());
+ Y_VERIFY(scalar);
+ auto res = arrow::MakeArrayFromScalar(*scalar, 1);
+ Y_VERIFY(res.ok());
+ columns.emplace_back(*res);
+ }
+ }
+ return NArrow::TReplaceKey::FromBatch(arrow::RecordBatch::Make(schema, 1, columns), 0);
+}
+
}
template <>
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 49b49c68aaf..076200c84e8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -102,6 +102,7 @@ public:
static NArrow::TReplaceKey DeserializeComposite(const TString& key, const std::shared_ptr<arrow::Schema>& schema);
static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::Schema>& schema);
+ static NArrow::TReplaceKey ExtendBorder(const NArrow::TReplaceKey& key, const std::shared_ptr<arrow::Schema>& schema);
std::string ToString() const;
@@ -634,7 +635,7 @@ public:
virtual bool HasOverloadedGranules() const { return false; }
virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0;
- virtual NArrow::TReplaceKey DeserializeMark(const TString& key) const = 0;
+ virtual NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const = 0;
virtual bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 20b01766685..723e1cb8d93 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -663,8 +663,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
const TSnapshot& snapshot) {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
- const auto& indexInfo = GetIndexInfo();
-
// Update tmp granules with real ids
auto granuleRemap = changes->TmpToNewGranules(LastGranule);
ui64 portion = LastPortion;
@@ -678,11 +676,11 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
produced = changes->CompactionInfo->InGranule() ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED;
}
- portionInfo.UpdateRecordsMeta(indexInfo, produced);
+ portionInfo.UpdateRecordsMeta(produced);
}
for (auto& [portionInfo, _] : changes->PortionsToEvict) {
- portionInfo.UpdateRecordsMeta(indexInfo, TPortionMeta::EVICTED);
+ portionInfo.UpdateRecordsMeta(TPortionMeta::EVICTED);
}
for (auto& [_, id] : changes->PortionsToMove) {
@@ -1085,7 +1083,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
out->Granules.reserve(pathGranules.size());
// TODO: out.Portions.reserve()
std::optional<TMarksMap::const_iterator> previousIterator;
- const bool compositeMark = GetIndexKey()->num_fields() > 1;
+ const bool compositeMark = UseCompositeMarks();
for (auto&& filter : pkRangesFilter) {
std::optional<NArrow::TReplaceKey> indexKeyFrom = filter.KeyFrom(GetIndexKey());
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 3130ebcae24..39ca47a44c6 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -211,11 +211,16 @@ public:
}
}
- NArrow::TReplaceKey DeserializeMark(const TString& key) const override {
- if (UseCompositeMarks()) {
+ NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const override {
+ if (markNumKeys) {
+ Y_VERIFY(*markNumKeys == (ui32)MarkSchema()->num_fields());
return TMark::DeserializeComposite(key, MarkSchema());
} else {
- return TMark::DeserializeScalar(key, MarkSchema());
+ NArrow::TReplaceKey markKey = TMark::DeserializeScalar(key, MarkSchema());
+ if (UseCompositeMarks()) {
+ return TMark::ExtendBorder(markKey, MarkSchema());
+ }
+ return markKey;
}
}
@@ -302,8 +307,8 @@ private:
return *CachedDefaultMark;
}
- bool UseCompositeMarks() const {
- return MarkSchema()->num_fields() > 1;
+ bool UseCompositeMarks() const noexcept {
+ return GetIndexInfo().IsCompositeIndexKey();
}
void ClearIndex() {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 235df0a974f..446ea3d108a 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -190,8 +190,9 @@ public:
return result;
}
- static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- TIndexInfo result("", 0, schema.GetCompositeMarks());
+ static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema,
+ bool forceCompositeMarks) {
+ TIndexInfo result("", 0, forceCompositeMarks || schema.GetCompositeMarks());
if (!result.DeserializeFromProto(schema)) {
return {};
}
@@ -277,6 +278,10 @@ public:
/// Returns whether the replace keys defined.
bool IsReplacing() const { return ReplaceKey.get(); }
+ bool IsCompositeIndexKey() const {
+ return CompositeIndexKey;
+ }
+
std::shared_ptr<NArrow::TSortDescription> SortDescription() const;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription() const;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 171d8a90df6..1cbdca17c62 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -5,7 +5,7 @@
namespace NKikimr::NOlap {
std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const {
- if (dataSchema.GetSnapshot() == GetSnapshot()) {
+ if (dataSchema.GetSnapshot() == GetSnapshot()) {
return batch;
}
const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema();
@@ -71,21 +71,21 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std:
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
// Copy first and last key rows into new batch to free source batch's memory
- std::shared_ptr<arrow::RecordBatch> edgesBatch;
{
- auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetIndexKey());
+ auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
std::vector<bool> bits(batch->num_rows(), false);
bits[0] = true;
- bits[batch->num_rows() - 1] = true;
+ bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row
auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
auto res = arrow::compute::Filter(keyBatch, filter);
Y_VERIFY(res.ok());
- edgesBatch = res->record_batch();
- Y_VERIFY(edgesBatch->num_rows() == 1 || edgesBatch->num_rows() == 2);
+ Meta.ReplaceKeyEdges = res->record_batch();
+ Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
}
+ auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
@@ -108,7 +108,7 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std:
}
}
-TString TPortionInfo::GetMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) const {
+TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const {
NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder
if (Meta.ColumnMeta.contains(rec.ColumnId)) {
const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second;
@@ -152,14 +152,11 @@ TString TPortionInfo::GetMetadata(const TIndexInfo& indexInfo, const TColumnReco
portionMeta->SetTierName(TierName);
}
- Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd);
- const bool compositeIndexKey = indexInfo.GetIndexKey()->num_fields() > 1;
- if (compositeIndexKey) {
- // We know that IndexKeyStart and IndexKeyEnd are made from edgesBatch. Restore it.
- auto edgesBatch = Meta.IndexKeyStart->RestoreBatch(indexInfo.GetIndexKey());
- Y_VERIFY(edgesBatch && edgesBatch->ValidateFull().ok());
- Y_VERIFY(edgesBatch->num_rows() == 1 || edgesBatch->num_rows() == 2);
- portionMeta->SetIndexKeyBorders(NArrow::SerializeBatchNoCompression(edgesBatch));
+ if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) {
+ Y_VERIFY(keyEdgesBatch);
+ Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
+ Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
+ portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
}
}
@@ -179,7 +176,7 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
auto field = indexInfo.ArrowColumnField(rec.ColumnId);
- const bool compositeIndexKey = indexInfo.GetIndexKey()->num_fields() > 1;
+ const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
if (meta.HasPortionMeta()) {
Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn);
@@ -197,14 +194,18 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
Meta.Produced = TPortionMeta::EVICTED;
}
- if (compositeIndexKey) {
- Y_VERIFY(portionMeta.HasIndexKeyBorders());
- auto edgesBatch = NArrow::DeserializeBatch(portionMeta.GetIndexKeyBorders(), indexInfo.GetIndexKey());
- Y_VERIFY(edgesBatch && edgesBatch->ValidateFull().ok());
- Y_VERIFY(edgesBatch->num_rows() == 1 || edgesBatch->num_rows() == 2);
-
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+ if (portionMeta.HasPrimaryKeyBorders()) {
+ Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
+ Y_VERIFY(Meta.ReplaceKeyEdges);
+ Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok());
+ Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
+
+ if (compositeIndexKey) {
+ auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
+ Y_VERIFY(edgesBatch);
+ Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
+ Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+ }
}
}
if (meta.HasNumRows()) {
@@ -231,6 +232,10 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar);
}
}
+
+ // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey()
+ // We should have no such portions for ForceColumnTablesCompositeMarks feature
+ Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd);
}
std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> TPortionInfo::MinMaxValue(const ui32 columnId) const {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 28c26bb1775..f84a43489f3 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -185,6 +185,7 @@ struct TPortionMeta {
EProduced Produced{UNSPECIFIED};
THashMap<ui32, TColumnMeta> ColumnMeta;
ui32 FirstPkColumn = 0;
+ std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
std::optional<NArrow::TReplaceKey> IndexKeyStart;
std::optional<NArrow::TReplaceKey> IndexKeyEnd;
@@ -228,7 +229,7 @@ struct TPortionInfo {
bool Empty() const { return Records.empty(); }
bool Produced() const { return Meta.Produced != TPortionMeta::UNSPECIFIED; }
- bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax(); }
+ bool Valid() const { return !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
bool IsInserted() const { return Meta.Produced == TPortionMeta::INSERTED; }
bool IsEvicted() const { return Meta.Produced == TPortionMeta::EVICTED; }
bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ }
@@ -311,10 +312,10 @@ struct TPortionInfo {
}
}
- void UpdateRecordsMeta(const TIndexInfo& indexInfo, TPortionMeta::EProduced produced) {
+ void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
Meta.Produced = produced;
for (auto& record : Records) {
- record.Metadata = GetMetadata(indexInfo, record);
+ record.Metadata = GetMetadata(record);
}
}
@@ -329,7 +330,7 @@ struct TPortionInfo {
LoadMetadata(indexInfo, rec);
}
- TString GetMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) const;
+ TString GetMetadata(const TColumnRecord& rec) const;
void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index f462ecbfad9..605c81a1d65 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -289,7 +289,8 @@ std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(c
}
NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema);
+ bool forceCompositeMarks = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks();
+ std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema, forceCompositeMarks);
Y_VERIFY(indexInfo);
return *indexInfo;
}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index ad1cb024f49..fb1fe8e9193 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1010,9 +1010,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(DataHasOnly(readData, schema, {11, 41 + 1}));
}
-void TestCompactionInGranuleImpl(bool reboots,
- const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema,
- const std::vector<std::pair<TString, TTypeInfo>>& ydbPk) {
+void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1055,9 +1053,10 @@ void TestCompactionInGranuleImpl(bool reboots,
ui64 planStep = 100;
ui64 txId = 100;
- TestTableDescription table{.Schema = ydbSchema, .Pk = ydbPk};
SetupSchema(runtime, sender, tableId, table);
TAutoPtr<IEventHandle> handle;
+ const auto& ydbSchema = table.Schema;
+ const auto& ydbPk = table.Pk;
// Write same keys: merge on compaction
@@ -1809,6 +1808,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestWriteRead(false, table);
}
+ Y_UNIT_TEST(WriteReadStandaloneComposite) {
+ TestTableDescription table;
+ table.InStore = false;
+ table.CompositeMarks = true;
+ TestWriteRead(false, table);
+ }
+
Y_UNIT_TEST(WriteReadExoticTypes) {
TestTableDescription table;
table.Schema = TTestSchema::YdbExoticSchema();
@@ -1840,7 +1846,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestWriteRead(true, {}, "zstd");
}
- Y_UNIT_TEST(CompactionInGranule) {
+ void TestCompactionInGranule(bool composite) {
std::vector<TTypeId> types = {
NTypeIds::Timestamp,
//NTypeIds::Int16,
@@ -1859,9 +1865,19 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionInGranuleImpl(false, schema, pk);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionInGranuleImpl(false, table);
}
}
+
+ Y_UNIT_TEST(CompactionInGranule) {
+ TestCompactionInGranule(false);
+ }
+
+ Y_UNIT_TEST(CompactionInGranule_Composite) {
+ TestCompactionInGranule(true);
+ }
+
#if 0
Y_UNIT_TEST(CompactionInGranuleFloatKey) {
std::vector<NScheme::TTypeId> types = {
@@ -1878,7 +1894,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
#endif
- Y_UNIT_TEST(CompactionInGranuleStrKey) {
+ void TestCompactionInGranuleStrKey(bool composite) {
std::vector<NScheme::TTypeId> types = {
NTypeIds::String,
NTypeIds::Utf8
@@ -1889,11 +1905,20 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionInGranuleImpl(false, schema, pk);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionInGranuleImpl(false, table);
}
}
- Y_UNIT_TEST(RebootCompactionInGranule) {
+ Y_UNIT_TEST(CompactionInGranuleStrKey) {
+ TestCompactionInGranuleStrKey(false);
+ }
+
+ Y_UNIT_TEST(CompactionInGranuleStrKey_Composite) {
+ TestCompactionInGranuleStrKey(true);
+ }
+
+ void TestRebootCompactionInGranule(bool composite) {
// some of types
std::vector<NScheme::TTypeId> types = {
NTypeIds::Timestamp,
@@ -1906,10 +1931,19 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionInGranuleImpl(true, schema, pk);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionInGranuleImpl(true, table);
}
}
+ Y_UNIT_TEST(RebootCompactionInGranule) {
+ TestRebootCompactionInGranule(false);
+ }
+
+ Y_UNIT_TEST(RebootCompactionInGranule_Composite) {
+ TestRebootCompactionInGranule(true);
+ }
+
Y_UNIT_TEST(ReadWithProgram) {
TestReadWithProgram();
}
@@ -2185,7 +2219,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
};
- void TestCompactionSplitGranule(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) {
+ void TestCompactionSplitGranuleImpl(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -2263,15 +2297,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x);
}
- if (isStrPk0) {
- if (testBlobOptions.SameValueColumns.contains("timestamp")) {
- UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
- UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message"));
- } else {
- UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp"));
- }
+ if (testBlobOptions.SameValueColumns.contains("timestamp")) {
+ UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message"));
} else {
- UNIT_ASSERT(DataHas(readData, schema, { 0, numRows }, true));
+ UNIT_ASSERT(isStrPk0
+ ? DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp")
+ : DataHas(readData, schema, { 0, numRows }, true, "timestamp"));
}
}
@@ -2281,8 +2313,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
std::vector<ui32> val9999 = { 99999 };
std::vector<ui32> val1M = { 1000000000 };
std::vector<ui32> val1M_1 = { 1000000001 };
+ std::vector<ui32> valNumRows = { numRows };
+ std::vector<ui32> valNumRows_1 = { numRows - 1 };
+ std::vector<ui32> valNumRows_2 = { numRows - 2 };
- const bool composite = !testBlobOptions.SameValueColumns.empty();
+ const bool composite = table.CompositeMarks;
if (composite) {
UNIT_ASSERT(table.Pk.size() >= 2);
@@ -2293,6 +2328,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
val9999 = { sameValue, 99999 };
val1M = { sameValue, 1000000000 };
val1M_1 = { sameValue, 1000000001 };
+ valNumRows = { sameValue, numRows };
+ valNumRows_1 = { sameValue, numRows - 1 };
+ valNumRows_2 = { sameValue, numRows - 2 };
}
using TBorder = TTabletReadPredicateTest::TBorder;
@@ -2307,7 +2345,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1);
testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true))
- .SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0);
+ .SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0 || composite);
// VERIFIED AS INCORRECT INTERVAL (its good)
// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
@@ -2321,10 +2359,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
testAgent.Test("[99990:99999]").SetFrom(TBorder(val9990, true)).SetTo(TBorder(val9999, true)).SetExpectedCount(100);
}
} else {
- testAgent.Test("(numRows:").SetFrom(TBorder({numRows}, false)).SetExpectedCount(0);
- testAgent.Test("(numRows-1:").SetFrom(TBorder({numRows - 1}, false)).SetExpectedCount(0);
- testAgent.Test("(numRows-2:").SetFrom(TBorder({numRows - 2}, false)).SetExpectedCount(1);
- testAgent.Test("[numRows-1:").SetFrom(TBorder({numRows - 1}, true)).SetExpectedCount(1);
+ testAgent.Test("(numRows:").SetFrom(TBorder(valNumRows, false)).SetExpectedCount(0).SetDataReadOnEmpty(composite);
+ testAgent.Test("(numRows-1:").SetFrom(TBorder(valNumRows_1, false)).SetExpectedCount(0).SetDataReadOnEmpty(composite);
+ testAgent.Test("(numRows-2:").SetFrom(TBorder(valNumRows_2, false)).SetExpectedCount(1);
+ testAgent.Test("[numRows-1:").SetFrom(TBorder(valNumRows_1, true)).SetExpectedCount(1);
}
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -2375,7 +2413,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
- Y_UNIT_TEST(CompactionSplitGranule) {
+ void TestCompactionSplitGranule(bool composite) {
std::vector<TTypeId> types = {
NTypeIds::Timestamp,
//NTypeIds::Int16,
@@ -2393,16 +2431,32 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto schema = TTestSchema::YdbSchema();
auto pk = TTestSchema::YdbPkSchema();
+ TTestBlobOptions opts;
+ if (composite) {
+ opts.SameValueColumns.emplace(pk[0].first);
+ }
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestTableDescription table{.Schema = schema, .Pk = pk};
- TestCompactionSplitGranule(table);
+ if (composite) {
+ schema[1].second = TTypeInfo(type);
+ pk[1].second = TTypeInfo(type);
+ }
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionSplitGranuleImpl(table, opts);
}
}
- Y_UNIT_TEST(CompactionSplitGranuleStrKey) {
+ Y_UNIT_TEST(CompactionSplitGranule) {
+ TestCompactionSplitGranule(false);
+ }
+
+ Y_UNIT_TEST(CompactionSplitGranule_Composite) {
+ TestCompactionSplitGranule(true);
+ }
+
+ void TestCompactionSplitGranuleStrKey(bool composite) {
std::vector<TTypeId> types = {
NTypeIds::String,
NTypeIds::Utf8
@@ -2410,16 +2464,28 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto schema = TTestSchema::YdbSchema();
auto pk = TTestSchema::YdbPkSchema();
+ TTestBlobOptions opts;
+ if (composite) {
+ opts.SameValueColumns.emplace(pk[0].first);
+ }
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestTableDescription table{.Schema = schema, .Pk = pk};
- TestCompactionSplitGranule(table);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionSplitGranuleImpl(table, opts);
}
}
- Y_UNIT_TEST(CompactionSplitGranuleSameStrKey) {
+ Y_UNIT_TEST(CompactionSplitGranuleStrKey) {
+ TestCompactionSplitGranuleStrKey(false);
+ }
+
+ Y_UNIT_TEST(CompactionSplitGranuleStrKey_Composite) {
+ TestCompactionSplitGranuleStrKey(true);
+ }
+
+ void TestCompactionSplitGranuleSameStrKey(bool composite) {
std::vector<TTypeId> types = {
NTypeIds::String,
NTypeIds::Utf8
@@ -2428,16 +2494,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto schema = TTestSchema::YdbSchema();
auto pk = TTestSchema::YdbPkSchema();
TTestBlobOptions opts;
- opts.SameValueColumns.emplace(pk[0].first);
+ if (composite) {
+ opts.SameValueColumns.emplace(pk[0].first);
+ }
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = true};
- TestCompactionSplitGranule(table, opts);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = composite};
+ TestCompactionSplitGranuleImpl(table, opts);
}
}
+ Y_UNIT_TEST(CompactionSplitGranuleSameStrKey) {
+ TestCompactionSplitGranuleSameStrKey(true);
+ }
+
Y_UNIT_TEST(ReadStale) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
index 9c5fd0995c6..b1b6783e3c3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
@@ -687,6 +687,9 @@ public:
return result;
}
tableInfo = tableConstructor.BuildTableInfo(errors);
+ if (tableInfo) {
+ tableInfo->Description.MutableSchema()->SetCompositeMarks(true);
+ }
}
if (!tableInfo) {
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 3a79d3b3873..33225ffb371 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -243,7 +243,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -329,7 +329,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
server->EnableGRpc(grpcPort);
@@ -445,7 +445,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableBackgroundTasks(true)
- .SetEnableOlapSchemaOperations(true);
+ .SetForceColumnTablesCompositeMarks(true);
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);