| author | chertus <azuikov@ydb.tech> | 2023-05-22 12:00:57 +0300 |
|---|---|---|
| committer | chertus <azuikov@ydb.tech> | 2023-05-22 12:00:57 +0300 |
| commit | 15a58b55da960ce2000a126854d322f1cd05dd23 (patch) | |
| tree | e1628fd249a1f85fbc3a3d0434de22d309329540 | |
| parent | f04144fef1314ee9674744a8c167b930e5c4dbb6 (diff) | |
| download | ydb-15a58b55da960ce2000a126854d322f1cd05dd23.tar.gz | |
enable flag for composite marks
-rw-r--r--  ydb/core/formats/arrow/replace_key.h | 2
-rw-r--r--  ydb/core/protos/flat_scheme_op.proto | 2
-rw-r--r--  ydb/core/tx/columnshard/columnshard_ut_common.h | 13
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.h | 37
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 64
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/predicate/container.h | 12
-rw-r--r--  ydb/core/tx/columnshard/tables_manager.cpp | 7
-rw-r--r--  ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 227
10 files changed, 254 insertions, 118 deletions
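Editor's note on the key data-structure change in the diff below: `PathGranules` switches from `TMap<TMark, ui64>` to `std::map<TMark, ui64, TMark::TCompare>`, where `TCompare` declares `is_transparent` and accepts `arrow::Scalar` on either side. That is what lets `Select()` call `pathGranules.lower_bound(*keyFrom)` with a bare scalar instead of constructing a `TMark`. A minimal standalone sketch of the same standard-library mechanism — the names (`Mark`, `MarkCompare`, `granules`) are illustrative stand-ins, not YDB code:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>

// Stand-in for TMark: a granule border key (illustrative, not the YDB type).
struct Mark {
    std::string Border;
};

// Stand-in for TMark::TCompare: 'is_transparent' enables heterogeneous
// lookups (here by std::string_view) without constructing a Mark first.
struct MarkCompare {
    using is_transparent = void;
    bool operator()(const Mark& a, const Mark& b) const { return a.Border < b.Border; }
    bool operator()(std::string_view a, const Mark& b) const { return a < b.Border; }
    bool operator()(const Mark& a, std::string_view b) const { return a.Border < b; }
};

int main() {
    // Analogue of PathGranules: border -> granule id.
    std::map<Mark, int, MarkCompare> granules{{{"a"}, 1}, {{"m"}, 2}, {{"t"}, 3}};

    // Heterogeneous lower_bound: no temporary Mark is built for the probe key.
    auto it = granules.lower_bound(std::string_view{"k"});
    assert(it != granules.end() && it->second == 2);
    return 0;
}
```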
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index 32c1227512..74f7974c1f 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -102,6 +102,8 @@ public:
 
     const arrow::Array& Column(int i) const {
         Y_VERIFY_DEBUG(Columns);
+        Y_VERIFY_DEBUG((size_t)i < Columns->size());
+        Y_VERIFY_DEBUG((*Columns)[i]);
         return *(*Columns)[i];
     }
 
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 62435785da..b9a633ec4e 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -440,6 +440,8 @@ message TColumnTableSchema {
    //optional EColumnCodec DefaultCompressionCodec = 6; // deprecated, not used before replace
    //optional int32 DefaultCompressionLevel = 7; // deprecated, not used before replace
    optional TCompressionOptions DefaultCompression = 8;
+
+    optional bool CompositeMarks = 9;
 }
 
 message TAlterColumnTableSchema {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 9a166d96b3..5c44730029 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -81,6 +81,9 @@ struct TTestSchema {
 
     struct TTableSpecials : public TStorageTier {
        std::vector<TStorageTier> Tiers;
+        bool CompositeMarks = false;
+
+        TTableSpecials() noexcept = default;
 
        bool HasTiers() const {
            return !Tiers.empty();
@@ -90,12 +93,18 @@
            return EvictAfter.has_value();
        }
 
-        TTableSpecials WithCodec(const TString& codec) {
+        TTableSpecials WithCodec(const TString& codec) const {
            TTableSpecials out = *this;
            out.SetCodec(codec);
            return out;
        }
 
+        TTableSpecials WithCompositeMarks(bool composite) const {
+            TTableSpecials out = *this;
+            out.CompositeMarks = composite;
+            return out;
+        }
+
        TTableSpecials& SetTtl(std::optional<TDuration> ttl) {
            EvictAfter = ttl;
            return *this;
@@ -214,6 +223,7 @@
        if (specials.CompressionLevel) {
            schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
        }
+        schema->SetCompositeMarks(specials.CompositeMarks);
    }
 
    static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle::TTtl* ttl) {
@@ -400,6 +410,7 @@ struct TTestBlobOptions {
    ui32 SameValue = 42;
 };
 
+TCell MakeTestCell(const TTypeInfo& typeInfo, ui32 value, std::vector<TString>& mem);
 TString MakeTestBlob(std::pair<ui64, ui64> range, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns,
                     const TTestBlobOptions& options = {});
 TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveFrom, bool inclusiveTo,
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index b15c748915..eb5e925588 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -33,6 +33,22 @@ struct TCompactionLimits {
 
 class TMark {
 public:
+    struct TCompare {
+        using is_transparent = void;
+
+        bool operator() (const TMark& a, const TMark& b) const {
+            return a < b;
+        }
+
+        bool operator() (const arrow::Scalar& a, const TMark& b) const {
+            return a < b;
+        }
+
+        bool operator() (const TMark& a, const arrow::Scalar& b) const {
+            return a < b;
+        }
+    };
+
    explicit TMark(const NArrow::TReplaceKey& key)
        : Border(key)
    {}
@@ -48,6 +64,23 @@ public:
        return Border <=> m.Border;
    }
 
+    bool operator == (const arrow::Scalar& firstKey) const {
+        // TODO: avoid ToScalar()
+        return NArrow::ScalarCompare(*NArrow::TReplaceKey::ToScalar(Border, 0), firstKey) == 0;
+    }
+
+    std::partial_ordering operator <=> (const arrow::Scalar& firstKey) const {
+        // TODO: avoid ToScalar()
+        const int cmp = NArrow::ScalarCompare(*NArrow::TReplaceKey::ToScalar(Border, 0), firstKey);
+        if (cmp < 0) {
+            return std::partial_ordering::less;
+        } else if (cmp > 0) {
+            return std::partial_ordering::greater;
+        } else {
+            return std::partial_ordering::equivalent;
+        }
+    }
+
    const NArrow::TReplaceKey& GetBorder() const noexcept {
        return Border;
    }
@@ -160,7 +193,9 @@ public:
            case INSERT:
                return "insert";
            case COMPACTION:
-                return "compaction";
+                return CompactionInfo
+                    ? (CompactionInfo->InGranule ? "compaction in granule" : "compaction split granule" )
+                    : "compaction";
            case CLEANUP:
                return "cleanup";
            case TTL:
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 78cd9eee33..d2102e74b1 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1228,23 +1228,40 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
    }
    out->Granules.reserve(pathGranules.size()); // TODO: out.Portions.reserve()
-    std::optional<TMap<TMark, ui64>::const_iterator> previousIterator;
+    std::optional<TMarksMap::const_iterator> previousIterator;
+    const bool compositeMark = GetIndexKey()->num_fields() > 1;
+
    for (auto&& filter : pkRangesFilter) {
-        std::optional<NArrow::TReplaceKey> keyFrom = filter.KeyFrom(GetIndexKey());
-        std::optional<NArrow::TReplaceKey> keyTo = filter.KeyTo(GetIndexKey());
+        std::optional<NArrow::TReplaceKey> indexKeyFrom = filter.KeyFrom(GetIndexKey());
+        std::optional<NArrow::TReplaceKey> indexKeyTo = filter.KeyTo(GetIndexKey());
+
+        std::shared_ptr<arrow::Scalar> keyFrom;
+        std::shared_ptr<arrow::Scalar> keyTo;
+        if (indexKeyFrom) {
+            keyFrom = NArrow::TReplaceKey::ToScalar(*indexKeyFrom, 0);
+        }
+        if (indexKeyTo) {
+            keyTo = NArrow::TReplaceKey::ToScalar(*indexKeyTo, 0);
+        }
+
        auto it = pathGranules.begin();
        if (keyFrom) {
-            it = pathGranules.upper_bound(TMark(*keyFrom));
-            --it;
+            it = pathGranules.lower_bound(*keyFrom);
+            if (it != pathGranules.begin()) {
+                if (it == pathGranules.end() || compositeMark || *keyFrom != it->first) {
+                    // TODO: better check if we really need an additional granule before the range
+                    --it;
+                }
+            }
        }
+
        if (previousIterator && (previousIterator == pathGranules.end() || it->first < (*previousIterator)->first)) {
            it = *previousIterator;
        }
        for (; it != pathGranules.end(); ++it) {
            auto& mark = it->first;
            ui64 granule = it->second;
-
-            if (keyTo && *keyTo < mark.GetBorder()) {
+            if (keyTo && mark > *keyTo) {
                break;
            }
 
@@ -1269,11 +1286,13 @@
                }
            }
            Y_VERIFY(outPortion.Produced());
-            if (!pkRangesFilter.IsPortionInUsage(outPortion, GetIndexInfo())) {
-                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")("granule", granule)("portion", portionInfo->Portion());
+            if (!compositeMark && !pkRangesFilter.IsPortionInUsage(outPortion, GetIndexInfo())) {
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")
+                    ("granule", granule)("portion", portionInfo->Portion());
                continue;
            } else {
-                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")("granule", granule)("portion", portionInfo->Portion());
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")
+                    ("granule", granule)("portion", portionInfo->Portion());
            }
            out->Portions.emplace_back(std::move(outPortion));
            granuleHasDataForSnaphsot = true;
@@ -1294,9 +1313,8 @@ static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompa
    ui64 sumSize = 0;
    ui64 sumMaxSize = 0;
    size_t activeCount = 0;
-    std::shared_ptr<arrow::Scalar> minPk0;
-    std::shared_ptr<arrow::Scalar> maxPk0;
-    bool pkEqual = true;
+    THashSet<NArrow::TReplaceKey> borders;
+    bool differentBorders = false;
 
    for (const auto& [_, info] : portions) {
        // We need only actual portions here (with empty XPlanStep:XTxId)
@@ -1306,19 +1324,11 @@
            continue;
        }
 
-        if (pkEqual) {
-            const auto [minPkCurrent, maxPkCurrent] = info.MinMaxIndexKeyValue();
-            // Check that all pks equal to each other.
-            if ((pkEqual = bool(minPkCurrent) && bool(maxPkCurrent))) {
-                if (minPk0 && maxPk0) {
-                    pkEqual = arrow::ScalarEquals(*minPk0, *minPkCurrent) && arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
-                } else {
-                    pkEqual = arrow::ScalarEquals(*minPkCurrent, *maxPkCurrent);
-
-                    minPk0 = minPkCurrent;
-                    maxPk0 = maxPkCurrent;
-                }
-            }
+        // Find at least 2 unique borders
+        if (!differentBorders) {
+            borders.insert(info.IndexKeyStart());
+            borders.insert(info.IndexKeyEnd());
+            differentBorders = (borders.size() > 1);
        }
 
        auto sizes = info.BlobsSizes();
@@ -1335,7 +1345,7 @@
        return false;
    }
 
-    return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
+    return differentBorders && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
 }
 
 std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 82b8353c79..5b8cdf9173 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -238,6 +238,8 @@ public:
    std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override;
 
 private:
+    using TMarksMap = std::map<TMark, ui64, TMark::TCompare>;
+
    struct TGranuleMeta {
        const TGranuleRecord Record;
        THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
@@ -257,7 +259,7 @@ private:
    std::shared_ptr<TColumnsTable> ColumnsTable;
    std::shared_ptr<TCountersTable> CountersTable;
    THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta
-    THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule}
+    THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule}
    TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
    THashSet<ui64> GranulesInSplit;
    /// Set of empty granules.
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 1545f29780..00bb37bb16 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -328,10 +328,6 @@ struct TPortionInfo {
                   const TString& tierName);
 
    void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
-    std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxIndexKeyValue() const {
-        return MinMaxValue(Meta.FirstPkColumn);
-    }
-
    std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxValue(const ui32 columnId) const;
    std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
    std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h
index a568a9f9cd..b43f6fbf3f 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.h
+++ b/ydb/core/tx/columnshard/engines/predicate/container.h
@@ -71,7 +71,17 @@ public:
 
    std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<arrow::Schema>& key) const {
        if (Object) {
-            return NArrow::TReplaceKey::FromBatch(Object->Batch, key, 0);
+            const auto& batchFields = Object->Batch->schema()->fields();
+            const auto& keyFields = key->fields();
+            size_t minSize = std::min(batchFields.size(), keyFields.size());
+            for (size_t i = 0; i < minSize; ++i) {
+                Y_VERIFY_DEBUG(batchFields[i]->Equals(*keyFields[i]));
+            }
+            if (batchFields.size() <= keyFields.size()) {
+                return NArrow::TReplaceKey::FromBatch(Object->Batch, Object->Batch->schema(), 0);
+            } else {
+                return NArrow::TReplaceKey::FromBatch(Object->Batch, key, 0);
+            }
        }
        return {};
    }
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index c84debaa77..b88462131b 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -222,7 +222,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
 void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
    Y_VERIFY(SchemaPresets.contains(presetId));
    auto preset = SchemaPresets.at(presetId);
-    
+
    TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
    versionInfo.SetId(presetId);
    versionInfo.SetSinceStep(version.Step);
@@ -272,6 +272,9 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTable
    indexInfo.SetAllKeys();
    if (!PrimaryIndex) {
        PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId);
+    } else {
+        Y_VERIFY(PrimaryIndex->GetIndexInfo().GetReplaceKey()->Equals(indexInfo.GetReplaceKey()));
+        Y_VERIFY(PrimaryIndex->GetIndexInfo().GetIndexKey()->Equals(indexInfo.GetIndexKey()));
    }
    PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
 
@@ -289,7 +292,7 @@ NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) {
    Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
 
    ui32 indexId = 0;
-    NOlap::TIndexInfo indexInfo("", indexId);
+    NOlap::TIndexInfo indexInfo("", indexId, schema.GetCompositeMarks());
 
    for (const auto& col : schema.GetColumns()) {
        const ui32 id = col.GetId();
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 1438eba278..b8e63a6f58 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -21,7 +21,7 @@ using TTypeInfo = NScheme::TTypeInfo;
 
 template <typename TKey = ui64>
 bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
-             bool requireUniq = false) {
+             bool requireUniq = false, const std::string& columnName = "timestamp") {
    static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
 
    THashMap<TKey, ui32> keys;
@@ -34,11 +34,14 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p
    }
 
    auto schema = NArrow::DeserializeSchema(srtSchema);
+    //Cerr << "Got schema: " << schema->ToString() << "\n";
+
    for (auto& blob : blobs) {
        auto batch = NArrow::DeserializeBatch(blob, schema);
        UNIT_ASSERT(batch);
+        //Cerr << "Got batch: " << batch->ToString() << "\n";
 
-        std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
+        std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
        UNIT_ASSERT(array);
 
        for (int i = 0; i < array->length(); ++i) {
@@ -306,19 +309,18 @@ struct TestTableDescription {
    std::vector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema();
    std::vector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema();
    bool InStore = true;
+    bool CompositeMarks = false;
 };
 
 void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
-                 const TestTableDescription& table, TString codec = "none") {
+                 const TestTableDescription& table = {}, TString codec = "none") {
    NOlap::TSnapshot snap(10, 10);
    TString txBody;
+    auto specials = TTestSchema::TTableSpecials().WithCodec(codec).WithCompositeMarks(table.CompositeMarks);
    if (table.InStore) {
-        txBody = TTestSchema::CreateTableTxBody(
-            pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
-
+        txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
    } else {
-        txBody = TTestSchema::CreateStandaloneTableTxBody(
-            pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
+        txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
    }
    bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
    UNIT_ASSERT(ok);
@@ -326,14 +328,6 @@ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
    PlanSchemaTx(runtime, sender, snap);
 }
 
-void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
-                 const std::vector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(),
-                 const std::vector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(),
-                 TString codec = "none") {
-    TestTableDescription table{schema, pk, true};
-    SetupSchema(runtime, sender, pathId, table, codec);
-}
-
 std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema,
                                     NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) {
    std::vector<TString> readData;
@@ -488,7 +482,7 @@ void TestWriteReadDup() {
    ui64 tableId = 1;
 
    auto ydbSchema = TTestSchema::YdbSchema();
-    SetupSchema(runtime, sender, tableId, ydbSchema);
+    SetupSchema(runtime, sender, tableId);
 
    constexpr ui32 numRows = 10;
    std::pair<ui64, ui64> portion = {10, 10 + numRows};
@@ -534,7 +528,7 @@ void TestWriteReadLongTxDup() {
    ui64 tableId = 1;
 
    auto ydbSchema = TTestSchema::YdbSchema();
-    SetupSchema(runtime, sender, tableId, ydbSchema);
+    SetupSchema(runtime, sender, tableId);
 
    constexpr ui32 numRows = 10;
    std::pair<ui64, ui64> portion = {10, 10 + numRows};
@@ -1061,7 +1055,8 @@ void TestCompactionInGranuleImpl(bool reboots,
    ui64 planStep = 100;
    ui64 txId = 100;
 
-    SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk);
+    TestTableDescription table{.Schema = ydbSchema, .Pk = ydbPk};
+    SetupSchema(runtime, sender, tableId, table);
    TAutoPtr<IEventHandle> handle;
 
    // Write same keys: merge on compaction
@@ -1337,7 +1332,7 @@ void TestReadWithProgram(const TestTableDescription& table = {})
    ui64 planStep = 100;
    ui64 txId = 100;
 
-    SetupSchema(runtime, sender, tableId, table.Schema);
+    SetupSchema(runtime, sender, tableId, table);
 
    { // write some data
        bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
@@ -1464,7 +1459,7 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
    ui64 planStep = 100;
    ui64 txId = 100;
 
-    SetupSchema(runtime, sender, tableId, table.Schema);
+    SetupSchema(runtime, sender, tableId, table);
 
    { // write some data
        bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
@@ -1642,7 +1637,8 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
    auto pk = ydbSchema;
    pk.resize(4);
-    SetupSchema(runtime, sender, tableId, ydbSchema, pk);
+    TestTableDescription table{.Schema = ydbSchema, .Pk = pk};
+    SetupSchema(runtime, sender, tableId, table);
 
    { // write some data
        bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob);
@@ -2002,34 +1998,71 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        const ui64 PlanStep;
        const ui64 TxId;
        const std::vector<std::pair<TString, TTypeInfo>> YdbPk;
+
    public:
        TTabletReadPredicateTest(TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId,
                                 const std::vector<std::pair<TString, TTypeInfo>>& ydbPk)
            : Runtime(runtime)
            , PlanStep(planStep)
            , TxId(txId)
-            , YdbPk(ydbPk) {
-
-        }
+            , YdbPk(ydbPk)
+        {}
 
        class TBorder {
        private:
-            YDB_READONLY(i32, Border, 0);
-            YDB_READONLY(bool, Include, false);
+            std::vector<ui32> Border;
+            bool Include;
+
        public:
-            TBorder(const ui32 ts, const bool include)
-                : Border(ts)
-                , Include(include) {
+            TBorder(const std::vector<ui32>& values, const bool include = false)
+                : Border(values)
+                , Include(include)
+            {}
+
+            bool GetInclude() const noexcept { return Include; }
+
+            std::vector<TCell> GetCellVec(const std::vector<std::pair<TString, TTypeInfo>>& pk,
+                                          std::vector<TString>& mem, bool trailingNulls = false) const
+            {
+                UNIT_ASSERT(Border.size() <= pk.size());
+                std::vector<TCell> cells;
+                size_t i = 0;
+                for (; i < Border.size(); ++i) {
+                    cells.push_back(MakeTestCell(pk[i].second, Border[i], mem));
+                }
+                for (; trailingNulls && i < pk.size(); ++i) {
+                    cells.push_back(TCell());
+                }
+                return cells;
+            }
+        };
+
+        struct TTestCaseOptions {
+            std::optional<TBorder> From;
+            std::optional<TBorder> To;
+            std::optional<ui32> ExpectedCount;
+            bool DataReadOnEmpty = false;
+
+            TTestCaseOptions()
+                : DataReadOnEmpty(false)
+            {}
+
+            TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; }
+            TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; }
+            TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; }
+            TTestCaseOptions& SetDataReadOnEmpty(bool flag) { DataReadOnEmpty = flag; return *this; }
+
+            TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const {
+                std::vector<TString> mem;
+                auto cellsFrom = From ? From->GetCellVec(pk, mem, true) : std::vector<TCell>();
+                auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector<TCell>();
+                return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), (From ? From->GetInclude() : false),
+                                             TConstArrayRef<TCell>(cellsTo), (To ? To->GetInclude(): false));
            }
        };
 
-        class TTestCase: TNonCopyable {
+        class TTestCase: public TTestCaseOptions, TNonCopyable {
        private:
-            YDB_ACCESSOR_DEF(std::optional<TBorder>, From);
-            YDB_ACCESSOR_DEF(std::optional<TBorder>, To);
-            YDB_ACCESSOR_DEF(std::optional<ui32>, ExpectedCount);
-            YDB_ACCESSOR(bool, DataReadOnEmpty, false);
-            TTabletReadPredicateTest& Owner;
+            const TTabletReadPredicateTest& Owner;
            const TString TestCaseName;
 
            void Execute() {
@@ -2041,9 +2074,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                Proto(read.get()).AddColumnNames("timestamp");
                Proto(read.get()).AddColumnNames("message");
 
-                const TSerializedTableRange range = MakeTestRange(
-                    { From ? From->GetBorder() : 0, To ? To->GetBorder() : 0 },
-                    From ? From->GetInclude() : false, To ? To->GetInclude() : false, Owner.YdbPk);
+                const TSerializedTableRange range = MakeRange(Owner.YdbPk);
                NOlap::TPredicate prGreater, prLess;
                std::tie(prGreater, prLess) = RangePredicates(range, Owner.YdbPk);
 
@@ -2093,7 +2124,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                auto batch = NArrow::DeserializeBatch(resRead.GetData(), schema);
                UNIT_ASSERT(batch);
                if (ExpectedCount) {
-                    Y_VERIFY_S(batch->num_rows() == *ExpectedCount, batch->num_rows());
+                    if (batch->num_rows() != *ExpectedCount) {
+                        Cerr << batch->ToString() << "\n";
+                    }
+                    UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), *ExpectedCount);
                }
 
                UNIT_ASSERT(meta.HasReadStats());
@@ -2105,14 +2139,16 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                UNIT_ASSERT(readStats.GetIndexBatches());
                //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
                UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
-                UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
+                //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
                //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
                }
            }
+
        public:
-            TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName)
-                : Owner(owner)
+            TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName, const TTestCaseOptions& opts = {})
+                : TTestCaseOptions(opts)
+                , Owner(owner)
                , TestCaseName(testCaseName) {
                Cerr << "TEST CASE " << TestCaseName << " START..." << Endl;
            }
 
@@ -2128,14 +2164,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                }
            }
        };
-        TTestCase Test(const TString& testCaseName) {
-            return TTestCase(*this, testCaseName);
+
+        TTestCase Test(const TString& testCaseName, const TTestCaseOptions& options = {}) {
+            return TTestCase(*this, testCaseName, options);
        }
    };
 
-    void TestCompactionSplitGranule(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema,
-                                    const std::vector<std::pair<TString, TTypeInfo>>& ydbPk,
-                                    const TTestBlobOptions& testBlobOptions = {}) {
+    void TestCompactionSplitGranule(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) {
        TTestBasicRuntime runtime;
        TTester::Setup(runtime);
 
@@ -2151,10 +2186,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        ui64 planStep = 100;
        ui64 txId = 100;
 
-        SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk, "lz4");
+        SetupSchema(runtime, sender, tableId, table, "lz4");
        TAutoPtr<IEventHandle> handle;
 
-        bool isStrPk0 = ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8);
+        bool isStrPk0 = table.Pk[0].second == TTypeInfo(NTypeIds::String) || table.Pk[0].second == TTypeInfo(NTypeIds::Utf8);
 
        // Write different keys: grow on compaction
 
@@ -2167,7 +2202,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
            ui64 start = i * (triggerPortionSize - overlapSize);
            std::pair<ui64, ui64> triggerPortion = { start, start + triggerPortionSize };
-            TString triggerData = MakeTestBlob(triggerPortion, ydbSchema, testBlobOptions);
+            TString triggerData = MakeTestBlob(triggerPortion, table.Schema, testBlobOptions);
            UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
            UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
 
@@ -2183,7 +2218,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        --txId;
 
        ui32 numRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize;
-        TString schema;
 
        for (ui32 i = 0; i < 2; ++i) {
            {
@@ -2215,36 +2249,67 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
            }
 
            if (isStrPk0) {
-                UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true));
+                if (testBlobOptions.SameValueColumns.contains("timestamp")) {
+                    UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
+                    UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message"));
+                } else {
+                    UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp"));
+                }
            } else {
                UNIT_ASSERT(DataHas(readData, schema, { 0, numRows }, true));
            }
        }
 
-        TTabletReadPredicateTest testAgent(runtime, planStep, txId, ydbPk);
-        testAgent.Test(":1)").SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(1);
-        testAgent.Test(":1]").SetTo(TTabletReadPredicateTest::TBorder(1, true)).SetExpectedCount(2);
-        testAgent.Test(":0)").SetTo(TTabletReadPredicateTest::TBorder(0, false)).SetExpectedCount(0);
-        testAgent.Test(":0]").SetTo(TTabletReadPredicateTest::TBorder(0, true)).SetExpectedCount(1);
+        std::vector<ui32> val0 = { 0 };
+        std::vector<ui32> val1 = { 1 };
+        std::vector<ui32> val9990 = { 99990 };
+        std::vector<ui32> val9999 = { 99999 };
+        std::vector<ui32> val1M = { 1000000000 };
+        std::vector<ui32> val1M_1 = { 1000000001 };
+
+        const bool composite = !testBlobOptions.SameValueColumns.empty();
+        if (composite) {
+            UNIT_ASSERT(table.Pk.size() >= 2);
+
+            ui32 sameValue = testBlobOptions.SameValue;
+            val0 = { sameValue, 0 };
+            val1 = { sameValue, 1 };
+            val9990 = { sameValue, 99990 };
+            val9999 = { sameValue, 99999 };
+            val1M = { sameValue, 1000000000 };
+            val1M_1 = { sameValue, 1000000001 };
+        }
+
+        using TBorder = TTabletReadPredicateTest::TBorder;
 
-        testAgent.Test("[0:0]").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(0, true)).SetExpectedCount(1);
-        testAgent.Test("[0:1)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(1);
-        testAgent.Test("(0:1)").SetFrom(TTabletReadPredicateTest::TBorder(0, false)).SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
-        testAgent.Test("outscope1").SetFrom(TTabletReadPredicateTest::TBorder(1000000000, true)).SetTo(TTabletReadPredicateTest::TBorder(1000000001, true))
+        TTabletReadPredicateTest testAgent(runtime, planStep, txId, table.Pk);
+        testAgent.Test(":1)").SetTo(TBorder(val1, false)).SetExpectedCount(1);
+        testAgent.Test(":1]").SetTo(TBorder(val1, true)).SetExpectedCount(2);
+        testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0).SetDataReadOnEmpty(composite);
+        testAgent.Test(":0]").SetTo(TBorder(val0, true)).SetExpectedCount(1);
+
+        testAgent.Test("[0:0]").SetFrom(TBorder(val0, true)).SetTo(TBorder(val0, true)).SetExpectedCount(1);
+        testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1);
+        testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
+        testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true))
            .SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0);
 
        // VERIFIED AS INCORRECT INTERVAL (its good)
-//        testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(0, false)).SetExpectedCount(0);
+//        testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
 
        if (isStrPk0) {
-            testAgent.Test("(99990:").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetExpectedCount(109);
-            testAgent.Test("(99990:99999)").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetTo(TTabletReadPredicateTest::TBorder(99999, false)).SetExpectedCount(98);
-            testAgent.Test("(99990:99999]").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetTo(TTabletReadPredicateTest::TBorder(99999, true)).SetExpectedCount(99);
-            testAgent.Test("[99990:99999]").SetFrom(TTabletReadPredicateTest::TBorder(99990, true)).SetTo(TTabletReadPredicateTest::TBorder(99999, true)).SetExpectedCount(100);
+            if (composite) {
+                // TODO
+            } else {
+                testAgent.Test("(99990:").SetFrom(TBorder(val9990, false)).SetExpectedCount(109);
+                testAgent.Test("(99990:99999)").SetFrom(TBorder(val9990, false)).SetTo(TBorder(val9999, false)).SetExpectedCount(98);
+                testAgent.Test("(99990:99999]").SetFrom(TBorder(val9990, false)).SetTo(TBorder(val9999, true)).SetExpectedCount(99);
+                testAgent.Test("[99990:99999]").SetFrom(TBorder(val9990, true)).SetTo(TBorder(val9999, true)).SetExpectedCount(100);
+            }
        } else {
-            testAgent.Test("(numRows:").SetFrom(TTabletReadPredicateTest::TBorder(numRows, false)).SetExpectedCount(0);
-            testAgent.Test("(numRows-1:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 1, false)).SetExpectedCount(0);
-            testAgent.Test("(numRows-2:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 2, false)).SetExpectedCount(1);
-            testAgent.Test("[numRows-1:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 1, true)).SetExpectedCount(1);
+            testAgent.Test("(numRows:").SetFrom(TBorder({numRows}, false)).SetExpectedCount(0);
+            testAgent.Test("(numRows-1:").SetFrom(TBorder({numRows - 1}, false)).SetExpectedCount(0);
+            testAgent.Test("(numRows-2:").SetFrom(TBorder({numRows - 2}, false)).SetExpectedCount(1);
+            testAgent.Test("[numRows-1:").SetFrom(TBorder({numRows - 1}, true)).SetExpectedCount(1);
        }
 
        RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -2277,7 +2342,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i);
                ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i);
 
-                Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " "
+                Cerr << "[" << __LINE__ << "] " << table.Pk[0].second.GetTypeId() << " "
                    << pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
                if (pathId == tableId) {
@@ -2317,7 +2382,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        for (auto& type : types) {
            schema[0].second = TTypeInfo(type);
            pk[0].second = TTypeInfo(type);
-            TestCompactionSplitGranule(schema, pk);
+            TestTableDescription table{.Schema = schema, .Pk = pk};
+            TestCompactionSplitGranule(table);
        }
    }
 
@@ -2333,14 +2399,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        for (auto& type : types) {
            schema[0].second = TTypeInfo(type);
            pk[0].second = TTypeInfo(type);
-            TestCompactionSplitGranule(schema, pk);
+            TestTableDescription table{.Schema = schema, .Pk = pk};
+            TestCompactionSplitGranule(table);
        }
    }
 
    Y_UNIT_TEST(CompactionSplitGranuleSameStrKey) {
-        // TODO: KIKIMR-17862
-        return;
-
        std::vector<TTypeId> types = {
            NTypeIds::String,
            NTypeIds::Utf8
@@ -2354,7 +2418,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        for (auto& type : types) {
            schema[0].second = TTypeInfo(type);
            pk[0].second = TTypeInfo(type);
-            TestCompactionSplitGranule(schema, pk, opts);
+            TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = true};
+            TestCompactionSplitGranule(table, opts);
        }
    }
 
@@ -2376,7 +2441,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        ui64 txId = 100;
 
        auto ydbSchema = TTestSchema::YdbSchema();
-        SetupSchema(runtime, sender, tableId, ydbSchema);
+        SetupSchema(runtime, sender, tableId);
        TAutoPtr<IEventHandle> handle;
 
        // Write some test data to adavnce the time
@@ -2454,7 +2519,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
        ui64 tableId = 1;
 
        auto ydbSchema = TTestSchema::YdbSchema();
-        SetupSchema(runtime, sender, tableId, ydbSchema);
+        SetupSchema(runtime, sender, tableId);
        TAutoPtr<IEventHandle> handle;
 
        bool blockReadFinished = true;
@@ -2493,7 +2558,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                ui64 srcGranule{0};
                for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) {
                    ui64 granule = portionInfo.Granule();
-                    Y_VERIFY(!srcGranule || srcGranule == granule);
+                    UNIT_ASSERT(!srcGranule || srcGranule == granule);
                    srcGranule = granule;
                    ui64 portionId = portionInfo.Portion();
                    Cerr << " " << portionId;
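Editor's note on the idea behind the `CompositeMarks` flag and the new `differentBorders` check in `NeedSplit`: with single-column marks a granule border is ordered by the first PK column only, so portions whose rows all share that first value produce a single unique border and can never be split; with composite marks the whole PK tuple orders the borders. A rough standalone illustration of that difference — plain `std::tuple`/`std::set`, not the actual `TMark`/`TReplaceKey` types:

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>
#include <tuple>

// Illustrative PK layout: (first column, second column), e.g. (key, timestamp).
using PkRow = std::tuple<std::string, uint32_t>;

int main() {
    // Every row shares the same value in the first PK column.
    const std::set<PkRow> rows = {{"same", 1}, {"same", 2}, {"same", 3}, {"same", 4}};

    // Single-column marks: a border is distinguished by the first column only.
    std::set<std::string> singleColumnBorders;
    // Composite marks: a border is the whole PK tuple.
    std::set<PkRow> compositeBorders;
    for (const auto& row : rows) {
        singleColumnBorders.insert(std::get<0>(row));
        compositeBorders.insert(row);
    }

    // One unique border leaves nowhere to split a granule (the situation the
    // old "pkEqual" check detected); with composite marks several candidate
    // borders exist, so a NeedSplit-style "at least 2 unique borders" test passes.
    std::cout << singleColumnBorders.size() << " vs " << compositeBorders.size() << "\n"; // prints "1 vs 4"
    return 0;
}
```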