about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author chertus <azuikov@ydb.tech> 2023-05-22 12:00:57 +0300
committer chertus <azuikov@ydb.tech> 2023-05-22 12:00:57 +0300
commit 15a58b55da960ce2000a126854d322f1cd05dd23 (patch)
tree e1628fd249a1f85fbc3a3d0434de22d309329540
parent f04144fef1314ee9674744a8c167b930e5c4dbb6 (diff)
download ydb-15a58b55da960ce2000a126854d322f1cd05dd23.tar.gz
enable flag for composite marks
-rw-r--r-- ydb/core/formats/arrow/replace_key.h 2
-rw-r--r-- ydb/core/protos/flat_scheme_op.proto 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard_ut_common.h 13
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 37
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 64
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/portion_info.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/predicate/container.h 12
-rw-r--r-- ydb/core/tx/columnshard/tables_manager.cpp 7
-rw-r--r-- ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp 227
10 files changed, 254 insertions, 118 deletions
diff --git a/ydb/core/formats/arrow/replace_key.h b/ydb/core/formats/arrow/replace_key.h
index 32c1227512..74f7974c1f 100644
--- a/ydb/core/formats/arrow/replace_key.h
+++ b/ydb/core/formats/arrow/replace_key.h
@@ -102,6 +102,8 @@ public:
const arrow::Array& Column(int i) const {
Y_VERIFY_DEBUG(Columns);
+ Y_VERIFY_DEBUG((size_t)i < Columns->size());
+ Y_VERIFY_DEBUG((*Columns)[i]);
return *(*Columns)[i];
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 62435785da..b9a633ec4e 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -440,6 +440,8 @@ message TColumnTableSchema {
//optional EColumnCodec DefaultCompressionCodec = 6; // deprecated, not used before replace
//optional int32 DefaultCompressionLevel = 7; // deprecated, not used before replace
optional TCompressionOptions DefaultCompression = 8;
+
+ optional bool CompositeMarks = 9;
}
message TAlterColumnTableSchema {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 9a166d96b3..5c44730029 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -81,6 +81,9 @@ struct TTestSchema {
struct TTableSpecials : public TStorageTier {
std::vector<TStorageTier> Tiers;
+ bool CompositeMarks = false;
+
+ TTableSpecials() noexcept = default;
bool HasTiers() const {
return !Tiers.empty();
@@ -90,12 +93,18 @@ struct TTestSchema {
return EvictAfter.has_value();
}
- TTableSpecials WithCodec(const TString& codec) {
+ TTableSpecials WithCodec(const TString& codec) const {
TTableSpecials out = *this;
out.SetCodec(codec);
return out;
}
+ TTableSpecials WithCompositeMarks(bool composite) const {
+ TTableSpecials out = *this;
+ out.CompositeMarks = composite;
+ return out;
+ }
+
TTableSpecials& SetTtl(std::optional<TDuration> ttl) {
EvictAfter = ttl;
return *this;
@@ -214,6 +223,7 @@ struct TTestSchema {
if (specials.CompressionLevel) {
schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
}
+ schema->SetCompositeMarks(specials.CompositeMarks);
}
static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle::TTtl* ttl) {
@@ -400,6 +410,7 @@ struct TTestBlobOptions {
ui32 SameValue = 42;
};
+TCell MakeTestCell(const TTypeInfo& typeInfo, ui32 value, std::vector<TString>& mem);
TString MakeTestBlob(std::pair<ui64, ui64> range, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns,
const TTestBlobOptions& options = {});
TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveFrom, bool inclusiveTo,
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index b15c748915..eb5e925588 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -33,6 +33,22 @@ struct TCompactionLimits {
class TMark {
public:
+ struct TCompare {
+ using is_transparent = void;
+
+ bool operator() (const TMark& a, const TMark& b) const {
+ return a < b;
+ }
+
+ bool operator() (const arrow::Scalar& a, const TMark& b) const {
+ return a < b;
+ }
+
+ bool operator() (const TMark& a, const arrow::Scalar& b) const {
+ return a < b;
+ }
+ };
+
explicit TMark(const NArrow::TReplaceKey& key)
: Border(key)
{}
@@ -48,6 +64,23 @@ public:
return Border <=> m.Border;
}
+ bool operator == (const arrow::Scalar& firstKey) const {
+ // TODO: avoid ToScalar()
+ return NArrow::ScalarCompare(*NArrow::TReplaceKey::ToScalar(Border, 0), firstKey) == 0;
+ }
+
+ std::partial_ordering operator <=> (const arrow::Scalar& firstKey) const {
+ // TODO: avoid ToScalar()
+ const int cmp = NArrow::ScalarCompare(*NArrow::TReplaceKey::ToScalar(Border, 0), firstKey);
+ if (cmp < 0) {
+ return std::partial_ordering::less;
+ } else if (cmp > 0) {
+ return std::partial_ordering::greater;
+ } else {
+ return std::partial_ordering::equivalent;
+ }
+ }
+
const NArrow::TReplaceKey& GetBorder() const noexcept {
return Border;
}
@@ -160,7 +193,9 @@ public:
case INSERT:
return "insert";
case COMPACTION:
- return "compaction";
+ return CompactionInfo
+ ? (CompactionInfo->InGranule ? "compaction in granule" : "compaction split granule" )
+ : "compaction";
case CLEANUP:
return "cleanup";
case TTL:
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 78cd9eee33..d2102e74b1 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1228,23 +1228,40 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
}
out->Granules.reserve(pathGranules.size());
// TODO: out.Portions.reserve()
- std::optional<TMap<TMark, ui64>::const_iterator> previousIterator;
+ std::optional<TMarksMap::const_iterator> previousIterator;
+ const bool compositeMark = GetIndexKey()->num_fields() > 1;
+
for (auto&& filter : pkRangesFilter) {
- std::optional<NArrow::TReplaceKey> keyFrom = filter.KeyFrom(GetIndexKey());
- std::optional<NArrow::TReplaceKey> keyTo = filter.KeyTo(GetIndexKey());
+ std::optional<NArrow::TReplaceKey> indexKeyFrom = filter.KeyFrom(GetIndexKey());
+ std::optional<NArrow::TReplaceKey> indexKeyTo = filter.KeyTo(GetIndexKey());
+
+ std::shared_ptr<arrow::Scalar> keyFrom;
+ std::shared_ptr<arrow::Scalar> keyTo;
+ if (indexKeyFrom) {
+ keyFrom = NArrow::TReplaceKey::ToScalar(*indexKeyFrom, 0);
+ }
+ if (indexKeyTo) {
+ keyTo = NArrow::TReplaceKey::ToScalar(*indexKeyTo, 0);
+ }
+
auto it = pathGranules.begin();
if (keyFrom) {
- it = pathGranules.upper_bound(TMark(*keyFrom));
- --it;
+ it = pathGranules.lower_bound(*keyFrom);
+ if (it != pathGranules.begin()) {
+ if (it == pathGranules.end() || compositeMark || *keyFrom != it->first) {
+ // TODO: better check if we really need an additional granule before the range
+ --it;
+ }
+ }
}
+
if (previousIterator && (previousIterator == pathGranules.end() || it->first < (*previousIterator)->first)) {
it = *previousIterator;
}
for (; it != pathGranules.end(); ++it) {
auto& mark = it->first;
ui64 granule = it->second;
-
- if (keyTo && *keyTo < mark.GetBorder()) {
+ if (keyTo && mark > *keyTo) {
break;
}
@@ -1269,11 +1286,13 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
}
}
Y_VERIFY(outPortion.Produced());
- if (!pkRangesFilter.IsPortionInUsage(outPortion, GetIndexInfo())) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")("granule", granule)("portion", portionInfo->Portion());
+ if (!compositeMark && !pkRangesFilter.IsPortionInUsage(outPortion, GetIndexInfo())) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")
+ ("granule", granule)("portion", portionInfo->Portion());
continue;
} else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")("granule", granule)("portion", portionInfo->Portion());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")
+ ("granule", granule)("portion", portionInfo->Portion());
}
out->Portions.emplace_back(std::move(outPortion));
granuleHasDataForSnaphsot = true;
@@ -1294,9 +1313,8 @@ static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompa
ui64 sumSize = 0;
ui64 sumMaxSize = 0;
size_t activeCount = 0;
- std::shared_ptr<arrow::Scalar> minPk0;
- std::shared_ptr<arrow::Scalar> maxPk0;
- bool pkEqual = true;
+ THashSet<NArrow::TReplaceKey> borders;
+ bool differentBorders = false;
for (const auto& [_, info] : portions) {
// We need only actual portions here (with empty XPlanStep:XTxId)
@@ -1306,19 +1324,11 @@ static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompa
continue;
}
- if (pkEqual) {
- const auto [minPkCurrent, maxPkCurrent] = info.MinMaxIndexKeyValue();
- // Check that all pks equal to each other.
- if ((pkEqual = bool(minPkCurrent) && bool(maxPkCurrent))) {
- if (minPk0 && maxPk0) {
- pkEqual = arrow::ScalarEquals(*minPk0, *minPkCurrent) && arrow::ScalarEquals(*maxPk0, *maxPkCurrent);
- } else {
- pkEqual = arrow::ScalarEquals(*minPkCurrent, *maxPkCurrent);
-
- minPk0 = minPkCurrent;
- maxPk0 = maxPkCurrent;
- }
- }
+ // Find at least 2 unique borders
+ if (!differentBorders) {
+ borders.insert(info.IndexKeyStart());
+ borders.insert(info.IndexKeyEnd());
+ differentBorders = (borders.size() > 1);
}
auto sizes = info.BlobsSizes();
@@ -1335,7 +1345,7 @@ static bool NeedSplit(const THashMap<ui64, TPortionInfo>& portions, const TCompa
return false;
}
- return !pkEqual && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
+ return differentBorders && (sumMaxSize >= limits.GranuleBlobSplitSize || sumSize >= limits.GranuleOverloadSize);
}
std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 82b8353c79..5b8cdf9173 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -238,6 +238,8 @@ public:
std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override;
private:
+ using TMarksMap = std::map<TMark, ui64, TMark::TCompare>;
+
struct TGranuleMeta {
const TGranuleRecord Record;
THashMap<ui64, TPortionInfo> Portions; // portion -> portionInfo
@@ -257,7 +259,7 @@ private:
std::shared_ptr<TColumnsTable> ColumnsTable;
std::shared_ptr<TCountersTable> CountersTable;
THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta
- THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule}
+ THashMap<ui64, TMarksMap> PathGranules; // path_id -> {mark, granule}
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
THashSet<ui64> GranulesInSplit;
/// Set of empty granules.
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 1545f29780..00bb37bb16 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -328,10 +328,6 @@ struct TPortionInfo {
const TString& tierName);
void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
- std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxIndexKeyValue() const {
- return MinMaxValue(Meta.FirstPkColumn);
- }
-
std::tuple<std::shared_ptr<arrow::Scalar>, std::shared_ptr<arrow::Scalar>> MinMaxValue(const ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h
index a568a9f9cd..b43f6fbf3f 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.h
+++ b/ydb/core/tx/columnshard/engines/predicate/container.h
@@ -71,7 +71,17 @@ public:
std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<arrow::Schema>& key) const {
if (Object) {
- return NArrow::TReplaceKey::FromBatch(Object->Batch, key, 0);
+ const auto& batchFields = Object->Batch->schema()->fields();
+ const auto& keyFields = key->fields();
+ size_t minSize = std::min(batchFields.size(), keyFields.size());
+ for (size_t i = 0; i < minSize; ++i) {
+ Y_VERIFY_DEBUG(batchFields[i]->Equals(*keyFields[i]));
+ }
+ if (batchFields.size() <= keyFields.size()) {
+ return NArrow::TReplaceKey::FromBatch(Object->Batch, Object->Batch->schema(), 0);
+ } else {
+ return NArrow::TReplaceKey::FromBatch(Object->Batch, key, 0);
+ }
}
return {};
}
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index c84debaa77..b88462131b 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -222,7 +222,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
Y_VERIFY(SchemaPresets.contains(presetId));
auto preset = SchemaPresets.at(presetId);
-
+
TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
versionInfo.SetId(presetId);
versionInfo.SetSinceStep(version.Step);
@@ -272,6 +272,9 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTable
indexInfo.SetAllKeys();
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId);
+ } else {
+ Y_VERIFY(PrimaryIndex->GetIndexInfo().GetReplaceKey()->Equals(indexInfo.GetReplaceKey()));
+ Y_VERIFY(PrimaryIndex->GetIndexInfo().GetIndexKey()->Equals(indexInfo.GetIndexKey()));
}
PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
@@ -289,7 +292,7 @@ NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) {
Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
ui32 indexId = 0;
- NOlap::TIndexInfo indexInfo("", indexId);
+ NOlap::TIndexInfo indexInfo("", indexId, schema.GetCompositeMarks());
for (const auto& col : schema.GetColumns()) {
const ui32 id = col.GetId();
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 1438eba278..b8e63a6f58 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -21,7 +21,7 @@ using TTypeInfo = NScheme::TTypeInfo;
template <typename TKey = ui64>
bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
- bool requireUniq = false) {
+ bool requireUniq = false, const std::string& columnName = "timestamp") {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
THashMap<TKey, ui32> keys;
@@ -34,11 +34,14 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p
}
auto schema = NArrow::DeserializeSchema(srtSchema);
+ //Cerr << "Got schema: " << schema->ToString() << "\n";
+
for (auto& blob : blobs) {
auto batch = NArrow::DeserializeBatch(blob, schema);
UNIT_ASSERT(batch);
+ //Cerr << "Got batch: " << batch->ToString() << "\n";
- std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
+ std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
UNIT_ASSERT(array);
for (int i = 0; i < array->length(); ++i) {
@@ -306,19 +309,18 @@ struct TestTableDescription {
std::vector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema();
std::vector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema();
bool InStore = true;
+ bool CompositeMarks = false;
};
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
- const TestTableDescription& table, TString codec = "none") {
+ const TestTableDescription& table = {}, TString codec = "none") {
NOlap::TSnapshot snap(10, 10);
TString txBody;
+ auto specials = TTestSchema::TTableSpecials().WithCodec(codec).WithCompositeMarks(table.CompositeMarks);
if (table.InStore) {
- txBody = TTestSchema::CreateTableTxBody(
- pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
-
+ txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
} else {
- txBody = TTestSchema::CreateStandaloneTableTxBody(
- pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
+ txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
}
bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
UNIT_ASSERT(ok);
@@ -326,14 +328,6 @@ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
PlanSchemaTx(runtime, sender, snap);
}
-void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
- const std::vector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(),
- const std::vector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(),
- TString codec = "none") {
- TestTableDescription table{schema, pk, true};
- SetupSchema(runtime, sender, pathId, table, codec);
-}
-
std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema,
NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) {
std::vector<TString> readData;
@@ -488,7 +482,7 @@ void TestWriteReadDup() {
ui64 tableId = 1;
auto ydbSchema = TTestSchema::YdbSchema();
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ SetupSchema(runtime, sender, tableId);
constexpr ui32 numRows = 10;
std::pair<ui64, ui64> portion = {10, 10 + numRows};
@@ -534,7 +528,7 @@ void TestWriteReadLongTxDup() {
ui64 tableId = 1;
auto ydbSchema = TTestSchema::YdbSchema();
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ SetupSchema(runtime, sender, tableId);
constexpr ui32 numRows = 10;
std::pair<ui64, ui64> portion = {10, 10 + numRows};
@@ -1061,7 +1055,8 @@ void TestCompactionInGranuleImpl(bool reboots,
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk);
+ TestTableDescription table{.Schema = ydbSchema, .Pk = ydbPk};
+ SetupSchema(runtime, sender, tableId, table);
TAutoPtr<IEventHandle> handle;
// Write same keys: merge on compaction
@@ -1337,7 +1332,7 @@ void TestReadWithProgram(const TestTableDescription& table = {})
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId, table.Schema);
+ SetupSchema(runtime, sender, tableId, table);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
@@ -1464,7 +1459,7 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId, table.Schema);
+ SetupSchema(runtime, sender, tableId, table);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, table.Schema));
@@ -1642,7 +1637,8 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
auto pk = ydbSchema;
pk.resize(4);
- SetupSchema(runtime, sender, tableId, ydbSchema, pk);
+ TestTableDescription table{.Schema = ydbSchema, .Pk = pk};
+ SetupSchema(runtime, sender, tableId, table);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob);
@@ -2002,34 +1998,71 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
const ui64 PlanStep;
const ui64 TxId;
const std::vector<std::pair<TString, TTypeInfo>> YdbPk;
+
public:
TTabletReadPredicateTest(TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId, const std::vector<std::pair<TString, TTypeInfo>>& ydbPk)
: Runtime(runtime)
, PlanStep(planStep)
, TxId(txId)
- , YdbPk(ydbPk) {
-
- }
+ , YdbPk(ydbPk)
+ {}
class TBorder {
private:
- YDB_READONLY(i32, Border, 0);
- YDB_READONLY(bool, Include, false);
+ std::vector<ui32> Border;
+ bool Include;
+
public:
- TBorder(const ui32 ts, const bool include)
- : Border(ts)
- , Include(include) {
+ TBorder(const std::vector<ui32>& values, const bool include = false)
+ : Border(values)
+ , Include(include)
+ {}
+
+ bool GetInclude() const noexcept { return Include; }
+
+ std::vector<TCell> GetCellVec(const std::vector<std::pair<TString, TTypeInfo>>& pk,
+ std::vector<TString>& mem, bool trailingNulls = false) const
+ {
+ UNIT_ASSERT(Border.size() <= pk.size());
+ std::vector<TCell> cells;
+ size_t i = 0;
+ for (; i < Border.size(); ++i) {
+ cells.push_back(MakeTestCell(pk[i].second, Border[i], mem));
+ }
+ for (; trailingNulls && i < pk.size(); ++i) {
+ cells.push_back(TCell());
+ }
+ return cells;
+ }
+ };
+ struct TTestCaseOptions {
+ std::optional<TBorder> From;
+ std::optional<TBorder> To;
+ std::optional<ui32> ExpectedCount;
+ bool DataReadOnEmpty = false;
+
+ TTestCaseOptions()
+ : DataReadOnEmpty(false)
+ {}
+
+ TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; }
+ TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; }
+ TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; }
+ TTestCaseOptions& SetDataReadOnEmpty(bool flag) { DataReadOnEmpty = flag; return *this; }
+
+ TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const {
+ std::vector<TString> mem;
+ auto cellsFrom = From ? From->GetCellVec(pk, mem, true) : std::vector<TCell>();
+ auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector<TCell>();
+ return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), (From ? From->GetInclude() : false),
+ TConstArrayRef<TCell>(cellsTo), (To ? To->GetInclude(): false));
}
};
- class TTestCase: TNonCopyable {
+ class TTestCase: public TTestCaseOptions, TNonCopyable {
private:
- YDB_ACCESSOR_DEF(std::optional<TBorder>, From);
- YDB_ACCESSOR_DEF(std::optional<TBorder>, To);
- YDB_ACCESSOR_DEF(std::optional<ui32>, ExpectedCount);
- YDB_ACCESSOR(bool, DataReadOnEmpty, false);
- TTabletReadPredicateTest& Owner;
+ const TTabletReadPredicateTest& Owner;
const TString TestCaseName;
void Execute() {
@@ -2041,9 +2074,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Proto(read.get()).AddColumnNames("timestamp");
Proto(read.get()).AddColumnNames("message");
- const TSerializedTableRange range = MakeTestRange(
- { From ? From->GetBorder() : 0, To ? To->GetBorder() : 0 },
- From ? From->GetInclude() : false, To ? To->GetInclude() : false, Owner.YdbPk);
+ const TSerializedTableRange range = MakeRange(Owner.YdbPk);
NOlap::TPredicate prGreater, prLess;
std::tie(prGreater, prLess) = RangePredicates(range, Owner.YdbPk);
@@ -2093,7 +2124,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto batch = NArrow::DeserializeBatch(resRead.GetData(), schema);
UNIT_ASSERT(batch);
if (ExpectedCount) {
- Y_VERIFY_S(batch->num_rows() == *ExpectedCount, batch->num_rows());
+ if (batch->num_rows() != *ExpectedCount) {
+ Cerr << batch->ToString() << "\n";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), *ExpectedCount);
}
UNIT_ASSERT(meta.HasReadStats());
@@ -2105,14 +2139,16 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(readStats.GetIndexBatches());
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
+ //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
}
}
}
+
public:
- TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName)
- : Owner(owner)
+ TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName, const TTestCaseOptions& opts = {})
+ : TTestCaseOptions(opts)
+ , Owner(owner)
, TestCaseName(testCaseName)
{
Cerr << "TEST CASE " << TestCaseName << " START..." << Endl;
@@ -2128,14 +2164,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
};
- TTestCase Test(const TString& testCaseName) {
- return TTestCase(*this, testCaseName);
+
+ TTestCase Test(const TString& testCaseName, const TTestCaseOptions& options = {}) {
+ return TTestCase(*this, testCaseName, options);
}
};
- void TestCompactionSplitGranule(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema,
- const std::vector<std::pair<TString, TTypeInfo>>& ydbPk,
- const TTestBlobOptions& testBlobOptions = {}) {
+ void TestCompactionSplitGranule(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -2151,10 +2186,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk, "lz4");
+ SetupSchema(runtime, sender, tableId, table, "lz4");
TAutoPtr<IEventHandle> handle;
- bool isStrPk0 = ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8);
+ bool isStrPk0 = table.Pk[0].second == TTypeInfo(NTypeIds::String) || table.Pk[0].second == TTypeInfo(NTypeIds::Utf8);
// Write different keys: grow on compaction
@@ -2167,7 +2202,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
ui64 start = i * (triggerPortionSize - overlapSize);
std::pair<ui64, ui64> triggerPortion = { start, start + triggerPortionSize };
- TString triggerData = MakeTestBlob(triggerPortion, ydbSchema, testBlobOptions);
+ TString triggerData = MakeTestBlob(triggerPortion, table.Schema, testBlobOptions);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
@@ -2183,7 +2218,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
--txId;
ui32 numRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize;
- TString schema;
for (ui32 i = 0; i < 2; ++i) {
{
@@ -2215,36 +2249,67 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
if (isStrPk0) {
- UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true));
+ if (testBlobOptions.SameValueColumns.contains("timestamp")) {
+ UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message"));
+ } else {
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp"));
+ }
} else {
UNIT_ASSERT(DataHas(readData, schema, { 0, numRows }, true));
}
}
- TTabletReadPredicateTest testAgent(runtime, planStep, txId, ydbPk);
- testAgent.Test(":1)").SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(1);
- testAgent.Test(":1]").SetTo(TTabletReadPredicateTest::TBorder(1, true)).SetExpectedCount(2);
- testAgent.Test(":0)").SetTo(TTabletReadPredicateTest::TBorder(0, false)).SetExpectedCount(0);
- testAgent.Test(":0]").SetTo(TTabletReadPredicateTest::TBorder(0, true)).SetExpectedCount(1);
+ std::vector<ui32> val0 = { 0 };
+ std::vector<ui32> val1 = { 1 };
+ std::vector<ui32> val9990 = { 99990 };
+ std::vector<ui32> val9999 = { 99999 };
+ std::vector<ui32> val1M = { 1000000000 };
+ std::vector<ui32> val1M_1 = { 1000000001 };
+
+ const bool composite = !testBlobOptions.SameValueColumns.empty();
+ if (composite) {
+ UNIT_ASSERT(table.Pk.size() >= 2);
+
+ ui32 sameValue = testBlobOptions.SameValue;
+ val0 = { sameValue, 0 };
+ val1 = { sameValue, 1 };
+ val9990 = { sameValue, 99990 };
+ val9999 = { sameValue, 99999 };
+ val1M = { sameValue, 1000000000 };
+ val1M_1 = { sameValue, 1000000001 };
+ }
+
+ using TBorder = TTabletReadPredicateTest::TBorder;
- testAgent.Test("[0:0]").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(0, true)).SetExpectedCount(1);
- testAgent.Test("[0:1)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(1);
- testAgent.Test("(0:1)").SetFrom(TTabletReadPredicateTest::TBorder(0, false)).SetTo(TTabletReadPredicateTest::TBorder(1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
- testAgent.Test("outscope1").SetFrom(TTabletReadPredicateTest::TBorder(1000000000, true)).SetTo(TTabletReadPredicateTest::TBorder(1000000001, true))
+ TTabletReadPredicateTest testAgent(runtime, planStep, txId, table.Pk);
+ testAgent.Test(":1)").SetTo(TBorder(val1, false)).SetExpectedCount(1);
+ testAgent.Test(":1]").SetTo(TBorder(val1, true)).SetExpectedCount(2);
+ testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0).SetDataReadOnEmpty(composite);
+ testAgent.Test(":0]").SetTo(TBorder(val0, true)).SetExpectedCount(1);
+
+ testAgent.Test("[0:0]").SetFrom(TBorder(val0, true)).SetTo(TBorder(val0, true)).SetExpectedCount(1);
+ testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1);
+ testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
+ testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true))
.SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0);
// VERIFIED AS INCORRECT INTERVAL (its good)
-// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TTabletReadPredicateTest::TBorder(0, false)).SetExpectedCount(0);
+// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
if (isStrPk0) {
- testAgent.Test("(99990:").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetExpectedCount(109);
- testAgent.Test("(99990:99999)").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetTo(TTabletReadPredicateTest::TBorder(99999, false)).SetExpectedCount(98);
- testAgent.Test("(99990:99999]").SetFrom(TTabletReadPredicateTest::TBorder(99990, false)).SetTo(TTabletReadPredicateTest::TBorder(99999, true)).SetExpectedCount(99);
- testAgent.Test("[99990:99999]").SetFrom(TTabletReadPredicateTest::TBorder(99990, true)).SetTo(TTabletReadPredicateTest::TBorder(99999, true)).SetExpectedCount(100);
+ if (composite) {
+ // TODO
+ } else {
+ testAgent.Test("(99990:").SetFrom(TBorder(val9990, false)).SetExpectedCount(109);
+ testAgent.Test("(99990:99999)").SetFrom(TBorder(val9990, false)).SetTo(TBorder(val9999, false)).SetExpectedCount(98);
+ testAgent.Test("(99990:99999]").SetFrom(TBorder(val9990, false)).SetTo(TBorder(val9999, true)).SetExpectedCount(99);
+ testAgent.Test("[99990:99999]").SetFrom(TBorder(val9990, true)).SetTo(TBorder(val9999, true)).SetExpectedCount(100);
+ }
} else {
- testAgent.Test("(numRows:").SetFrom(TTabletReadPredicateTest::TBorder(numRows, false)).SetExpectedCount(0);
- testAgent.Test("(numRows-1:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 1, false)).SetExpectedCount(0);
- testAgent.Test("(numRows-2:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 2, false)).SetExpectedCount(1);
- testAgent.Test("[numRows-1:").SetFrom(TTabletReadPredicateTest::TBorder(numRows - 1, true)).SetExpectedCount(1);
+ testAgent.Test("(numRows:").SetFrom(TBorder({numRows}, false)).SetExpectedCount(0);
+ testAgent.Test("(numRows-1:").SetFrom(TBorder({numRows - 1}, false)).SetExpectedCount(0);
+ testAgent.Test("(numRows-2:").SetFrom(TBorder({numRows - 2}, false)).SetExpectedCount(1);
+ testAgent.Test("[numRows-1:").SetFrom(TBorder({numRows - 1}, true)).SetExpectedCount(1);
}
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -2277,7 +2342,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i);
ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i);
- Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " "
+ Cerr << "[" << __LINE__ << "] " << table.Pk[0].second.GetTypeId() << " "
<< pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
if (pathId == tableId) {
@@ -2317,7 +2382,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionSplitGranule(schema, pk);
+ TestTableDescription table{.Schema = schema, .Pk = pk};
+ TestCompactionSplitGranule(table);
}
}
@@ -2333,14 +2399,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionSplitGranule(schema, pk);
+ TestTableDescription table{.Schema = schema, .Pk = pk};
+ TestCompactionSplitGranule(table);
}
}
Y_UNIT_TEST(CompactionSplitGranuleSameStrKey) {
- // TODO: KIKIMR-17862
- return;
-
std::vector<TTypeId> types = {
NTypeIds::String,
NTypeIds::Utf8
@@ -2354,7 +2418,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (auto& type : types) {
schema[0].second = TTypeInfo(type);
pk[0].second = TTypeInfo(type);
- TestCompactionSplitGranule(schema, pk, opts);
+ TestTableDescription table{.Schema = schema, .Pk = pk, .CompositeMarks = true};
+ TestCompactionSplitGranule(table, opts);
}
}
@@ -2376,7 +2441,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 txId = 100;
auto ydbSchema = TTestSchema::YdbSchema();
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ SetupSchema(runtime, sender, tableId);
TAutoPtr<IEventHandle> handle;
// Write some test data to adavnce the time
@@ -2454,7 +2519,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 tableId = 1;
auto ydbSchema = TTestSchema::YdbSchema();
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ SetupSchema(runtime, sender, tableId);
TAutoPtr<IEventHandle> handle;
bool blockReadFinished = true;
@@ -2493,7 +2558,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 srcGranule{0};
for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) {
ui64 granule = portionInfo.Granule();
- Y_VERIFY(!srcGranule || srcGranule == granule);
+ UNIT_ASSERT(!srcGranule || srcGranule == granule);
srcGranule = granule;
ui64 portionId = portionInfo.Portion();
Cerr << " " << portionId;