diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-06-14 17:51:09 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-06-14 17:51:09 +0300 |
commit | 83725a03a831a7665339c3bfc92750b8ec36d837 (patch) | |
tree | 495da1a148cb86fb135853611eaa57fc3e7e7120 | |
parent | 9de5449fbf160aa05eb0200cd063d2489da9117a (diff) | |
download | ydb-83725a03a831a7665339c3bfc92750b8ec36d837.tar.gz |
KIKIMR-15075: hotfix for unlimited eviction read in ColumnShard
ref:3e5d7dd77957bdc76c5d4108fcd1788cbf51ace0
-rw-r--r-- | ROADMAP.md | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ttl.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 3 |
7 files changed, 50 insertions, 16 deletions
diff --git a/ROADMAP.md b/ROADMAP.md index 0ddb06fb7d..fd9b563f1c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,14 +1,14 @@ # YDB Roadmap ## Query Processor -1. [ ] Support for **Snapshot Readonly** transactions mode +1. [ ] Support for **Snapshot Read-only** transactions mode 1. [ ] **Better resource management** for KQP Resource Manager (share information about nodes resources, avoid OOMs) 1. [ ] Switch to **New Engine** for OLTP queries 1. ✅ Support **`not null` for table columns** 1. [ ] **Aggregates and predicates push down to column-oriented tables** 1. [ ] **Optimize data formats** for data transition between query phases -1. [ ] **Index Rename/Rebuild** -1. [ ] **KQP Session Actor** as a replacement for KQP Worker Actor (optimize to reduce CPU usage) +1. [ ] **Index Rebuild** +1. [ ] Optimize **KQP Session Actor** (optimize to reduce CPU usage) 1. [ ] **PostgreSQL compatibility** * [ ] Support PostgreSQL datatypes **serialization/deserialization** in YDB Public API * [ ] PostgreSQL compatible **query execution** (TPC-C, TPC-H queries should work) @@ -17,13 +17,13 @@ 1. [ ] Support **constraints in query optimizer** 1. [ ] **Query Processor 3.0** (a set of tasks to be more like traditional database in case of query execution functionality) * [ ] Support for **Streaming Lookup Join** via MVCC snapshots (avoid distributed transactions, scalability is better) - * [ ] **Universal API call for DML, DDL with unlimited results size** (aka StreamExecuteQuery, which allows to execute each query) + * [ ] **Universal API call for DML, DDL, ScanQuery** (aka StreaminExecuteQuery, which allows to execute each query) * [ ] Support for **secondary indexes in ScanQuery** - * [ ] **Transaction can see its own updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction) -1. [ ] **Computation graphs caching (compute/datashard programs)** (optimize CPU usage) + * [ ] **Transaction can see its updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction) +1. [ ] **Query graphs caching at DataShards** (optimize CPU usage) 1. [ ] **RPC Deadline & Cancellation propagation** (smooth timeout management) 1. [ ] **DDL for column-oriented tables** -1. [ ] **Select indexes automatically** +2. [ ] **Select indexes automatically** ## Database Core (Tablets, etc) 1. [ ] Get **YDB topics** (aka pers queue, streams) ready for production diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index efd2be7122..4ebfc9440d 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -44,6 +44,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { auto changes = Ev->Get()->IndexChanges; Y_VERIFY(changes); + LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() + << ") changes: " << *changes << " at tablet " << Self->TabletID()); + bool ok = false; if (Ev->Get()->PutStatus == NKikimrProto::OK) { NOlap::TSnapshot snapshot = changes->ApplySnapshot; @@ -55,8 +58,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply if (ok) { - LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() - << ") apply changes: " << *changes << " at tablet " << Self->TabletID()); + LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID()); TBlobManagerDb blobManagerDb(txc.DB); for (const auto& cmtd : changes->DataToIndex) { @@ -213,6 +215,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { } if (BlobsToExport.size()) { + size_t numBlobs = BlobsToExport.size(); for (auto& [blobId, tierName] : BlobsToExport) { ExportTierBlobs[tierName].emplace(blobId, TString{}); } @@ -221,6 +224,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { ExportNo = Self->LastExportNo + 1; Self->LastExportNo += ExportTierBlobs.size(); + LOG_S_DEBUG("TTxWriteIndex init export " << ExportNo << " of " << numBlobs << " blobs in " + << ExportTierBlobs.size() << " tiers at tablet " << Self->TabletID()); + NIceDb::TNiceDb db(txc.DB); Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 6a89c13094..a397abfdfc 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -738,6 +738,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID()); return {}; } + if (indexChanges->NeedRepeat) { + Ttl.Repeat(); + } bool needWrites = !indexChanges->PortionsToEvict.empty(); diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index 91802398ca..a9c88419cc 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard { class TTtl { public: static constexpr const ui64 DEFAULT_TTL_TIMEOUT_SEC = 60 * 60; + static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10; struct TEviction { TString TierName; @@ -80,7 +81,7 @@ public: } THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) { - if ((now < LastRegularTtl + RegularTtlTimeout) && !force) { + if ((now < LastRegularTtl + TtlTimeout) && !force) { return {}; } @@ -93,12 +94,18 @@ public: return out; } + void Repeat() { + LastRegularTtl -= TtlTimeout; + LastRegularTtl += RepeatTtlTimeout; + } + const THashSet<TString>& TtlColumns() const { return Columns; } private: THashMap<ui64, TDescription> PathTtls; // pathId -> ttl THashSet<TString> Columns; - TDuration RegularTtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)}; + TDuration TtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)}; + TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)}; TInstant LastRegularTtl; NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const { diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index c7d9b20a6f..42b49f9f97 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -16,6 +16,8 @@ struct TPredicate; struct TCompactionLimits { static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant + static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024; + static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000; ui32 GoodBlobSize{MIN_GOOD_BLOB_SIZE}; ui32 GranuleBlobSplitSize{MAX_BLOB_SIZE}; @@ -131,6 +133,7 @@ public: TVector<TColumnRecord> EvictedRecords; TVector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule} THashMap<TBlobRange, TString> Blobs; + bool NeedRepeat{false}; bool IsInsert() const { return Type == INSERT; } bool IsCompaction() const { return Type == COMPACTION; } @@ -363,7 +366,8 @@ public: const TSnapshot& outdatedSnapshot) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) = 0; + virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, + ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 57417038b0..4e0433b4f3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -771,13 +771,18 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T return changes; } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) { +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, + ui64 maxEvictBytes) { if (pathTtls.empty()) { return {}; } TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot auto changes = std::make_shared<TChanges>(TColumnEngineChanges::TTL, fakeSnapshot); + ui64 evicttionSize = 0; + bool allowEviction = true; + ui64 dropBlobs = 0; + bool allowDrop = true; for (auto& [pathId, ttl] : pathTtls) { if (!PathGranules.count(pathId)) { @@ -800,19 +805,24 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash continue; } + allowEviction = (evicttionSize <= maxEvictBytes); + allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); + if (auto max = info.MaxValue(ttlColumnId)) { bool keep = false; for (auto& border : ttl.TierBorders) { if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) { keep = true; - if (info.TierName != border.TierName) { + if (allowEviction && info.TierName != border.TierName) { + evicttionSize += info.BlobsSizes().first; changes->PortionsToEvict.emplace_back(info, border.TierName); } break; } } - if (!keep) { + if (!keep && allowDrop) { Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max)); + dropBlobs += info.NumRecords(); changes->PortionsToDrop.push_back(info); } } @@ -825,6 +835,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash return {}; } + if (!allowEviction || !allowDrop) { + changes->NeedRepeat = true; + } return changes; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 7a4f443ed3..9da5479df0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -141,7 +141,8 @@ public: const TSnapshot& outdatedSnapshot) override; std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) override; - std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) override; + std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, + ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) override; void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override; |