diff options
author | chertus <azuikov@ydb.tech> | 2022-12-28 17:02:31 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-28 17:02:31 +0300 |
commit | 61e159ca8f5b33e3a43e1b34444581675f1e96dd (patch) | |
tree | 243741c059db49ccedb9e3897c625703d5333044 | |
parent | 45e0a7c6b36d8fb5ba18f74c720d85c21d434932 (diff) | |
download | ydb-61e159ca8f5b33e3a43e1b34444581675f1e96dd.tar.gz |
fix ALTER TTL for ColumnTable
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__propose_transaction.cpp | 40 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ttl.h | 26 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/tier_info.h | 63 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 124 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_ttl.cpp | 58 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_logstore_ut.cpp | 8 |
14 files changed, 303 insertions, 118 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 102bae6c38..5fa3c2c49f 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -213,25 +213,45 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex TString columnName = ttlBody.GetTtlColumnName(); if (columnName.empty()) { - statusMessage = "TTL tx wrong TTL column"; + statusMessage = "TTL tx wrong TTL column ''"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; break; } - for (ui64 pathId : ttlBody.GetPathIds()) { - pathTtls.emplace(pathId, NOlap::TTiering::MakeTtl(unixTime, columnName)); + if (!Self->PrimaryIndex) { + statusMessage = "No primary index for TTL"; + status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; + break; + } + + auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema(); + auto ttlColumn = schema->GetFieldByName(columnName); + if (!ttlColumn) { + statusMessage = "TTL tx wrong TTL column '" + columnName + "'"; + status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; + break; + } + + if (statusMessage.empty()) { + for (ui64 pathId : ttlBody.GetPathIds()) { + NOlap::TTiering tiering; + tiering.Ttl = NOlap::TTierInfo::MakeTtl(unixTime, columnName); + pathTtls.emplace(pathId, std::move(tiering)); + } } } - if (auto event = Self->SetupTtl(pathTtls, true)) { - if (event->NeedWrites()) { - ctx.Send(Self->EvictionActor, event.release()); + if (statusMessage.empty()) { + if (auto event = Self->SetupTtl(pathTtls, true)) { + if (event->NeedWrites()) { + ctx.Send(Self->EvictionActor, event.release()); + } else { + ctx.Send(Self->SelfId(), event->TxEvent.release()); + } + status = NKikimrTxColumnShard::EResultStatus::SUCCESS; } else { - ctx.Send(Self->SelfId(), event->TxEvent.release()); + statusMessage = "TTL not started"; } - status = NKikimrTxColumnShard::EResultStatus::SUCCESS; - } else { - statusMessage = "TTL not started"; } break; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index d3afee8194..e3c3eca148 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -571,6 +571,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP } else { Ttl.DropPathTtl(pathId); } + Ttl.Repeat(); // Atler TTL triggers TTL activity info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); Schema::SaveTableInfo(db, table.PathId, tieringUsage); @@ -798,7 +799,9 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); if (Tiers) { - actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds + auto pathTiering = Tiers->GetTiering(); // TODO: pathIds + actualIndexInfo.UpdatePathTiering(pathTiering); + actualIndexInfo.SetPathTiering(std::move(pathTiering)); } ActiveIndexingOrCompaction = true; @@ -841,7 +844,9 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); if (Tiers) { - actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds + auto pathTiering = Tiers->GetTiering(); // TODO: pathIds + actualIndexInfo.UpdatePathTiering(pathTiering); + actualIndexInfo.SetPathTiering(std::move(pathTiering)); } ActiveIndexingOrCompaction = true; @@ -880,9 +885,14 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); } + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + actualIndexInfo.UpdatePathTiering(eviction); + std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; indexChanges = PrimaryIndex->StartTtl(eviction); + actualIndexInfo.SetPathTiering(std::move(eviction)); + if (!indexChanges) { LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID()); return {}; @@ -893,9 +903,6 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u bool needWrites = !indexChanges->PortionsToEvict.empty(); - auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); - actualIndexInfo.SetTiering(std::move(eviction)); - ActiveTtl = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); @@ -950,7 +957,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); #if 0 // No need for now if (Tiers) { - actualIndexInfo.SetTiering(Tiers->GetTiering()); + ... } #endif diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index 5846fad4a4..0201d857b3 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -11,6 +11,7 @@ public: struct TEviction { TDuration EvictAfter; TString ColumnName; + ui32 UnitsInSecond = 0; // 0 means auto (data type specific) }; struct TDescription { @@ -23,6 +24,24 @@ public: Eviction = TEviction{expireSec, ttl.GetColumnName()}; Y_VERIFY(!Eviction->ColumnName.empty()); + + switch (ttl.GetColumnUnit()) { + case NKikimrSchemeOp::TTTLSettings::UNIT_SECONDS: + Eviction->UnitsInSecond = 1; + break; + case NKikimrSchemeOp::TTTLSettings::UNIT_MILLISECONDS: + Eviction->UnitsInSecond = 1000; + break; + case NKikimrSchemeOp::TTTLSettings::UNIT_MICROSECONDS: + Eviction->UnitsInSecond = 1000 * 1000; + break; + case NKikimrSchemeOp::TTTLSettings::UNIT_NANOSECONDS: + Eviction->UnitsInSecond = 1000 * 1000 * 1000; + break; + case NKikimrSchemeOp::TTTLSettings::UNIT_AUTO: + default: + break; + } } }; @@ -75,11 +94,12 @@ private: TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)}; TInstant LastRegularTtl; - std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const { + std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const + { if (descr.Eviction) { auto& evict = descr.Eviction; - auto border = timePoint - evict->EvictAfter; - return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName); + TInstant border = timePoint - evict->EvictAfter; + return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName, evict->UnitsInSecond); } return {}; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index e45fecdb8f..1350f74b9b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -805,7 +805,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash continue; // It's not an error: allow TTL over multiple shards with different pathIds presented } - auto expireTimestamp = ttl.ExpireTimestamp(); + auto expireTimestamp = ttl.EvictScalar(); Y_VERIFY(expireTimestamp); auto ttlColumnNames = ttl.GetTtlColumns(); @@ -831,10 +831,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash TString tierName; for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction auto& tierInfo = tierRef.Get(); - if (!IndexInfo.AllowTtlOverColumn(tierInfo.Column)) { + if (!IndexInfo.AllowTtlOverColumn(tierInfo.EvictColumnName)) { continue; // Ignore tiers with bad ttl column } - if (NArrow::ScalarLess(tierInfo.EvictTimestamp(), max)) { + if (NArrow::ScalarLess(tierInfo.EvictScalar(), max)) { tierName = tierInfo.Name; } else { break; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 64e9191987..37ce40d1ea 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -257,4 +257,31 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { return MinMaxIdxColumnsIds.count(it->second); } +void TIndexInfo::UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const { + auto schema = ArrowSchema(); // init Schema if not yet + + for (auto& [pathId, tiering] : pathTiering) { + for (auto& [tierName, tierInfo] : tiering.TierByName) { + if (!tierInfo->EvictColumn) { + tierInfo->EvictColumn = schema->GetFieldByName(tierInfo->EvictColumnName); + } + } + if (tiering.Ttl && !tiering.Ttl->EvictColumn) { + tiering.Ttl->EvictColumn = schema->GetFieldByName(tiering.Ttl->EvictColumnName); + } + } +} + +void TIndexInfo::SetPathTiering(THashMap<ui64, TTiering>&& pathTierings) { + PathTiering = std::move(pathTierings); +} + +const TTiering* TIndexInfo::GetTiering(ui64 pathId) const { + auto it = PathTiering.find(pathId); + if (it != PathTiering.end()) { + return &it->second; + } + return nullptr; +} + } diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 0efd5c562c..a04b8352f9 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -181,17 +181,9 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } const TCompression& GetDefaultCompression() const { return DefaultCompression; } - void SetTiering(THashMap<ui64, TTiering>&& pathTierings) { - PathTiering = std::move(pathTierings); - } - - const TTiering* GetTiering(ui64 pathId) const { - auto it = PathTiering.find(pathId); - if (it != PathTiering.end()) { - return &it->second; - } - return nullptr; - } + void UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const; + void SetPathTiering(THashMap<ui64, TTiering>&& pathTierings); + const TTiering* GetTiering(ui64 pathId) const; private: ui32 Id; diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h index 6c80fd0215..92ffa8d660 100644 --- a/ydb/core/tx/columnshard/engines/tier_info.h +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -12,35 +12,57 @@ struct TCompression { struct TTierInfo { TString Name; TInstant EvictBorder; - TString Column; + TString EvictColumnName; + std::shared_ptr<arrow::Field> EvictColumn; std::optional<TCompression> Compression; + ui32 TtlUnitsInSecond; - TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column) + TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0) : Name(tierName) , EvictBorder(evictBorder) - , Column(column) + , EvictColumnName(column) + , TtlUnitsInSecond(unitsInSecond) { Y_VERIFY(!Name.empty()); - Y_VERIFY(!Column.empty()); + Y_VERIFY(!EvictColumnName.empty()); } - std::shared_ptr<arrow::Scalar> EvictTimestamp() const { + std::shared_ptr<arrow::Scalar> EvictScalar() const { if (Scalar) { return Scalar; } - Scalar = std::make_shared<arrow::TimestampScalar>( - EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1; + + Y_VERIFY(EvictColumn); + switch (EvictColumn->type()->id()) { + case arrow::Type::TIMESTAMP: + Scalar = std::make_shared<arrow::TimestampScalar>( + EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + break; + case arrow::Type::UINT16: // YQL Date + Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days()); + break; + case arrow::Type::UINT32: // YQL Datetime or Uint32 + Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier); + break; + case arrow::Type::UINT64: + Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier); + break; + default: + break; + } + return Scalar; } - static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { - return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn); + static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) { + return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond); } TString GetDebugString() const { TStringBuilder sb; - sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << Column << "' " + sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' " << arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec) << ":" << ((Compression && Compression->Level) ? *Compression->Level : arrow::util::kUseDefaultCompressionLevel); @@ -85,19 +107,14 @@ struct TTiering { TSet<TTierRef> OrderedTiers; // Tiers ordered by border std::shared_ptr<TTierInfo> Ttl; - static TTiering MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { - TTiering out; - out.Ttl = TTierInfo::MakeTtl(ttlBorder, ttlColumn); - return out; - } - bool Empty() const { return OrderedTiers.empty(); } void Add(const std::shared_ptr<TTierInfo>& tier) { if (!Empty()) { - Y_VERIFY(tier->Column == OrderedTiers.begin()->Get().Column); // TODO: support different ttl columns + // TODO: support different ttl columns + Y_VERIFY(tier->EvictColumnName == OrderedTiers.begin()->Get().EvictColumnName); } TierByName.emplace(tier->Name, tier); @@ -111,9 +128,9 @@ struct TTiering { return {}; } - std::shared_ptr<arrow::Scalar> ExpireTimestamp() const { - auto ttlTs = Ttl ? Ttl->EvictTimestamp() : nullptr; - auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictTimestamp(); + std::shared_ptr<arrow::Scalar> EvictScalar() const { + auto ttlTs = Ttl ? Ttl->EvictScalar() : nullptr; + auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(); if (!ttlTs) { return tierTs; } else if (!tierTs) { @@ -134,10 +151,10 @@ struct TTiering { THashSet<TString> GetTtlColumns() const { THashSet<TString> out; if (Ttl) { - out.insert(Ttl->Column); + out.insert(Ttl->EvictColumnName); } for (auto& [tierName, tier] : TierByName) { - out.insert(tier->Column); + out.insert(tier->EvictColumnName); } return out; } @@ -145,7 +162,7 @@ struct TTiering { TString GetDebugString() const { TStringBuilder sb; if (Ttl) { - sb << "ttl border '" << Ttl->EvictBorder << "' column '" << Ttl->Column << "'; "; + sb << Ttl->GetDebugString() << "; "; } for (auto&& i : OrderedTiers) { sb << i.Get().GetDebugString() << "; "; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 8eebd177c1..423318b75f 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -704,8 +704,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } // TTL + std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO); THashMap<ui64, NOlap::TTiering> pathTtls; - pathTtls.emplace(pathId, TTiering::MakeTtl(TInstant::MicroSeconds(10000), "timestamp")); + NOlap::TTiering tiering; + tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp"); + tiering.Ttl->EvictColumn = std::make_shared<arrow::Field>("timestamp", ttlColType); + pathTtls.emplace(pathId, std::move(tiering)); Ttl(engine, db, pathTtls, 2); // read + load + read diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index a7e564f7ec..fe072bf9b5 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -34,16 +34,35 @@ std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa auto schema = batch->schema(); int pos = schema->GetFieldIndex(name); UNIT_ASSERT(pos >= 0); - UNIT_ASSERT(batch->GetColumnByName(name)->type_id() == arrow::Type::TIMESTAMP); - - auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); - UNIT_ASSERT_VALUES_EQUAL(scalar.value, seconds * 1000 * 1000); + auto colType = batch->GetColumnByName(name)->type_id(); + + std::shared_ptr<arrow::Array> array; + if (colType == arrow::Type::TIMESTAMP) { + auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); + UNIT_ASSERT_VALUES_EQUAL(scalar.value, seconds * 1000 * 1000); + + auto res = arrow::MakeArrayFromScalar(scalar, batch->num_rows()); + UNIT_ASSERT(res.ok()); + array = *res; + } else if (colType == arrow::Type::UINT16) { // YQL Date + TInstant date(TInstant::Seconds(seconds)); + auto res = arrow::MakeArrayFromScalar(arrow::UInt16Scalar(date.Days()), batch->num_rows()); + UNIT_ASSERT(res.ok()); + array = *res; + } else if (colType == arrow::Type::UINT32) { // YQL Datetime or Uint32 + auto res = arrow::MakeArrayFromScalar(arrow::UInt32Scalar(seconds), batch->num_rows()); + UNIT_ASSERT(res.ok()); + array = *res; + } else if (colType == arrow::Type::UINT64) { + auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(seconds), batch->num_rows()); + UNIT_ASSERT(res.ok()); + array = *res; + } - auto res = arrow::MakeArrayFromScalar(scalar, batch->num_rows()); - UNIT_ASSERT(res.ok()); + UNIT_ASSERT(array); auto columns = batch->columns(); - columns[pos] = *res; + columns[pos] = array; return arrow::RecordBatch::Make(schema, batch->num_rows(), columns); } @@ -75,35 +94,54 @@ std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TStri bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize, const std::string& columnName, i64 seconds) { - auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); - UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000); - auto tsCol = DeserializeColumn(blob, strSchema, columnName); UNIT_ASSERT(tsCol); UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize); + std::shared_ptr<arrow::Scalar> expected; + switch (tsCol->type_id()) { + case arrow::Type::TIMESTAMP: + expected = std::make_shared<arrow::TimestampScalar>(seconds * 1000 * 1000, + arrow::timestamp(arrow::TimeUnit::MICRO)); + break; + case arrow::Type::UINT16: + expected = std::make_shared<arrow::UInt16Scalar>(TInstant::Seconds(seconds).Days()); + break; + case arrow::Type::UINT32: + expected = std::make_shared<arrow::UInt32Scalar>(seconds); + break; + case arrow::Type::UINT64: + expected = std::make_shared<arrow::UInt64Scalar>(seconds); + break; + default: + break; + } + + UNIT_ASSERT(expected); + for (int i = 0; i < tsCol->length(); ++i) { auto value = *tsCol->GetScalar(i); - if (!value->Equals(expected)) { - Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected.value << "\n"; + if (!value->Equals(*expected)) { + Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected->ToString() << "\n"; return false; } } return true; } -std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName) { +std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName, + const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema) { UNIT_ASSERT(ts.size() == 2); - TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema); + TString data1 = MakeTestBlob({0, portionSize}, ydbSchema); UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(data1.size() < 7 * 1024 * 1024); - TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema); + TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, ydbSchema); UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(data2.size() < 7 * 1024 * 1024); - auto schema = NArrow::MakeArrowSchema(testYdbSchema); + auto schema = NArrow::MakeArrowSchema(ydbSchema); auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), ttlColumnName, ts[0]); auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), ttlColumnName, ts[1]); @@ -135,8 +173,20 @@ static constexpr ui32 PORTION_ROWS = 80 * 1000; // ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020 // ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021 void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, - std::vector<ui64> ts = {1600000000, 1620000000}) + const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema) { + std::vector<ui64> ts = {1600000000, 1620000000}; + + ui32 ttlIncSeconds = 1; + for (auto& [name, typeInfo] : ydbSchema) { + if (name == spec.TtlColumn) { + if (typeInfo.GetTypeId() == NTypeIds::Date) { + ttlIncSeconds = TDuration::Days(1).Seconds(); + } + break; + } + } + TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -170,7 +220,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, spec.EvictAfter = TDuration::Seconds(ttlSec); } bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"), + TTestSchema::CreateInitShardTxBody(tableId, ydbSchema, testYdbPk, spec, "/Root/olapStore"), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); @@ -179,7 +229,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } // - auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn); + auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn, ydbSchema); UNIT_ASSERT_EQUAL(blobs.size(), 2); for (auto& data : blobs) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); @@ -196,7 +246,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, if (internal) { TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.TtlColumn); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn); } TAutoPtr<IEventHandle> handle; @@ -245,7 +295,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, if (internal) { TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.TtlColumn); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn); } { @@ -283,7 +333,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, if (internal) { TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.TtlColumn); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn); } { @@ -929,7 +979,20 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { } Y_UNIT_TEST(ExternalTTL) { - TestTtl(false, false); + TestTtl(false, false); // over NTypeIds::Timestamp ttl column + } + + Y_UNIT_TEST(ExternalTTL_Types) { + auto ydbSchema = testYdbSchema; + for (auto typeId : {NTypeIds::Datetime, NTypeIds::Date, NTypeIds::Uint32, NTypeIds::Uint64}) { + UNIT_ASSERT_EQUAL(ydbSchema[8].first, "saved_at"); + ydbSchema[8].second = TTypeInfo(typeId); + + TTestSchema::TTableSpecials specs; + specs.SetTtlColumn("saved_at"); + + TestTtl(false, false, specs, ydbSchema); + } } Y_UNIT_TEST(RebootExternalTTL) { @@ -938,7 +1001,20 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { } Y_UNIT_TEST(InternalTTL) { - TestTtl(false, true); + TestTtl(false, true); // over NTypeIds::Timestamp ttl column + } + + Y_UNIT_TEST(InternalTTL_Types) { + auto ydbSchema = testYdbSchema; + for (auto typeId : {NTypeIds::Datetime, NTypeIds::Date, NTypeIds::Uint32, NTypeIds::Uint64}) { + UNIT_ASSERT_EQUAL(ydbSchema[8].first, "saved_at"); + ydbSchema[8].second = TTypeInfo(typeId); + + TTestSchema::TTableSpecials specs; + specs.SetTtlColumn("saved_at"); + + TestTtl(false, true, specs, ydbSchema); + } } Y_UNIT_TEST(RebootInternalTTL) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index 97320acd1f..e378cb278e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -109,10 +109,6 @@ TColumnTableInfo::TPtr ParseParams( return nullptr; } - if (!ValidateTtlSettingsChange(tableInfo->Description.GetTtlSettings(), alter.GetAlterTtlSettings(), errStr)) { - return nullptr; - } - *alterData->Description.MutableTtlSettings() = alter.GetAlterTtlSettings(); alterData->Description.MutableTtlSettings()->SetVersion(currentTtlVersion + 1); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 0ef5806640..d1549cdbe1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2889,10 +2889,6 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, const THashMap<ui32, TOlapSchema::TColumn>& columns, const THashMap<TString, ui32>& columnsByName, TString& errStr); -bool ValidateTtlSettingsChange( - const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl, - const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, - TString& errStr); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp index b1fcb1f9f6..c8a50ea7f2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp @@ -158,7 +158,6 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: auto unit = ttl.GetColumnUnit(); switch (GetType(*column)) { - case NScheme::NTypeIds::Date: case NScheme::NTypeIds::DyNumber: errStr = "Unsupported column type for TTL in column tables"; // TODO return false; @@ -187,25 +186,4 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, return true; } -bool ValidateTtlSettingsChange( - const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl, - const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, - TString& errStr) -{ - if (oldTtl.HasEnabled() && ttl.HasEnabled()) { - TString newTtlColName; - TString oldTtlColName; - - newTtlColName = ttl.GetEnabled().GetColumnName(); - oldTtlColName = oldTtl.GetEnabled().GetColumnName(); - - if (newTtlColName != oldTtlColName) { - errStr = "Changing of TTL column is not supported for column tables"; - return false; - } - } - - return true; -} - }} diff --git a/ydb/core/tx/schemeshard/ut_ttl.cpp b/ydb/core/tx/schemeshard/ut_ttl.cpp index 6eaf99e5e2..02c1eb175b 100644 --- a/ydb/core/tx/schemeshard/ut_ttl.cpp +++ b/ydb/core/tx/schemeshard/ut_ttl.cpp @@ -26,7 +26,10 @@ void CheckTTLSettings(TTestActorRuntime& runtime, const char* tableName = "TTLEn ); } -void CheckColumnTableTTLSettings(TTestActorRuntime& runtime, const char* tableName = "ColumnTableTTL") { +void CheckColumnTableTTLSettings(TTestActorRuntime& runtime, + const char* tableName = "ColumnTableTTL", + const char* columnName = "modified_at") +{ TestDescribeResult( DescribePath(runtime, Sprintf("/MyRoot/%s", tableName)), { NLs::PathExist, @@ -36,7 +39,7 @@ void CheckColumnTableTTLSettings(TTestActorRuntime& runtime, const char* tableNa const auto& ttl = table.GetTtlSettings(); UNIT_ASSERT(ttl.HasEnabled()); - UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetColumnName(), "modified_at"); + UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetColumnName(), columnName); UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetExpireAfterSeconds(), 3600); } } @@ -1085,7 +1088,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) { TTestEnv env(runtime); ui64 txId = 100; - for (auto ct : {"Date", "DyNumber"}) { + for (auto ct : {"String", "DyNumber"}) { TestCreateColumnTable(runtime, ++txId, "/MyRoot", Sprintf(R"( Name: "ColumnTableTTL" Schema { @@ -1133,6 +1136,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) { Schema { Columns { Name: "key" Type: "Uint64" NotNull: true } Columns { Name: "modified_at" Type: "Timestamp" } + Columns { Name: "saved_at" Type: "Datetime" } KeyColumnNames: ["key"] } )"); @@ -1162,6 +1166,18 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) { TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"( Name: "ColumnTableTTL" AlterTtlSettings { + Enabled { + ColumnName: "saved_at" + ExpireAfterSeconds: 3600 + } + } + )"); + env.TestWaitNotification(runtime, txId); + CheckColumnTableTTLSettings(runtime, "ColumnTableTTL", "saved_at"); + + TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"( + Name: "ColumnTableTTL" + AlterTtlSettings { Disabled { } } @@ -1178,6 +1194,42 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) { } ); } + + Y_UNIT_TEST(AlterColumnTable_Negative) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateColumnTable(runtime, ++txId, "/MyRoot", R"( + Name: "ColumnTableTTL" + Schema { + Columns { Name: "key" Type: "Uint64" NotNull: true } + Columns { Name: "modified_at" Type: "Timestamp" } + Columns { Name: "str" Type: "String" } + KeyColumnNames: ["key"] + } + )"); + env.TestWaitNotification(runtime, txId); + TestDescribeResult( + DescribePath(runtime, "/MyRoot/ColumnTableTTL"), { + NLs::PathExist, + NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + const auto& table = record.GetPathDescription().GetColumnTableDescription(); + UNIT_ASSERT(!table.HasTtlSettings()); + } + } + ); + + TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"( + Name: "ColumnTableTTL" + AlterTtlSettings { + Enabled { + ColumnName: "str" + ExpireAfterSeconds: 3600 + } + } + )", {NKikimrScheme::StatusSchemeError}); + } } Y_UNIT_TEST_SUITE(TSchemeShardTTLTestsWithReboots) { diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp index 19875ffb7d..44b08f43c9 100644 --- a/ydb/services/ydb/ydb_logstore_ut.cpp +++ b/ydb/services/ydb/ydb_logstore_ut.cpp @@ -476,12 +476,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { UNIT_ASSERT_C(ttlSettings.Empty(), "Table must not have TTL settings"); } - // Change TTL column (currently not supported) + // Change TTL column { NYdb::NLogStore::TAlterLogTableSettings alterLogTableSettings; - alterLogTableSettings.AlterTtlSettings(NYdb::NTable::TAlterTtlSettings::Set("ingested_at", TDuration::Seconds(86400))); + alterLogTableSettings.AlterTtlSettings(NYdb::NTable::TAlterTtlSettings::Set("ingested_at", TDuration::Seconds(2000))); auto res = logStoreClient.AlterLogTable("/Root/LogStore/log2", std::move(alterLogTableSettings)).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SCHEME_ERROR, res.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } { auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log2").GetValueSync(); @@ -489,7 +489,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto descr = res.GetDescription(); auto ttlSettings = descr.GetTtlSettings(); UNIT_ASSERT_C(!ttlSettings.Empty(), "Table must have TTL settings"); - UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetColumnName(), "saved_at"); + UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetColumnName(), "ingested_at"); UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetExpireAfter(), TDuration::Seconds(2000)); } |