aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-06-14 17:51:09 +0300
committerArtem Zuikov <chertus@gmail.com>2022-06-14 17:51:09 +0300
commit83725a03a831a7665339c3bfc92750b8ec36d837 (patch)
tree495da1a148cb86fb135853611eaa57fc3e7e7120
parent9de5449fbf160aa05eb0200cd063d2489da9117a (diff)
downloadydb-83725a03a831a7665339c3bfc92750b8ec36d837.tar.gz
KIKIMR-15075: hotfix for unlimited eviction read in ColumnShard
ref:3e5d7dd77957bdc76c5d4108fcd1788cbf51ace0
-rw-r--r--ROADMAP.md14
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard_ttl.h11
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp19
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h3
7 files changed, 50 insertions, 16 deletions
diff --git a/ROADMAP.md b/ROADMAP.md
index 0ddb06fb7d..fd9b563f1c 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -1,14 +1,14 @@
# YDB Roadmap
## Query Processor
-1. [ ] Support for **Snapshot Readonly** transactions mode
+1. [ ] Support for **Snapshot Read-only** transactions mode
1. [ ] **Better resource management** for KQP Resource Manager (share information about nodes resources, avoid OOMs)
1. [ ] Switch to **New Engine** for OLTP queries
1. ✅ Support **`not null` for table columns**
1. [ ] **Aggregates and predicates push down to column-oriented tables**
1. [ ] **Optimize data formats** for data transition between query phases
-1. [ ] **Index Rename/Rebuild**
-1. [ ] **KQP Session Actor** as a replacement for KQP Worker Actor (optimize to reduce CPU usage)
+1. [ ] **Index Rebuild**
+1. [ ] Optimize **KQP Session Actor** (optimize to reduce CPU usage)
1. [ ] **PostgreSQL compatibility**
* [ ] Support PostgreSQL datatypes **serialization/deserialization** in YDB Public API
* [ ] PostgreSQL compatible **query execution** (TPC-C, TPC-H queries should work)
@@ -17,13 +17,13 @@
1. [ ] Support **constraints in query optimizer**
1. [ ] **Query Processor 3.0** (a set of tasks to be more like traditional database in case of query execution functionality)
* [ ] Support for **Streaming Lookup Join** via MVCC snapshots (avoid distributed transactions, scalability is better)
- * [ ] **Universal API call for DML, DDL with unlimited results size** (aka StreamExecuteQuery, which allows to execute each query)
+ * [ ] **Universal API call for DML, DDL, ScanQuery** (aka StreaminExecuteQuery, which allows to execute each query)
* [ ] Support for **secondary indexes in ScanQuery**
- * [ ] **Transaction can see its own updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction)
-1. [ ] **Computation graphs caching (compute/datashard programs)** (optimize CPU usage)
+ * [ ] **Transaction can see its updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction)
+1. [ ] **Query graphs caching at DataShards** (optimize CPU usage)
1. [ ] **RPC Deadline & Cancellation propagation** (smooth timeout management)
1. [ ] **DDL for column-oriented tables**
-1. [ ] **Select indexes automatically**
+2. [ ] **Select indexes automatically**
## Database Core (Tablets, etc)
1. [ ] Get **YDB topics** (aka pers queue, streams) ready for production
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index efd2be7122..4ebfc9440d 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -44,6 +44,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
auto changes = Ev->Get()->IndexChanges;
Y_VERIFY(changes);
+ LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString()
+ << ") changes: " << *changes << " at tablet " << Self->TabletID());
+
bool ok = false;
if (Ev->Get()->PutStatus == NKikimrProto::OK) {
NOlap::TSnapshot snapshot = changes->ApplySnapshot;
@@ -55,8 +58,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
if (ok) {
- LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString()
- << ") apply changes: " << *changes << " at tablet " << Self->TabletID());
+ LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID());
TBlobManagerDb blobManagerDb(txc.DB);
for (const auto& cmtd : changes->DataToIndex) {
@@ -213,6 +215,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
}
if (BlobsToExport.size()) {
+ size_t numBlobs = BlobsToExport.size();
for (auto& [blobId, tierName] : BlobsToExport) {
ExportTierBlobs[tierName].emplace(blobId, TString{});
}
@@ -221,6 +224,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
ExportNo = Self->LastExportNo + 1;
Self->LastExportNo += ExportTierBlobs.size();
+ LOG_S_DEBUG("TTxWriteIndex init export " << ExportNo << " of " << numBlobs << " blobs in "
+ << ExportTierBlobs.size() << " tiers at tablet " << Self->TabletID());
+
NIceDb::TNiceDb db(txc.DB);
Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 6a89c13094..a397abfdfc 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -738,6 +738,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID());
return {};
}
+ if (indexChanges->NeedRepeat) {
+ Ttl.Repeat();
+ }
bool needWrites = !indexChanges->PortionsToEvict.empty();
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index 91802398ca..a9c88419cc 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard {
class TTtl {
public:
static constexpr const ui64 DEFAULT_TTL_TIMEOUT_SEC = 60 * 60;
+ static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10;
struct TEviction {
TString TierName;
@@ -80,7 +81,7 @@ public:
}
THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) {
- if ((now < LastRegularTtl + RegularTtlTimeout) && !force) {
+ if ((now < LastRegularTtl + TtlTimeout) && !force) {
return {};
}
@@ -93,12 +94,18 @@ public:
return out;
}
+ void Repeat() {
+ LastRegularTtl -= TtlTimeout;
+ LastRegularTtl += RepeatTtlTimeout;
+ }
+
const THashSet<TString>& TtlColumns() const { return Columns; }
private:
THashMap<ui64, TDescription> PathTtls; // pathId -> ttl
THashSet<TString> Columns;
- TDuration RegularTtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
+ TDuration TtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
+ TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)};
TInstant LastRegularTtl;
NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index c7d9b20a6f..42b49f9f97 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -16,6 +16,8 @@ struct TPredicate;
struct TCompactionLimits {
static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant
+ static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024;
+ static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000;
ui32 GoodBlobSize{MIN_GOOD_BLOB_SIZE};
ui32 GranuleBlobSplitSize{MAX_BLOB_SIZE};
@@ -131,6 +133,7 @@ public:
TVector<TColumnRecord> EvictedRecords;
TVector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule}
THashMap<TBlobRange, TString> Blobs;
+ bool NeedRepeat{false};
bool IsInsert() const { return Type == INSERT; }
bool IsCompaction() const { return Type == COMPACTION; }
@@ -363,7 +366,8 @@ public:
const TSnapshot& outdatedSnapshot) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) = 0;
+ virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+ ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
//virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 57417038b0..4e0433b4f3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -771,13 +771,18 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
return changes;
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) {
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+ ui64 maxEvictBytes) {
if (pathTtls.empty()) {
return {};
}
TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot
auto changes = std::make_shared<TChanges>(TColumnEngineChanges::TTL, fakeSnapshot);
+ ui64 evicttionSize = 0;
+ bool allowEviction = true;
+ ui64 dropBlobs = 0;
+ bool allowDrop = true;
for (auto& [pathId, ttl] : pathTtls) {
if (!PathGranules.count(pathId)) {
@@ -800,19 +805,24 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
continue;
}
+ allowEviction = (evicttionSize <= maxEvictBytes);
+ allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
+
if (auto max = info.MaxValue(ttlColumnId)) {
bool keep = false;
for (auto& border : ttl.TierBorders) {
if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) {
keep = true;
- if (info.TierName != border.TierName) {
+ if (allowEviction && info.TierName != border.TierName) {
+ evicttionSize += info.BlobsSizes().first;
changes->PortionsToEvict.emplace_back(info, border.TierName);
}
break;
}
}
- if (!keep) {
+ if (!keep && allowDrop) {
Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max));
+ dropBlobs += info.NumRecords();
changes->PortionsToDrop.push_back(info);
}
}
@@ -825,6 +835,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
return {};
}
+ if (!allowEviction || !allowDrop) {
+ changes->NeedRepeat = true;
+ }
return changes;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 7a4f443ed3..9da5479df0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -141,7 +141,8 @@ public:
const TSnapshot& outdatedSnapshot) override;
std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) override;
- std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) override;
+ std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+ ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;