diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-06 18:54:38 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-06 18:54:38 +0300 |
commit | a1edfae32d8ebc2479d063d6106156f128ba5fc5 (patch) | |
tree | 28122d7b6f09da270f9b6c9696029fa1a5a920ab | |
parent | f662d68f5a0c839f02969571c1fbba97c7711be4 (diff) | |
download | ydb-a1edfae32d8ebc2479d063d6106156f128ba5fc5.tar.gz |
InsertTable overloads
overloads insert_table
35 files changed, 736 insertions, 267 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 3869a4760ea..82b994b1a98 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -124,6 +124,25 @@ void TTxWrite::Complete(const TActorContext& ctx) { ctx.Send(Ev->Get()->GetSource(), Result.release()); } +void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { + IncCounter(COUNTER_WRITE_FAIL); + IncCounter(COUNTER_WRITE_OVERLOAD); + + const auto& record = Proto(ev->Get()); + const auto& data = record.GetData(); + const ui64 tableId = record.GetTableId(); + const ui64 metaShard = record.GetTxInitiator(); + const ui64 writeId = record.GetWriteId(); + const TString& dedupId = record.GetDedupId(); + + LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId + << " overload reason: [" << overloadReason << "]" + << " at tablet " << TabletID()); + + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( + TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED); + ctx.Send(ev->Get()->GetSource(), result.release()); +} // EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { @@ -131,13 +150,13 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex OnYellowChannels(*ev->Get()); - auto& record = Proto(ev->Get()); - auto& data = record.GetData(); - ui64 tableId = record.GetTableId(); - ui64 metaShard = record.GetTxInitiator(); - ui64 writeId = record.GetWriteId(); - TString dedupId = record.GetDedupId(); - auto putStatus = ev->Get()->GetPutStatus(); + const auto& record = Proto(ev->Get()); + const auto& data = record.GetData(); + const ui64 tableId = record.GetTableId(); + const ui64 metaShard = record.GetTxInitiator(); + 
const ui64 writeId = record.GetWriteId(); + const TString dedupId = record.GetDedupId(); + const auto putStatus = ev->Get()->GetPutStatus(); bool isWritable = TablesManager.IsWritableTable(tableId); bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable; @@ -175,28 +194,24 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex --WritesInFly; // write successed Y_VERIFY(putStatus == NKikimrProto::OK); Execute(new TTxWrite(this, ev), ctx); - } else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) { + } else if (isOutOfSpace) { IncCounter(COUNTER_WRITE_FAIL); - - if (isOutOfSpace) { - IncCounter(COUNTER_OUT_OF_SPACE); - LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId - << " at tablet " << TabletID()); - } else { - bool tableOverload = InsertTable->IsOverloaded(tableId); - IncCounter(COUNTER_WRITE_OVERLOAD); - if (!tableOverload) { - IncCounter(COUNTER_WRITE_OVERLOAD_SHARD); - } - - LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId - << (ShardOverloaded() ? " [shard]" : "") << (tableOverload? 
" [table]" : "") - << " at tablet " << TabletID()); - } + IncCounter(COUNTER_OUT_OF_SPACE); + LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId + << " at tablet " << TabletID()); auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED); ctx.Send(ev->Get()->GetSource(), result.release()); + } else if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) { + CSCounters.OnOverloadInsertTable(data.size()); + OverloadWriteFail("insert_table", ev, ctx); + } else if (TablesManager.IsOverloaded(tableId)) { + CSCounters.OnOverloadGranule(data.size()); + OverloadWriteFail("granule", ev, ctx); + } else if (ShardOverloaded()) { + CSCounters.OnOverloadShard(data.size()); + OverloadWriteFail("shard", ev, ctx); } else { if (record.HasLongTxId()) { // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 48677da1f90..75c04412999 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -619,8 +619,7 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit // Preventing conflicts between indexing and compaction leads to election between them. // Indexing vs compaction probability depends on index and insert table overload status. // Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded(). - const ui32 mask = InsertTableOverloaded() ? 0xF : (IndexOverloaded() ? 
0x1 : 0x3); - const bool preferIndexing = BackgroundActivation & mask; + const bool preferIndexing = RandomNumber<ui32>(1000) < 750; if (preferIndexing) { if (activity.HasIndexation()) { @@ -680,27 +679,26 @@ bool TColumnShard::SetupIndexation() { std::vector<const NOlap::TInsertedData*> dataToIndex; dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); THashMap<ui64, ui64> overloadedPathGranules; - for (auto& [pathId, committed] : InsertTable->GetCommitted()) { - auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(pathId); - if (pMap) { - overloadedPathGranules[pathId] = pMap->size(); - } - InsertTable->SetOverloaded(pathId, !!pMap); - for (auto& data : committed) { - ui32 dataSize = data.BlobSize(); - Y_VERIFY(dataSize); - - size += dataSize; - if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) { - continue; - } - if (pMap) { - ++ignored; - continue; + for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) { + for (auto* pathInfo : it->second) { + const bool granulesOverloaded = TablesManager.GetPrimaryIndex()->HasOverloadedGranules(pathInfo->GetPathId()); + for (auto& data : pathInfo->GetCommitted()) { + ui32 dataSize = data.BlobSize(); + Y_VERIFY(dataSize); + + size += dataSize; + if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) { + continue; + } + if (granulesOverloaded) { + ++ignored; + CSCounters.SkipIndexationInputDueToGranuleOverload(dataSize); + continue; + } + ++blobs; + bytesToIndex += dataSize; + dataToIndex.push_back(&data); } - ++blobs; - bytesToIndex += dataSize; - dataToIndex.push_back(&data); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 4670398b548..226c99d0e4c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -8,6 +8,7 @@ #include "blob_manager.h" #include "tables_manager.h" #include 
"inflight_request_tracker.h" +#include "counters/columnshard.h" #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> @@ -163,6 +164,7 @@ class TColumnShard void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); + void OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx); ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -383,6 +385,8 @@ private: const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation"); const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); + const TCSCounters CSCounters; + THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; @@ -427,14 +431,6 @@ private: (writesLimit && WritesInFly > writesLimit); } - bool InsertTableOverloaded() const { - return InsertTable && InsertTable->HasOverloaded(); - } - - bool IndexOverloaded() const { - return TablesManager.IndexOverloaded(); - } - TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId); TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId); void AddLongTxWrite(TWriteId writeId, ui64 txId); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index b025ee9903d..f8e03b52366 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -5,7 +5,7 @@ #include <ydb/core/tx/long_tx_service/public/types.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/tx_columnshard.pb.h> -#include <ydb/core/tx/columnshard/engines/insert_table.h> +#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h> #include <ydb/core/tx/columnshard/engines/granules_table.h> #include 
<ydb/core/tx/columnshard/engines/columns_table.h> @@ -464,9 +464,7 @@ struct Schema : NIceDb::Schema { static bool InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, - THashMap<TWriteId, TInsertedData>& inserted, - THashMap<ui64, TSet<TInsertedData>>& committed, - THashMap<TWriteId, TInsertedData>& aborted, + NOlap::TInsertTableAccessor& insertTable, const TInstant& loadTime) { auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); if (!rowset.IsReady()) @@ -502,13 +500,13 @@ struct Schema : NIceDb::Schema { switch (recType) { case EInsertTableIds::Inserted: - inserted.emplace(TWriteId{data.WriteTxId}, std::move(data)); + insertTable.AddInserted(TWriteId{ data.WriteTxId }, std::move(data)); break; case EInsertTableIds::Committed: - committed[data.PathId].emplace(data); + insertTable.AddCommitted(std::move(data)); break; case EInsertTableIds::Aborted: - aborted.emplace(TWriteId{data.WriteTxId}, std::move(data)); + insertTable.AddAborted(TWriteId{ data.WriteTxId }, std::move(data)); break; } diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index c9b625eb7c3..af9aed36ff6 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -60,12 +60,12 @@ public: OverloadShardCount->Add(1); } - void SkipIndexationInputDutToSplitCompaction(const ui64 size) const { + void SkipIndexationInputDueToSplitCompaction(const ui64 size) const { SkipIndexationInputDueToSplitCompactionBytes->Add(size); SkipIndexationInputDueToSplitCompactionCount->Add(1); } - void SkipIndexationInputDutToGranuleOverload(const ui64 size) const { + void SkipIndexationInputDueToGranuleOverload(const ui64 size) const { SkipIndexationInputDueToGranuleOverloadBytes->Add(size); SkipIndexationInputDueToGranuleOverloadCount->Add(1); } diff --git a/ydb/core/tx/columnshard/counters/common/private.cpp 
b/ydb/core/tx/columnshard/counters/common/private.cpp index 2c6d0c92c91..4fc7cc40623 100644 --- a/ydb/core/tx/columnshard/counters/common/private.cpp +++ b/ydb/core/tx/columnshard/counters/common/private.cpp @@ -47,7 +47,9 @@ public: auto it = Agents.find(signalName); if (it == Agents.end()) { it = Agents.emplace(signalName, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first; - NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second)); + if (NActors::TlsActivationContext) { + NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second)); + } } return it->second; } diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index b868fce25e7..570006da4b2 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(insert_table) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(storage) @@ -34,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-reader columnshard-engines-predicate columnshard-engines-storage + columnshard-engines-insert_table formats-arrow-compression core-tx-program udf-service-exception_policy @@ -42,8 +44,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index d73745d1174..60cd9a5bca8 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(insert_table) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(storage) @@ -35,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-reader columnshard-engines-predicate columnshard-engines-storage + columnshard-engines-insert_table formats-arrow-compression core-tx-program udf-service-exception_policy @@ -43,8 +45,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index d73745d1174..60cd9a5bca8 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(insert_table) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(storage) @@ -35,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-reader columnshard-engines-predicate columnshard-engines-storage + columnshard-engines-insert_table formats-arrow-compression core-tx-program udf-service-exception_policy @@ -43,8 +45,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index b868fce25e7..570006da4b2 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(insert_table) add_subdirectory(predicate) add_subdirectory(reader) add_subdirectory(storage) @@ -34,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-reader columnshard-engines-predicate columnshard-engines-storage + columnshard-engines-insert_table formats-arrow-compression core-tx-program udf-service-exception_policy @@ -42,8 +44,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 076200c84e8..299d75dea0a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -4,10 +4,11 @@ #include "index_info.h" #include "portion_info.h" #include "db_wrapper.h" -#include "insert_table.h" #include "columns_table.h" +#include "compaction_info.h" #include "granules_table.h" #include "predicate/filter.h" +#include "insert_table/data.h" #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/core/tx/columnshard/blob.h> @@ -17,11 +18,12 @@ namespace NKikimr::NOlap { struct TPredicate; struct TCompactionLimits { - static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant - static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant + static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 
256 * 1024; // some BlobStorage constant + static constexpr const ui64 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant static constexpr const ui64 EVICT_HOT_PORTION_BYTES = 1 * 1024 * 1024; static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024; static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000; + static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = 1024 * MAX_BLOB_SIZE; ui32 GoodBlobSize{MIN_GOOD_BLOB_SIZE}; ui32 GranuleBlobSplitSize{MAX_BLOB_SIZE}; @@ -114,72 +116,6 @@ private: static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type); }; -class ICompactionObjectCallback { -public: - virtual ~ICompactionObjectCallback() = default; - virtual void OnCompactionStarted(const bool inGranule) = 0; - virtual void OnCompactionFinished() = 0; - virtual void OnCompactionFailed(const TString& reason) = 0; - virtual void OnCompactionCanceled(const TString& reason) = 0; - virtual TString DebugString() const = 0; -}; - -struct TCompactionInfo { -private: - std::shared_ptr<ICompactionObjectCallback> CompactionObject; - mutable bool StatusProvided = false; - const bool InGranuleFlag = false; -public: - TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule) - : CompactionObject(compactionObject) - , InGranuleFlag(inGranule) - { - Y_VERIFY(compactionObject); - CompactionObject->OnCompactionStarted(InGranuleFlag); - } - - bool InGranule() const { - return InGranuleFlag; - } - - template <class T> - const T& GetObject() const { - auto result = dynamic_cast<const T*>(CompactionObject.get()); - Y_VERIFY(result); - return *result; - } - - void CompactionFinished() const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionFinished(); - } - - void CompactionCanceled(const TString& reason) const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionCanceled(reason); - } - - void 
CompactionFailed(const TString& reason) const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionFailed(reason); - } - - ~TCompactionInfo() { - Y_VERIFY_DEBUG(StatusProvided); - if (!StatusProvided) { - CompactionObject->OnCompactionFailed("compaction unexpectedly finished"); - } - } - - friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { - out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString(); - return out; - } -}; - struct TPortionEvictionFeatures { const TString TargetTierName; const ui64 PathId; // portion path id for cold-storage-key construct @@ -632,7 +568,9 @@ public: virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); } virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); } virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; } - virtual bool HasOverloadedGranules() const { return false; } + bool HasOverloadedGranules(const ui64 pathId) const { + return GetOverloadedGranules(pathId) != nullptr; + } virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0; virtual NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 39ca47a44c6..4a225e60dfe 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -201,8 +201,6 @@ public: return GranulesStorage->GetOverloaded(pathId); } - bool HasOverloadedGranules() const override { return GranulesStorage->HasOverloadedGranules(); } - TString SerializeMark(const NArrow::TReplaceKey& key) const override { if (UseCompositeMarks()) { return TMark::SerializeComposite(key, 
MarkSchema()); diff --git a/ydb/core/tx/columnshard/engines/compaction_info.cpp b/ydb/core/tx/columnshard/engines/compaction_info.cpp new file mode 100644 index 00000000000..699aa959f65 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/compaction_info.cpp @@ -0,0 +1,11 @@ +#include "compaction_info.h" +#include "storage/granule.h" + +namespace NKikimr::NOlap { + +NKikimr::NOlap::TPlanCompactionInfo TCompactionInfo::GetPlanCompaction() const { + auto& granuleMeta = GetObject<TGranuleMeta>(); + return TPlanCompactionInfo(granuleMeta.GetPathId(), InGranule()); +} + +} diff --git a/ydb/core/tx/columnshard/engines/compaction_info.h b/ydb/core/tx/columnshard/engines/compaction_info.h new file mode 100644 index 00000000000..37ffc9dcffd --- /dev/null +++ b/ydb/core/tx/columnshard/engines/compaction_info.h @@ -0,0 +1,97 @@ +#pragma once +#include <util/generic/string.h> +#include <util/system/yassert.h> +#include <util/stream/output.h> +#include <memory> + +namespace NKikimr::NOlap { + +class ICompactionObjectCallback { +public: + virtual ~ICompactionObjectCallback() = default; + virtual void OnCompactionStarted(const bool inGranule) = 0; + virtual void OnCompactionFinished() = 0; + virtual void OnCompactionFailed(const TString& reason) = 0; + virtual void OnCompactionCanceled(const TString& reason) = 0; + virtual TString DebugString() const = 0; +}; + +class TPlanCompactionInfo { +private: + ui64 PathId = 0; + bool InternalFlag = false; +public: + TPlanCompactionInfo(const ui64 pathId, const bool internalFlag) + : PathId(pathId) + , InternalFlag(internalFlag) { + + } + + ui64 GetPathId() const { + return PathId; + } + + bool IsInternal() const { + return InternalFlag; + } +}; + +struct TCompactionInfo { +private: + std::shared_ptr<ICompactionObjectCallback> CompactionObject; + mutable bool StatusProvided = false; + const bool InGranuleFlag = false; +public: + TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule) + : 
CompactionObject(compactionObject) + , InGranuleFlag(inGranule) + { + Y_VERIFY(compactionObject); + CompactionObject->OnCompactionStarted(InGranuleFlag); + } + + TPlanCompactionInfo GetPlanCompaction() const; + + bool InGranule() const { + return InGranuleFlag; + } + + template <class T> + const T& GetObject() const { + auto result = dynamic_cast<const T*>(CompactionObject.get()); + Y_VERIFY(result); + return *result; + } + + void CompactionFinished() const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionFinished(); + } + + void CompactionCanceled(const TString& reason) const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionCanceled(reason); + } + + void CompactionFailed(const TString& reason) const { + Y_VERIFY(!StatusProvided); + StatusProvided = true; + CompactionObject->OnCompactionFailed(reason); + } + + ~TCompactionInfo() { + Y_VERIFY_DEBUG(StatusProvided); + if (!StatusProvided) { + CompactionObject->OnCompactionFailed("compaction unexpectedly finished"); + } + } + + friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { + out << (info.InGranuleFlag ? 
"in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString(); + return out; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index e8cef6ca7e1..2b8aeb39455 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -34,12 +34,10 @@ void TDbWrapper::EraseAborted(const TInsertedData& data) { NColumnShard::Schema::InsertTable_EraseAborted(db, data); } -bool TDbWrapper::Load(THashMap<TWriteId, TInsertedData>& inserted, - THashMap<ui64, TSet<TInsertedData>>& committed, - THashMap<TWriteId, TInsertedData>& aborted, +bool TDbWrapper::Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, inserted, committed, aborted, loadTime); + return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime); } void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) { diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index b893c584733..0e32b6d18b7 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -8,6 +8,7 @@ class TDatabase; namespace NKikimr::NOlap { struct TInsertedData; +class TInsertTableAccessor; struct TColumnRecord; struct TGranuleRecord; class IColumnEngine; @@ -23,9 +24,7 @@ public: virtual void EraseCommitted(const TInsertedData& data) = 0; virtual void EraseAborted(const TInsertedData& data) = 0; - virtual bool Load(THashMap<TWriteId, TInsertedData>& inserted, - THashMap<ui64, TSet<TInsertedData>>& committed, - THashMap<TWriteId, TInsertedData>& aborted, + virtual bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) = 0; virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const 
TGranuleRecord& row) = 0; @@ -54,9 +53,7 @@ public: void EraseCommitted(const TInsertedData& data) override; void EraseAborted(const TInsertedData& data) override; - bool Load(THashMap<TWriteId, TInsertedData>& inserted, - THashMap<ui64, TSet<TInsertedData>>& committed, - THashMap<TWriteId, TInsertedData>& aborted, + bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 886b67762d2..64d1ca85694 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -1,5 +1,4 @@ #include "index_info.h" -#include "insert_table.h" #include "column_engine.h" #include <ydb/core/formats/arrow/arrow_batch_builder.h> diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..3ee6b423795 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-insert_table) +target_link_libraries(columnshard-engines-insert_table PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-insert_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..e2faa9f1855 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-insert_table) +target_link_libraries(columnshard-engines-insert_table PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-insert_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..e2faa9f1855 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-insert_table) +target_link_libraries(columnshard-engines-insert_table PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-insert_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..3ee6b423795 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-engines-insert_table) +target_link_libraries(columnshard-engines-insert_table PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(columnshard-engines-insert_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +) diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp new file mode 100644 index 00000000000..2830fe24c17 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp @@ -0,0 +1,5 @@ +#include "data.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index 2e9ddd5f097..dcbae641e22 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -1,8 +1,6 @@ #pragma once #include <ydb/core/tx/columnshard/blob.h> -#include <util/generic/set.h> - -#include "defs.h" +#include <ydb/core/tx/columnshard/engines/defs.h> namespace NKikimr::NOlap { @@ -136,52 +134,6 @@ public: } }; -class IDbWrapper; - -/// Use one table for inserted and commited blobs: -/// !Commited => {ShardOrPlan, WriteTxId} are {MetaShard, WriteId} -/// Commited => {ShardOrPlan, WriteTxId} are {PlanStep, TxId} -class TInsertTable { -public: - static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24); - static constexpr const TDuration CleanDelay = TDuration::Minutes(10); - - struct TCounters { - ui64 Rows{}; - ui64 Bytes{}; - ui64 RawBytes{}; - }; - - bool Insert(IDbWrapper& dbTable, TInsertedData&& data); - TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, 
ui64 metaShard, - const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists); - void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); - THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const; - THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); - void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); - void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); - std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; - bool Load(IDbWrapper& dbTable, const TInstant& loadTime); - const TCounters& GetCountersPrepared() const { return StatsPrepared; } - const TCounters& GetCountersCommitted() const { return StatsCommitted; } - - size_t InsertedSize() const { return Inserted.size(); } - const THashMap<ui64, TSet<TInsertedData>>& GetCommitted() const { return CommittedByPathId; } - const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; } - void SetOverloaded(ui64 pathId, bool overload); - bool IsOverloaded(ui64 pathId) const { return PathsOverloaded.contains(pathId); } - bool HasOverloaded() const { return !PathsOverloaded.empty(); } - -private: - THashMap<TWriteId, TInsertedData> Inserted; - THashMap<ui64, TSet<TInsertedData>> CommittedByPathId; - THashMap<TWriteId, TInsertedData> Aborted; - THashSet<ui64> PathsOverloaded; - mutable TInstant LastCleanup; - TCounters StatsPrepared; - TCounters StatsCommitted; -}; - } template <> diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 50f15ace780..c7a799de8c4 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -1,22 +1,54 @@ -#include "defs.h" #include "insert_table.h" -#include "db_wrapper.h" -#include <ydb/core/tx/columnshard/columnshard_schema.h> #include <ydb/core/protos/tx_columnshard.pb.h> +#include 
<ydb/core/tx/columnshard/engines/db_wrapper.h> +#include <ydb/core/tx/columnshard/engines/column_engine.h> namespace NKikimr::NOlap { +void TInsertTable::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept { + if (!load) { + Counters.Inserted.Add(dataSize); + } + pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + ++StatsPrepared.Rows; + StatsPrepared.Bytes += dataSize; +} + +void TInsertTable::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept { + Counters.Inserted.Erase(dataSize); + pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + Y_VERIFY(--StatsPrepared.Rows >= 0); + StatsPrepared.Bytes += dataSize; +} + +void TInsertTable::OnNewCommitted(const ui64 dataSize, const bool load) noexcept { + if (!load) { + Counters.Committed.Add(dataSize); + } + ++StatsCommitted.Rows; + StatsCommitted.Bytes += dataSize; +} + +void TInsertTable::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept { + Counters.Committed.Erase(dataSize); + Y_VERIFY(--StatsCommitted.Rows >= 0); + StatsCommitted.Bytes -= dataSize; +} + bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { TWriteId writeId{data.WriteTxId}; if (Inserted.contains(writeId)) { + Counters.Inserted.SkipAdd(data.BlobSize()); return false; } dbTable.Insert(data); - ui32 dataSize = data.BlobSize(); + const ui32 dataSize = data.BlobSize(); + const ui64 pathId = data.PathId; if (Inserted.emplace(writeId, std::move(data)).second) { - StatsPrepared.Rows = Inserted.size(); - StatsPrepared.Bytes += dataSize; + OnNewInserted(Summary.GetPathInfo(pathId), dataSize); + } else { + Counters.Inserted.SkipAdd(dataSize); } return true; } @@ -40,25 +72,25 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, dbTable.EraseInserted(*data); - ui32 dataSize = data->BlobSize(); - + const ui64 dataSize = data->BlobSize(); + const ui64 
pathId = data->PathId; + auto* pathInfo = Summary.GetPathInfoOptional(pathId); // There could be commit after drop: propose, drop, plan - if (pathExists(data->PathId)) { + if (pathInfo && pathExists(pathId)) { data->Commit(planStep, txId); dbTable.Commit(*data); - if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) { - ++StatsCommitted.Rows; - StatsCommitted.Bytes += dataSize; + if (pathInfo->AddCommitted(std::move(*data))) { + OnNewCommitted(dataSize); } } else { dbTable.Abort(*data); + Counters.Aborted.Add(data->BlobSize()); Aborted.emplace(writeId, std::move(*data)); } - if (Inserted.erase(writeId)) { - StatsPrepared.Rows = Inserted.size(); - StatsPrepared.Bytes -= dataSize; + if (pathInfo && Inserted.erase(writeId)) { + OnEraseInserted(*pathInfo, dataSize); } } @@ -72,14 +104,17 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr for (auto writeId : writeIds) { // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId. 
if (auto* data = Inserted.FindPtr(writeId)) { + Counters.Aborted.Add(data->BlobSize()); dbTable.EraseInserted(*data); dbTable.Abort(*data); - ui32 dataSize = data->BlobSize(); + const ui64 pathId = data->PathId; + const ui32 dataSize = data->BlobSize(); Aborted.emplace(writeId, std::move(*data)); if (Inserted.erase(writeId)) { - StatsPrepared.Rows = Inserted.size(); - StatsPrepared.Bytes -= dataSize; + OnEraseInserted(Summary.GetPathInfo(pathId), dataSize); + } else { + Counters.Inserted.SkipErase(dataSize); } } } @@ -107,20 +142,19 @@ THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const { THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { // Committed -> Aborted (for future cleanup) - TSet<TInsertedData> committed = std::move(CommittedByPathId[pathId]); - CommittedByPathId.erase(pathId); - - StatsCommitted.Rows -= committed.size(); - for (auto& data : committed) { - StatsCommitted.Bytes -= data.BlobSize(); - + auto pathInfo = Summary.ExtractPathInfo(pathId); + if (!pathInfo) { + return {}; + } + for (auto& data : pathInfo->GetCommitted()) { dbTable.EraseCommitted(data); - + OnEraseCommitted(*pathInfo, data.BlobSize()); TInsertedData copy = data; copy.Undo(); dbTable.Abort(copy); TWriteId writeId{copy.WriteTxId}; + Counters.Aborted.Add(copy.BlobSize()); Aborted.emplace(writeId, std::move(copy)); } @@ -137,14 +171,17 @@ THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { } void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) { - if (!CommittedByPathId.contains(data.PathId)) { + TPathInfo* pathInfo = Summary.GetPathInfoOptional(data.PathId); + if (!pathInfo) { + Counters.Committed.SkipErase(data.BlobSize()); return; } dbTable.EraseCommitted(data); - if (CommittedByPathId[data.PathId].erase(data)) { - --StatsCommitted.Rows; - StatsCommitted.Bytes -= data.BlobSize(); + if (pathInfo->EraseCommitted(data)) { + OnEraseCommitted(*pathInfo, data.BlobSize()); + } else { 
+ Counters.Committed.SkipErase(data.BlobSize()); } } @@ -155,15 +192,14 @@ void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data) } dbTable.EraseAborted(data); + Counters.Aborted.Erase(data.BlobSize()); Aborted.erase(writeId); } bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) { - Inserted.clear(); - CommittedByPathId.clear(); - Aborted.clear(); + Clear(); - if (!dbTable.Load(Inserted, CommittedByPathId, Aborted, loadTime)) { + if (!dbTable.Load(*this, loadTime)) { return false; } @@ -172,15 +208,13 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) { StatsPrepared = {}; StatsCommitted = {}; - StatsPrepared.Rows = Inserted.size(); for (auto& [_, data] : Inserted) { - StatsPrepared.Bytes += data.BlobSize(); + OnNewInserted(Summary.GetPathInfo(data.PathId), data.BlobSize()); } - for (auto& [_, set] : CommittedByPathId) { - StatsCommitted.Rows += set.size(); - for (auto& data : set) { - StatsCommitted.Bytes += data.BlobSize(); + for (auto& [pathId, pathInfo] : Summary.GetPathInfo()) { + for (auto& data : pathInfo.GetCommitted()) { + OnNewCommitted(data.BlobSize()); } } @@ -188,15 +222,15 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) { } std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const { - const auto* committed = CommittedByPathId.FindPtr(pathId); - if (!committed) { + const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId); + if (!pInfo) { return {}; } std::vector<TCommittedBlob> ret; - ret.reserve(committed->size()); + ret.reserve(pInfo->GetCommitted().size()); - for (const auto& data : *committed) { + for (const auto& data : pInfo->GetCommitted()) { if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) { ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot())); } @@ -205,12 +239,4 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna return ret; } 
-void TInsertTable::SetOverloaded(ui64 pathId, bool overload) { - if (overload) { - PathsOverloaded.insert(pathId); - } else { - PathsOverloaded.erase(pathId); - } -} - } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h new file mode 100644 index 00000000000..d777b2e0a73 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -0,0 +1,87 @@ +#pragma once +#include "data.h" +#include "rt_insertion.h" +#include "path_info.h" +#include <ydb/core/tx/columnshard/counters/insert_table.h> + +namespace NKikimr::NOlap { + +class IDbWrapper; + +/// Use one table for inserted and committed blobs: +/// !Commited => {ShardOrPlan, WriteTxId} are {MetaShard, WriteId} +/// Commited => {ShardOrPlan, WriteTxId} are {PlanStep, TxId} + +class TInsertTableAccessor { +protected: + THashMap<TWriteId, TInsertedData> Inserted; + THashMap<TWriteId, TInsertedData> Aborted; + TInsertionSummary Summary; + +protected: + void Clear() { + Inserted.clear(); + Summary.Clear(); + Aborted.clear(); + } +public: + const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const { + return Summary.GetPathPriorities(); + } + + bool AddInserted(const TWriteId& writeId, TInsertedData&& data) { + return Inserted.emplace(writeId, std::move(data)).second; + } + bool AddAborted(const TWriteId& writeId, TInsertedData&& data) { + return Aborted.emplace(writeId, std::move(data)).second; + } + bool AddCommitted(TInsertedData&& data) { + const ui64 pathId = data.PathId; + return Summary.GetPathInfo(pathId).AddCommitted(std::move(data)); + } +}; + +class TInsertTable: public TInsertTableAccessor { +private: + void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load = false) noexcept; + void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept; + void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; + void OnEraseCommitted(TPathInfo& 
pathInfo, const ui64 dataSize) noexcept; + +public: + static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24); + static constexpr const TDuration CleanDelay = TDuration::Minutes(10); + + struct TCounters { + ui64 Rows{}; + ui64 Bytes{}; + ui64 RawBytes{}; + }; + + bool Insert(IDbWrapper& dbTable, TInsertedData&& data); + TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, + const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists); + void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); + THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const; + THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); + void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); + void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); + std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; + bool Load(IDbWrapper& dbTable, const TInstant& loadTime); + const TCounters& GetCountersPrepared() const { return StatsPrepared; } + const TCounters& GetCountersCommitted() const { return StatsCommitted; } + + size_t InsertedSize() const { return Inserted.size(); } + const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; } + bool IsOverloadedByCommitted(const ui64 pathId) const { + return Summary.IsOverloaded(pathId); + } +private: + + mutable TInstant LastCleanup; + TCounters StatsPrepared; + TCounters StatsCommitted; + const NColumnShard::TInsertTableCounters Counters; +}; + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp new file mode 100644 index 00000000000..489ec93cd5f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp @@ -0,0 +1,62 @@ +#include "path_info.h" +#include "rt_insertion.h" +#include <ydb/core/tx/columnshard/engines/column_engine.h> + +namespace NKikimr::NOlap { + +bool 
TPathInfo::SetCommittedOverload(const bool value) { + const bool startOverloaded = IsOverloaded(); + CommittedOverload = value; + return startOverloaded != IsOverloaded(); +} + +bool TPathInfo::SetInsertedOverload(const bool value) { + const bool startOverloaded = IsOverloaded(); + InsertedOverload = value; + return startOverloaded != IsOverloaded(); +} + +void TPathInfo::AddCommittedSize(const i64 size, const ui64 overloadLimit) { + CommittedSize += size; + Y_VERIFY(CommittedSize >= 0); + Summary->CommittedSize += size; + Y_VERIFY(Summary->CommittedSize >= 0); + SetCommittedOverload((ui64)CommittedSize > overloadLimit); +} + +void TPathInfo::AddInsertedSize(const i64 size, const ui64 overloadLimit) { + InsertedSize += size; + Y_VERIFY(InsertedSize >= 0); + Summary->InsertedSize += size; + Y_VERIFY(Summary->InsertedSize >= 0); + PathIdCounters.Committed.OnPathIdDataInfo(InsertedSize, 0); + SetInsertedOverload((ui64)InsertedSize > overloadLimit); +} + +bool TPathInfo::EraseCommitted(const TInsertedData& data) { + Summary->RemovePriority(*this); + const bool result = Committed.erase(data); + AddCommittedSize(-1 * (i64)data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + Summary->AddPriority(*this); + PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size()); + return result; +} + +bool TPathInfo::AddCommitted(TInsertedData&& data) { + Summary->RemovePriority(*this); + AddCommittedSize(data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); + bool result = Committed.emplace(std::move(data)).second; + Summary->AddPriority(*this); + PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size()); + return result; +} + +TPathInfo::TPathInfo(TInsertionSummary& summary, const ui64 pathId) + : PathId(pathId) + , Summary(&summary) + , PathIdCounters(Summary->GetCounters().GetPathIdCounters()) +{ + +} + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.h 
b/ydb/core/tx/columnshard/engines/insert_table/path_info.h new file mode 100644 index 00000000000..57f19d1298a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.h @@ -0,0 +1,50 @@ +#pragma once +#include <ydb/core/tx/columnshard/counters/insert_table.h> +#include <util/generic/noncopyable.h> +#include "data.h" + +namespace NKikimr::NOlap { +class TInsertionSummary; +class TPathInfo: public TMoveOnly { +private: + const ui64 PathId = 0; + TSet<TInsertedData> Committed; + i64 CommittedSize = 0; + i64 InsertedSize = 0; + bool CommittedOverload = false; + bool InsertedOverload = false; + TInsertionSummary* Summary = nullptr; + const NColumnShard::TPathIdOwnedCounters PathIdCounters; + + bool SetCommittedOverload(const bool value); + bool SetInsertedOverload(const bool value); + + void AddCommittedSize(const i64 size, const ui64 overloadLimit); + +public: + void AddInsertedSize(const i64 size, const ui64 overloadLimit); + + explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId); + + ui64 GetPathId() const { + return PathId; + } + + ui64 GetIndexationPriority() const { + return CommittedSize * Committed.size() * Committed.size(); + } + + bool EraseCommitted(const TInsertedData& data); + + const TSet<TInsertedData>& GetCommitted() const { + return Committed; + } + + bool AddCommitted(TInsertedData&& data); + + bool IsOverloaded() const { + return CommittedOverload || InsertedOverload; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp new file mode 100644 index 00000000000..f76b5379379 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -0,0 +1,70 @@ +#include "rt_insertion.h" + +namespace NKikimr::NOlap { + +void TInsertionSummary::RemovePriority(const TPathInfo& pathInfo) noexcept { + const ui64 priority = pathInfo.GetIndexationPriority(); + auto it = Priorities.find(priority); + if (it == 
Priorities.end()) { + Y_VERIFY(priority == 0); + return; + } + Y_VERIFY(it->second.erase(&pathInfo) || priority == 0); + if (it->second.empty()) { + Priorities.erase(it); + } +} + +void TInsertionSummary::AddPriority(const TPathInfo& pathInfo) noexcept { + Y_VERIFY(Priorities[pathInfo.GetIndexationPriority()].emplace(&pathInfo).second); +} + +NKikimr::NOlap::TPathInfo& TInsertionSummary::GetPathInfo(const ui64 pathId) { + auto it = PathInfo.find(pathId); + if (it == PathInfo.end()) { + it = PathInfo.emplace(pathId, TPathInfo(*this, pathId)).first; + } + return it->second; +} + +std::optional<NKikimr::NOlap::TPathInfo> TInsertionSummary::ExtractPathInfo(const ui64 pathId) { + auto it = PathInfo.find(pathId); + if (it == PathInfo.end()) { + return {}; + } + RemovePriority(it->second); + std::optional<TPathInfo> result = std::move(it->second); + PathInfo.erase(it); + return result; +} + +NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) { + auto it = PathInfo.find(pathId); + if (it == PathInfo.end()) { + return nullptr; + } + return &it->second; +} + +const NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) const { + auto it = PathInfo.find(pathId); + if (it == PathInfo.end()) { + return nullptr; + } + return &it->second; +} + +bool TInsertionSummary::IsOverloaded(const ui64 pathId) const { + auto it = PathInfo.find(pathId); + if (it == PathInfo.end()) { + return false; + } + return it->second.IsOverloaded(); +} + +void TInsertionSummary::Clear() { + PathInfo.clear(); + Priorities.clear(); +} + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h new file mode 100644 index 00000000000..63118535b59 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -0,0 +1,41 @@ +#pragma once +#include <ydb/core/tx/columnshard/counters/insert_table.h> +#include <ydb/library/accessor/accessor.h> 
+#include "path_info.h" + +namespace NKikimr::NOlap { + +class TInsertionSummary { +private: + const NColumnShard::TInsertTableCounters Counters; + YDB_READONLY(i64, CommittedSize, 0); + YDB_READONLY(i64, InsertedSize, 0); + std::map<ui64, std::set<const TPathInfo*>> Priorities; + THashMap<ui64, TPathInfo> PathInfo; + friend class TPathInfo; + void RemovePriority(const TPathInfo& pathInfo) noexcept; + void AddPriority(const TPathInfo& pathInfo) noexcept; + +public: + const NColumnShard::TInsertTableCounters& GetCounters() const { + return Counters; + } + NKikimr::NOlap::TPathInfo& GetPathInfo(const ui64 pathId); + std::optional<TPathInfo> ExtractPathInfo(const ui64 pathId); + TPathInfo* GetPathInfoOptional(const ui64 pathId); + const TPathInfo* GetPathInfoOptional(const ui64 pathId) const; + + const THashMap<ui64, TPathInfo>& GetPathInfo() const { + return PathInfo; + } + + void Clear(); + + bool IsOverloaded(const ui64 pathId) const; + + const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const { + return Priorities; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 77a31505227..6d93d765c66 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -6,6 +6,7 @@ #include <ydb/core/tx/columnshard/counters.h> #include <ydb/core/tx/columnshard/columnshard__scan.h> #include <ydb/core/tx/columnshard/columnshard_common.h> +#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h> #include <ydb/core/tx/columnshard/engines/predicate/predicate.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/scheme_types/scheme_type_info.h> diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 6d5e5475284..1446af52ffb 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ 
b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -242,6 +242,10 @@ public: return Portions; } + ui64 GetPathId() const { + return Record.PathId; + } + const TPortionInfo& GetPortionVerified(const ui64 portion) const { auto it = Portions.find(portion); Y_VERIFY(it != Portions.end()); diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index 3688a52e2cd..6be2efea27a 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -1,7 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/string/printf.h> #include "db_wrapper.h" -#include "insert_table.h" +#include "insert_table/insert_table.h" namespace NKikimr { @@ -18,9 +18,7 @@ public: void EraseCommitted(const TInsertedData&) override {} void EraseAborted(const TInsertedData&) override {} - bool Load(THashMap<TWriteId, TInsertedData>&, - THashMap<ui64, TSet<TInsertedData>>&, - THashMap<TWriteId, TInsertedData>&, + bool Load(TInsertTableAccessor&, const TInstant&) override { return true; @@ -78,9 +76,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { ui64 txId = 42; insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}}, [](ui64){ return true; }); - auto committed = insertTable.GetCommitted(); - UNIT_ASSERT_EQUAL(committed.size(), 1); - UNIT_ASSERT_EQUAL(committed.begin()->second.size(), 1); + UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().size(), 1); + UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().begin()->second.size(), 1); + UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1); // read old snapshot blobs = insertTable.Read(tableId, TSnapshot::Zero()); diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index a3fdbdd9332..a7a58da4d02 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ 
b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -47,13 +47,20 @@ public: Aborted.erase(TWriteId{data.WriteTxId}); } - bool Load(THashMap<TWriteId, TInsertedData>& inserted, - THashMap<ui64, TSet<TInsertedData>>& committed, - THashMap<TWriteId, TInsertedData>& aborted, + bool Load(TInsertTableAccessor& accessor, const TInstant&) override { - inserted = Inserted; - committed = Committed; - aborted = Aborted; + for (auto&& i : Inserted) { + accessor.AddInserted(i.first, std::move(i.second)); + } + for (auto&& i : Aborted) { + accessor.AddAborted(i.first, std::move(i.second)); + } + for (auto&& i : Committed) { + for (auto&& c: i.second) { + auto copy = c; + accessor.AddCommitted(std::move(copy)); + } + } return true; } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index afc4181a1b6..6ec23442140 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -158,8 +158,8 @@ public: return SchemaPresets; } - bool IndexOverloaded() const { - return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); + bool IsOverloaded(const ui64 pathId) const { + return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(pathId); } bool HasPrimaryIndex() const { |