aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-28 17:02:31 +0300
committerchertus <azuikov@ydb.tech>2022-12-28 17:02:31 +0300
commit61e159ca8f5b33e3a43e1b34444581675f1e96dd (patch)
tree243741c059db49ccedb9e3897c625703d5333044
parent45e0a7c6b36d8fb5ba18f74c720d85c21d434932 (diff)
downloadydb-61e159ca8f5b33e3a43e1b34444581675f1e96dd.tar.gz
fix ALTER TTL for ColumnTable
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp40
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_ttl.h26
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp27
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h14
-rw-r--r--ydb/core/tx/columnshard/engines/tier_info.h63
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp6
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp124
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp22
-rw-r--r--ydb/core/tx/schemeshard/ut_ttl.cpp58
-rw-r--r--ydb/services/ydb/ydb_logstore_ut.cpp8
14 files changed, 303 insertions, 118 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 102bae6c38..5fa3c2c49f 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -213,25 +213,45 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
TString columnName = ttlBody.GetTtlColumnName();
if (columnName.empty()) {
- statusMessage = "TTL tx wrong TTL column";
+ statusMessage = "TTL tx wrong TTL column ''";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
break;
}
- for (ui64 pathId : ttlBody.GetPathIds()) {
- pathTtls.emplace(pathId, NOlap::TTiering::MakeTtl(unixTime, columnName));
+ if (!Self->PrimaryIndex) {
+ statusMessage = "No primary index for TTL";
+ status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
+ break;
+ }
+
+ auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema();
+ auto ttlColumn = schema->GetFieldByName(columnName);
+ if (!ttlColumn) {
+ statusMessage = "TTL tx wrong TTL column '" + columnName + "'";
+ status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
+ break;
+ }
+
+ if (statusMessage.empty()) {
+ for (ui64 pathId : ttlBody.GetPathIds()) {
+ NOlap::TTiering tiering;
+ tiering.Ttl = NOlap::TTierInfo::MakeTtl(unixTime, columnName);
+ pathTtls.emplace(pathId, std::move(tiering));
+ }
}
}
- if (auto event = Self->SetupTtl(pathTtls, true)) {
- if (event->NeedWrites()) {
- ctx.Send(Self->EvictionActor, event.release());
+ if (statusMessage.empty()) {
+ if (auto event = Self->SetupTtl(pathTtls, true)) {
+ if (event->NeedWrites()) {
+ ctx.Send(Self->EvictionActor, event.release());
+ } else {
+ ctx.Send(Self->SelfId(), event->TxEvent.release());
+ }
+ status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
} else {
- ctx.Send(Self->SelfId(), event->TxEvent.release());
+ statusMessage = "TTL not started";
}
- status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
- } else {
- statusMessage = "TTL not started";
}
break;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index d3afee8194..e3c3eca148 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -571,6 +571,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
} else {
Ttl.DropPathTtl(pathId);
}
+ Ttl.Repeat(); // Atler TTL triggers TTL activity
info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
Schema::SaveTableInfo(db, table.PathId, tieringUsage);
@@ -798,7 +799,9 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
if (Tiers) {
- actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds
+ auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
+ actualIndexInfo.UpdatePathTiering(pathTiering);
+ actualIndexInfo.SetPathTiering(std::move(pathTiering));
}
ActiveIndexingOrCompaction = true;
@@ -841,7 +844,9 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
if (Tiers) {
- actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds
+ auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
+ actualIndexInfo.UpdatePathTiering(pathTiering);
+ actualIndexInfo.SetPathTiering(std::move(pathTiering));
}
ActiveIndexingOrCompaction = true;
@@ -880,9 +885,14 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID());
}
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ actualIndexInfo.UpdatePathTiering(eviction);
+
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
indexChanges = PrimaryIndex->StartTtl(eviction);
+ actualIndexInfo.SetPathTiering(std::move(eviction));
+
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID());
return {};
@@ -893,9 +903,6 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
bool needWrites = !indexChanges->PortionsToEvict.empty();
- auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
- actualIndexInfo.SetTiering(std::move(eviction));
-
ActiveTtl = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
@@ -950,7 +957,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
#if 0 // No need for now
if (Tiers) {
- actualIndexInfo.SetTiering(Tiers->GetTiering());
+ ...
}
#endif
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index 5846fad4a4..0201d857b3 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -11,6 +11,7 @@ public:
struct TEviction {
TDuration EvictAfter;
TString ColumnName;
+ ui32 UnitsInSecond = 0; // 0 means auto (data type specific)
};
struct TDescription {
@@ -23,6 +24,24 @@ public:
Eviction = TEviction{expireSec, ttl.GetColumnName()};
Y_VERIFY(!Eviction->ColumnName.empty());
+
+ switch (ttl.GetColumnUnit()) {
+ case NKikimrSchemeOp::TTTLSettings::UNIT_SECONDS:
+ Eviction->UnitsInSecond = 1;
+ break;
+ case NKikimrSchemeOp::TTTLSettings::UNIT_MILLISECONDS:
+ Eviction->UnitsInSecond = 1000;
+ break;
+ case NKikimrSchemeOp::TTTLSettings::UNIT_MICROSECONDS:
+ Eviction->UnitsInSecond = 1000 * 1000;
+ break;
+ case NKikimrSchemeOp::TTTLSettings::UNIT_NANOSECONDS:
+ Eviction->UnitsInSecond = 1000 * 1000 * 1000;
+ break;
+ case NKikimrSchemeOp::TTTLSettings::UNIT_AUTO:
+ default:
+ break;
+ }
}
};
@@ -75,11 +94,12 @@ private:
TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)};
TInstant LastRegularTtl;
- std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const {
+ std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const
+ {
if (descr.Eviction) {
auto& evict = descr.Eviction;
- auto border = timePoint - evict->EvictAfter;
- return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName);
+ TInstant border = timePoint - evict->EvictAfter;
+ return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName, evict->UnitsInSecond);
}
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index e45fecdb8f..1350f74b9b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -805,7 +805,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
}
- auto expireTimestamp = ttl.ExpireTimestamp();
+ auto expireTimestamp = ttl.EvictScalar();
Y_VERIFY(expireTimestamp);
auto ttlColumnNames = ttl.GetTtlColumns();
@@ -831,10 +831,10 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
TString tierName;
for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction
auto& tierInfo = tierRef.Get();
- if (!IndexInfo.AllowTtlOverColumn(tierInfo.Column)) {
+ if (!IndexInfo.AllowTtlOverColumn(tierInfo.EvictColumnName)) {
continue; // Ignore tiers with bad ttl column
}
- if (NArrow::ScalarLess(tierInfo.EvictTimestamp(), max)) {
+ if (NArrow::ScalarLess(tierInfo.EvictScalar(), max)) {
tierName = tierInfo.Name;
} else {
break;
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 64e9191987..37ce40d1ea 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -257,4 +257,31 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
return MinMaxIdxColumnsIds.count(it->second);
}
+void TIndexInfo::UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const {
+ auto schema = ArrowSchema(); // init Schema if not yet
+
+ for (auto& [pathId, tiering] : pathTiering) {
+ for (auto& [tierName, tierInfo] : tiering.TierByName) {
+ if (!tierInfo->EvictColumn) {
+ tierInfo->EvictColumn = schema->GetFieldByName(tierInfo->EvictColumnName);
+ }
+ }
+ if (tiering.Ttl && !tiering.Ttl->EvictColumn) {
+ tiering.Ttl->EvictColumn = schema->GetFieldByName(tiering.Ttl->EvictColumnName);
+ }
+ }
+}
+
+void TIndexInfo::SetPathTiering(THashMap<ui64, TTiering>&& pathTierings) {
+ PathTiering = std::move(pathTierings);
+}
+
+const TTiering* TIndexInfo::GetTiering(ui64 pathId) const {
+ auto it = PathTiering.find(pathId);
+ if (it != PathTiering.end()) {
+ return &it->second;
+ }
+ return nullptr;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 0efd5c562c..a04b8352f9 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -181,17 +181,9 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
const TCompression& GetDefaultCompression() const { return DefaultCompression; }
- void SetTiering(THashMap<ui64, TTiering>&& pathTierings) {
- PathTiering = std::move(pathTierings);
- }
-
- const TTiering* GetTiering(ui64 pathId) const {
- auto it = PathTiering.find(pathId);
- if (it != PathTiering.end()) {
- return &it->second;
- }
- return nullptr;
- }
+ void UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const;
+ void SetPathTiering(THashMap<ui64, TTiering>&& pathTierings);
+ const TTiering* GetTiering(ui64 pathId) const;
private:
ui32 Id;
diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h
index 6c80fd0215..92ffa8d660 100644
--- a/ydb/core/tx/columnshard/engines/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/tier_info.h
@@ -12,35 +12,57 @@ struct TCompression {
struct TTierInfo {
TString Name;
TInstant EvictBorder;
- TString Column;
+ TString EvictColumnName;
+ std::shared_ptr<arrow::Field> EvictColumn;
std::optional<TCompression> Compression;
+ ui32 TtlUnitsInSecond;
- TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column)
+ TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0)
: Name(tierName)
, EvictBorder(evictBorder)
- , Column(column)
+ , EvictColumnName(column)
+ , TtlUnitsInSecond(unitsInSecond)
{
Y_VERIFY(!Name.empty());
- Y_VERIFY(!Column.empty());
+ Y_VERIFY(!EvictColumnName.empty());
}
- std::shared_ptr<arrow::Scalar> EvictTimestamp() const {
+ std::shared_ptr<arrow::Scalar> EvictScalar() const {
if (Scalar) {
return Scalar;
}
- Scalar = std::make_shared<arrow::TimestampScalar>(
- EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
+
+ Y_VERIFY(EvictColumn);
+ switch (EvictColumn->type()->id()) {
+ case arrow::Type::TIMESTAMP:
+ Scalar = std::make_shared<arrow::TimestampScalar>(
+ EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ break;
+ case arrow::Type::UINT16: // YQL Date
+ Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days());
+ break;
+ case arrow::Type::UINT32: // YQL Datetime or Uint32
+ Scalar = std::make_shared<arrow::UInt32Scalar>(EvictBorder.Seconds() * multiplier);
+ break;
+ case arrow::Type::UINT64:
+ Scalar = std::make_shared<arrow::UInt64Scalar>(EvictBorder.Seconds() * multiplier);
+ break;
+ default:
+ break;
+ }
+
return Scalar;
}
- static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn) {
- return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn);
+ static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) {
+ return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond);
}
TString GetDebugString() const {
TStringBuilder sb;
- sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << Column << "' "
+ sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "
<< arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec)
<< ":" << ((Compression && Compression->Level) ?
*Compression->Level : arrow::util::kUseDefaultCompressionLevel);
@@ -85,19 +107,14 @@ struct TTiering {
TSet<TTierRef> OrderedTiers; // Tiers ordered by border
std::shared_ptr<TTierInfo> Ttl;
- static TTiering MakeTtl(TInstant ttlBorder, const TString& ttlColumn) {
- TTiering out;
- out.Ttl = TTierInfo::MakeTtl(ttlBorder, ttlColumn);
- return out;
- }
-
bool Empty() const {
return OrderedTiers.empty();
}
void Add(const std::shared_ptr<TTierInfo>& tier) {
if (!Empty()) {
- Y_VERIFY(tier->Column == OrderedTiers.begin()->Get().Column); // TODO: support different ttl columns
+ // TODO: support different ttl columns
+ Y_VERIFY(tier->EvictColumnName == OrderedTiers.begin()->Get().EvictColumnName);
}
TierByName.emplace(tier->Name, tier);
@@ -111,9 +128,9 @@ struct TTiering {
return {};
}
- std::shared_ptr<arrow::Scalar> ExpireTimestamp() const {
- auto ttlTs = Ttl ? Ttl->EvictTimestamp() : nullptr;
- auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictTimestamp();
+ std::shared_ptr<arrow::Scalar> EvictScalar() const {
+ auto ttlTs = Ttl ? Ttl->EvictScalar() : nullptr;
+ auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar();
if (!ttlTs) {
return tierTs;
} else if (!tierTs) {
@@ -134,10 +151,10 @@ struct TTiering {
THashSet<TString> GetTtlColumns() const {
THashSet<TString> out;
if (Ttl) {
- out.insert(Ttl->Column);
+ out.insert(Ttl->EvictColumnName);
}
for (auto& [tierName, tier] : TierByName) {
- out.insert(tier->Column);
+ out.insert(tier->EvictColumnName);
}
return out;
}
@@ -145,7 +162,7 @@ struct TTiering {
TString GetDebugString() const {
TStringBuilder sb;
if (Ttl) {
- sb << "ttl border '" << Ttl->EvictBorder << "' column '" << Ttl->Column << "'; ";
+ sb << Ttl->GetDebugString() << "; ";
}
for (auto&& i : OrderedTiers) {
sb << i.Get().GetDebugString() << "; ";
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 8eebd177c1..423318b75f 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -704,8 +704,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
// TTL
+ std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO);
THashMap<ui64, NOlap::TTiering> pathTtls;
- pathTtls.emplace(pathId, TTiering::MakeTtl(TInstant::MicroSeconds(10000), "timestamp"));
+ NOlap::TTiering tiering;
+ tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp");
+ tiering.Ttl->EvictColumn = std::make_shared<arrow::Field>("timestamp", ttlColType);
+ pathTtls.emplace(pathId, std::move(tiering));
Ttl(engine, db, pathTtls, 2);
// read + load + read
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index a7e564f7ec..fe072bf9b5 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -34,16 +34,35 @@ std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa
auto schema = batch->schema();
int pos = schema->GetFieldIndex(name);
UNIT_ASSERT(pos >= 0);
- UNIT_ASSERT(batch->GetColumnByName(name)->type_id() == arrow::Type::TIMESTAMP);
-
- auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
- UNIT_ASSERT_VALUES_EQUAL(scalar.value, seconds * 1000 * 1000);
+ auto colType = batch->GetColumnByName(name)->type_id();
+
+ std::shared_ptr<arrow::Array> array;
+ if (colType == arrow::Type::TIMESTAMP) {
+ auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
+ UNIT_ASSERT_VALUES_EQUAL(scalar.value, seconds * 1000 * 1000);
+
+ auto res = arrow::MakeArrayFromScalar(scalar, batch->num_rows());
+ UNIT_ASSERT(res.ok());
+ array = *res;
+ } else if (colType == arrow::Type::UINT16) { // YQL Date
+ TInstant date(TInstant::Seconds(seconds));
+ auto res = arrow::MakeArrayFromScalar(arrow::UInt16Scalar(date.Days()), batch->num_rows());
+ UNIT_ASSERT(res.ok());
+ array = *res;
+ } else if (colType == arrow::Type::UINT32) { // YQL Datetime or Uint32
+ auto res = arrow::MakeArrayFromScalar(arrow::UInt32Scalar(seconds), batch->num_rows());
+ UNIT_ASSERT(res.ok());
+ array = *res;
+ } else if (colType == arrow::Type::UINT64) {
+ auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(seconds), batch->num_rows());
+ UNIT_ASSERT(res.ok());
+ array = *res;
+ }
- auto res = arrow::MakeArrayFromScalar(scalar, batch->num_rows());
- UNIT_ASSERT(res.ok());
+ UNIT_ASSERT(array);
auto columns = batch->columns();
- columns[pos] = *res;
+ columns[pos] = array;
return arrow::RecordBatch::Make(schema, batch->num_rows(), columns);
}
@@ -75,35 +94,54 @@ std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TStri
bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize,
const std::string& columnName, i64 seconds) {
- auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
- UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000);
-
auto tsCol = DeserializeColumn(blob, strSchema, columnName);
UNIT_ASSERT(tsCol);
UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
+ std::shared_ptr<arrow::Scalar> expected;
+ switch (tsCol->type_id()) {
+ case arrow::Type::TIMESTAMP:
+ expected = std::make_shared<arrow::TimestampScalar>(seconds * 1000 * 1000,
+ arrow::timestamp(arrow::TimeUnit::MICRO));
+ break;
+ case arrow::Type::UINT16:
+ expected = std::make_shared<arrow::UInt16Scalar>(TInstant::Seconds(seconds).Days());
+ break;
+ case arrow::Type::UINT32:
+ expected = std::make_shared<arrow::UInt32Scalar>(seconds);
+ break;
+ case arrow::Type::UINT64:
+ expected = std::make_shared<arrow::UInt64Scalar>(seconds);
+ break;
+ default:
+ break;
+ }
+
+ UNIT_ASSERT(expected);
+
for (int i = 0; i < tsCol->length(); ++i) {
auto value = *tsCol->GetScalar(i);
- if (!value->Equals(expected)) {
- Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected.value << "\n";
+ if (!value->Equals(*expected)) {
+ Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected->ToString() << "\n";
return false;
}
}
return true;
}
-std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName) {
+std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName,
+ const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema) {
UNIT_ASSERT(ts.size() == 2);
- TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
+ TString data1 = MakeTestBlob({0, portionSize}, ydbSchema);
UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
- TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema);
+ TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, ydbSchema);
UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
- auto schema = NArrow::MakeArrowSchema(testYdbSchema);
+ auto schema = NArrow::MakeArrowSchema(ydbSchema);
auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), ttlColumnName, ts[0]);
auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), ttlColumnName, ts[1]);
@@ -135,8 +173,20 @@ static constexpr ui32 PORTION_ROWS = 80 * 1000;
// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
// ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
- std::vector<ui64> ts = {1600000000, 1620000000})
+ const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema)
{
+ std::vector<ui64> ts = {1600000000, 1620000000};
+
+ ui32 ttlIncSeconds = 1;
+ for (auto& [name, typeInfo] : ydbSchema) {
+ if (name == spec.TtlColumn) {
+ if (typeInfo.GetTypeId() == NTypeIds::Date) {
+ ttlIncSeconds = TDuration::Days(1).Seconds();
+ }
+ break;
+ }
+ }
+
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -170,7 +220,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
spec.EvictAfter = TDuration::Seconds(ttlSec);
}
bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"),
+ TTestSchema::CreateInitShardTxBody(tableId, ydbSchema, testYdbPk, spec, "/Root/olapStore"),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
@@ -179,7 +229,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
//
- auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn);
+ auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn, ydbSchema);
UNIT_ASSERT_EQUAL(blobs.size(), 2);
for (auto& data : blobs) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
@@ -196,7 +246,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.TtlColumn);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn);
}
TAutoPtr<IEventHandle> handle;
@@ -245,7 +295,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.TtlColumn);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn);
}
{
@@ -283,7 +333,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.TtlColumn);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn);
}
{
@@ -929,7 +979,20 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
}
Y_UNIT_TEST(ExternalTTL) {
- TestTtl(false, false);
+ TestTtl(false, false); // over NTypeIds::Timestamp ttl column
+ }
+
+ Y_UNIT_TEST(ExternalTTL_Types) {
+ auto ydbSchema = testYdbSchema;
+ for (auto typeId : {NTypeIds::Datetime, NTypeIds::Date, NTypeIds::Uint32, NTypeIds::Uint64}) {
+ UNIT_ASSERT_EQUAL(ydbSchema[8].first, "saved_at");
+ ydbSchema[8].second = TTypeInfo(typeId);
+
+ TTestSchema::TTableSpecials specs;
+ specs.SetTtlColumn("saved_at");
+
+ TestTtl(false, false, specs, ydbSchema);
+ }
}
Y_UNIT_TEST(RebootExternalTTL) {
@@ -938,7 +1001,20 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
}
Y_UNIT_TEST(InternalTTL) {
- TestTtl(false, true);
+ TestTtl(false, true); // over NTypeIds::Timestamp ttl column
+ }
+
+ Y_UNIT_TEST(InternalTTL_Types) {
+ auto ydbSchema = testYdbSchema;
+ for (auto typeId : {NTypeIds::Datetime, NTypeIds::Date, NTypeIds::Uint32, NTypeIds::Uint64}) {
+ UNIT_ASSERT_EQUAL(ydbSchema[8].first, "saved_at");
+ ydbSchema[8].second = TTypeInfo(typeId);
+
+ TTestSchema::TTableSpecials specs;
+ specs.SetTtlColumn("saved_at");
+
+ TestTtl(false, true, specs, ydbSchema);
+ }
}
Y_UNIT_TEST(RebootInternalTTL) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
index 97320acd1f..e378cb278e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
@@ -109,10 +109,6 @@ TColumnTableInfo::TPtr ParseParams(
return nullptr;
}
- if (!ValidateTtlSettingsChange(tableInfo->Description.GetTtlSettings(), alter.GetAlterTtlSettings(), errStr)) {
- return nullptr;
- }
-
*alterData->Description.MutableTtlSettings() = alter.GetAlterTtlSettings();
alterData->Description.MutableTtlSettings()->SetVersion(currentTtlVersion + 1);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 0ef5806640..d1549cdbe1 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2889,10 +2889,6 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
const THashMap<ui32, TOlapSchema::TColumn>& columns,
const THashMap<TString, ui32>& columnsByName,
TString& errStr);
-bool ValidateTtlSettingsChange(
- const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl,
- const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
- TString& errStr);
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
index b1fcb1f9f6..c8a50ea7f2 100644
--- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
@@ -158,7 +158,6 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::
auto unit = ttl.GetColumnUnit();
switch (GetType(*column)) {
- case NScheme::NTypeIds::Date:
case NScheme::NTypeIds::DyNumber:
errStr = "Unsupported column type for TTL in column tables"; // TODO
return false;
@@ -187,25 +186,4 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
return true;
}
-bool ValidateTtlSettingsChange(
- const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl,
- const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
- TString& errStr)
-{
- if (oldTtl.HasEnabled() && ttl.HasEnabled()) {
- TString newTtlColName;
- TString oldTtlColName;
-
- newTtlColName = ttl.GetEnabled().GetColumnName();
- oldTtlColName = oldTtl.GetEnabled().GetColumnName();
-
- if (newTtlColName != oldTtlColName) {
- errStr = "Changing of TTL column is not supported for column tables";
- return false;
- }
- }
-
- return true;
-}
-
}}
diff --git a/ydb/core/tx/schemeshard/ut_ttl.cpp b/ydb/core/tx/schemeshard/ut_ttl.cpp
index 6eaf99e5e2..02c1eb175b 100644
--- a/ydb/core/tx/schemeshard/ut_ttl.cpp
+++ b/ydb/core/tx/schemeshard/ut_ttl.cpp
@@ -26,7 +26,10 @@ void CheckTTLSettings(TTestActorRuntime& runtime, const char* tableName = "TTLEn
);
}
-void CheckColumnTableTTLSettings(TTestActorRuntime& runtime, const char* tableName = "ColumnTableTTL") {
+void CheckColumnTableTTLSettings(TTestActorRuntime& runtime,
+ const char* tableName = "ColumnTableTTL",
+ const char* columnName = "modified_at")
+{
TestDescribeResult(
DescribePath(runtime, Sprintf("/MyRoot/%s", tableName)), {
NLs::PathExist,
@@ -36,7 +39,7 @@ void CheckColumnTableTTLSettings(TTestActorRuntime& runtime, const char* tableNa
const auto& ttl = table.GetTtlSettings();
UNIT_ASSERT(ttl.HasEnabled());
- UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetColumnName(), "modified_at");
+ UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetColumnName(), columnName);
UNIT_ASSERT_VALUES_EQUAL(ttl.GetEnabled().GetExpireAfterSeconds(), 3600);
}
}
@@ -1085,7 +1088,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) {
TTestEnv env(runtime);
ui64 txId = 100;
- for (auto ct : {"Date", "DyNumber"}) {
+ for (auto ct : {"String", "DyNumber"}) {
TestCreateColumnTable(runtime, ++txId, "/MyRoot", Sprintf(R"(
Name: "ColumnTableTTL"
Schema {
@@ -1133,6 +1136,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) {
Schema {
Columns { Name: "key" Type: "Uint64" NotNull: true }
Columns { Name: "modified_at" Type: "Timestamp" }
+ Columns { Name: "saved_at" Type: "Datetime" }
KeyColumnNames: ["key"]
}
)");
@@ -1162,6 +1166,18 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) {
TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"(
Name: "ColumnTableTTL"
AlterTtlSettings {
+ Enabled {
+ ColumnName: "saved_at"
+ ExpireAfterSeconds: 3600
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ CheckColumnTableTTLSettings(runtime, "ColumnTableTTL", "saved_at");
+
+ TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "ColumnTableTTL"
+ AlterTtlSettings {
Disabled {
}
}
@@ -1178,6 +1194,42 @@ Y_UNIT_TEST_SUITE(TSchemeShardColumnTableTTL) {
}
);
}
+
+ Y_UNIT_TEST(AlterColumnTable_Negative) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateColumnTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "ColumnTableTTL"
+ Schema {
+ Columns { Name: "key" Type: "Uint64" NotNull: true }
+ Columns { Name: "modified_at" Type: "Timestamp" }
+ Columns { Name: "str" Type: "String" }
+ KeyColumnNames: ["key"]
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ TestDescribeResult(
+ DescribePath(runtime, "/MyRoot/ColumnTableTTL"), {
+ NLs::PathExist,
+ NLs::Finished, [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ const auto& table = record.GetPathDescription().GetColumnTableDescription();
+ UNIT_ASSERT(!table.HasTtlSettings());
+ }
+ }
+ );
+
+ TestAlterColumnTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "ColumnTableTTL"
+ AlterTtlSettings {
+ Enabled {
+ ColumnName: "str"
+ ExpireAfterSeconds: 3600
+ }
+ }
+ )", {NKikimrScheme::StatusSchemeError});
+ }
}
Y_UNIT_TEST_SUITE(TSchemeShardTTLTestsWithReboots) {
diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp
index 19875ffb7d..44b08f43c9 100644
--- a/ydb/services/ydb/ydb_logstore_ut.cpp
+++ b/ydb/services/ydb/ydb_logstore_ut.cpp
@@ -476,12 +476,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
UNIT_ASSERT_C(ttlSettings.Empty(), "Table must not have TTL settings");
}
- // Change TTL column (currently not supported)
+ // Change TTL column
{
NYdb::NLogStore::TAlterLogTableSettings alterLogTableSettings;
- alterLogTableSettings.AlterTtlSettings(NYdb::NTable::TAlterTtlSettings::Set("ingested_at", TDuration::Seconds(86400)));
+ alterLogTableSettings.AlterTtlSettings(NYdb::NTable::TAlterTtlSettings::Set("ingested_at", TDuration::Seconds(2000)));
auto res = logStoreClient.AlterLogTable("/Root/LogStore/log2", std::move(alterLogTableSettings)).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SCHEME_ERROR, res.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
{
auto res = logStoreClient.DescribeLogTable("/Root/LogStore/log2").GetValueSync();
@@ -489,7 +489,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto descr = res.GetDescription();
auto ttlSettings = descr.GetTtlSettings();
UNIT_ASSERT_C(!ttlSettings.Empty(), "Table must have TTL settings");
- UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetColumnName(), "saved_at");
+ UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetColumnName(), "ingested_at");
UNIT_ASSERT_VALUES_EQUAL(ttlSettings->GetDateTypeColumn().GetExpireAfter(), TDuration::Seconds(2000));
}