diff options
author | nsofya <nsofya@yandex-team.com> | 2023-07-16 16:33:42 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-07-16 16:33:42 +0300 |
commit | e2d90629d3845752f212957a12f10d9fe37f1253 (patch) | |
tree | c92ee558e8d5ec71ad87614d8b0c80e05ccaa310 | |
parent | 2a34c2363c9fe7aaf201df9bb8de005a1a8fbaef (diff) | |
download | ydb-e2d90629d3845752f212957a12f10d9fe37f1253.tar.gz |
KIKIMR-18343: Check overloaded function
IsWritableTable -> IsReadyForWrite
Check overloaded function
-rw-r--r-- | ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 150 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ya.make | 1 |
9 files changed, 138 insertions, 78 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index f11d99ca83..3efff5739f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,12 @@ get_built_tool_path( tools/enum_parser/enum_parser enum_parser ) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(core-tx-columnshard) target_compile_options(core-tx-columnshard PRIVATE @@ -91,3 +97,8 @@ generate_enum_serilization(core-tx-columnshard INCLUDE_HEADERS ydb/core/tx/columnshard/columnshard.h ) +generate_enum_serilization(core-tx-columnshard + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/columnshard_impl.h +) diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 589bd3ab85..9908953b5d 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -19,6 +19,12 @@ get_built_tool_path( tools/enum_parser/enum_parser enum_parser ) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(core-tx-columnshard) target_compile_options(core-tx-columnshard PRIVATE @@ -92,3 +98,8 @@ generate_enum_serilization(core-tx-columnshard INCLUDE_HEADERS ydb/core/tx/columnshard/columnshard.h ) +generate_enum_serilization(core-tx-columnshard + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/columnshard_impl.h +) diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 589bd3ab85..9908953b5d 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -19,6 +19,12 @@ get_built_tool_path( tools/enum_parser/enum_parser enum_parser ) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(core-tx-columnshard) target_compile_options(core-tx-columnshard PRIVATE @@ -92,3 +98,8 @@ generate_enum_serilization(core-tx-columnshard INCLUDE_HEADERS ydb/core/tx/columnshard/columnshard.h ) +generate_enum_serilization(core-tx-columnshard + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/columnshard_impl.h +) diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index f11d99ca83..3efff5739f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -19,6 +19,12 @@ get_built_tool_path( tools/enum_parser/enum_parser enum_parser ) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(core-tx-columnshard) target_compile_options(core-tx-columnshard PRIVATE @@ -91,3 +97,8 @@ generate_enum_serilization(core-tx-columnshard INCLUDE_HEADERS ydb/core/tx/columnshard/columnshard.h ) +generate_enum_serilization(core-tx-columnshard + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h + INCLUDE_HEADERS + ydb/core/tx/columnshard/columnshard_impl.h +) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 6e76150f8a..170d90151d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -59,62 +59,59 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(logoBlobId.IsValid()); bool ok = false; - if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) { - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - } else { - if (record.HasLongTxId()) { - Y_VERIFY(metaShard == 0); - auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); - writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId()); - } + Y_VERIFY(Self->TablesManager.IsReadyForWrite(tableId)); + if (record.HasLongTxId()) { + Y_VERIFY(metaShard == 0); + auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); + writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId()); + } - ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); - TInstant time = TInstant::Seconds(writeUnixTime); - - // First write wins - TBlobGroupSelector dsGroupSelector(Self->Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - - const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); - NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot()); - ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); - if (ok) { - THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); - Self->TryAbortWrites(db, dbTable, std::move(writesToAbort)); - - // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them. - // It's not optimal but correct. - TBlobManagerDb blobManagerDb(txc.DB); - auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle) - for (auto& [abortedWriteId, abortedData] : allAborted) { - Self->InsertTable->EraseAborted(dbTable, abortedData); - Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb); - } + ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); + TInstant time = TInstant::Seconds(writeUnixTime); + + // First write wins + TBlobGroupSelector dsGroupSelector(Self->Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + + const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); + NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot()); + ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); + if (ok) { + THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); + Self->TryAbortWrites(db, dbTable, std::move(writesToAbort)); + + // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them. + // It's not optimal but correct. + TBlobManagerDb blobManagerDb(txc.DB); + auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle) + for (auto& [abortedWriteId, abortedData] : allAborted) { + Self->InsertTable->EraseAborted(dbTable, abortedData); + Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb); + } - // Put new data into blob cache - Y_VERIFY(logoBlobId.BlobSize() == data.size()); - NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data); + // Put new data into blob cache + Y_VERIFY(logoBlobId.BlobSize() == data.size()); + NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data); - // Put new data into batch cache - Y_VERIFY(Ev->Get()->WrittenBatch); - Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch); + // Put new data into batch cache + Y_VERIFY(Ev->Get()->WrittenBatch); + Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch); - Self->UpdateInsertTableCounters(); + Self->UpdateInsertTableCounters(); - ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount(); - ui64 bytesWritten = Ev->Get()->BlobBatch.GetTotalSize(); - Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); - Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes()); - Self->IncCounter(COUNTER_WRITE_SUCCESS); + ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount(); + ui64 bytesWritten = Ev->Get()->BlobBatch.GetTotalSize(); + Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); + Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); + Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes()); + Self->IncCounter(COUNTER_WRITE_SUCCESS); - Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb); - } else { - LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix()); + Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb); + } else { + LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix()); - // Return EResultStatus::SUCCESS for dups - Self->IncCounter(COUNTER_WRITE_DUPLICATE); - } + // Return EResultStatus::SUCCESS for dups + Self->IncCounter(COUNTER_WRITE_DUPLICATE); } if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) { @@ -134,9 +131,15 @@ void TTxWrite::Complete(const TActorContext& ctx) { ctx.Send(Ev->Get()->GetSource(), Result.release()); } -void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { +void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { + Y_VERIFY(overloadReason != EOverloadStatus::None); + IncCounter(COUNTER_WRITE_FAIL); - IncCounter(COUNTER_WRITE_OVERLOAD); + if (overloadReason == EOverloadStatus::Disk) { + IncCounter(COUNTER_OUT_OF_SPACE); + } else { + IncCounter(COUNTER_WRITE_OVERLOAD); + } const auto& record = Proto(ev->Get()); const auto& data = record.GetData(); @@ -154,6 +157,24 @@ void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnSha ctx.Send(ev->Get()->GetSource(), result.release()); } +TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId, const ui64 dataSize) const { + if (IsAnyChannelYellowStop()) { + return EOverloadStatus::Disk; + } + + if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) { + CSCounters.OnOverloadInsertTable(dataSize); + return EOverloadStatus::InsertTable; + } else if (TablesManager.IsOverloaded(tableId)) { + CSCounters.OnOverloadGranule(dataSize);; + return EOverloadStatus::Granule; + } else if (ShardOverloaded()) { + CSCounters.OnOverloadShard(dataSize); + return EOverloadStatus::Shard; + } + return EOverloadStatus::None; +} + // EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { LastAccessTime = TAppData::TimeProvider->Now(); @@ -168,15 +189,14 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex const TString dedupId = record.GetDedupId(); const auto putStatus = ev->Get()->GetPutStatus(); - bool isWritable = TablesManager.IsWritableTable(tableId); - bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable; + bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.IsReadyForWrite(tableId); bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN); - bool isOutOfSpace = IsAnyChannelYellowStop(); + auto overloadStatus = CheckOverloaded(tableId, data.size()); if (error || errorReturned) { LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId << ", status " << putStatus - << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro") + << (TablesManager.HasPrimaryIndex()? "": ", no index") << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); @@ -206,24 +226,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize; Y_VERIFY(putStatus == NKikimrProto::OK); Execute(new TTxWrite(this, ev), ctx); - } else if (isOutOfSpace) { - IncCounter(COUNTER_WRITE_FAIL); - IncCounter(COUNTER_OUT_OF_SPACE); - LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId - << " at tablet " << TabletID()); - - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( - TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED); - ctx.Send(ev->Get()->GetSource(), result.release()); - } else if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) { - CSCounters.OnOverloadInsertTable(data.size()); - OverloadWriteFail("insert_table", ev, ctx); - } else if (TablesManager.IsOverloaded(tableId)) { - CSCounters.OnOverloadGranule(data.size()); - OverloadWriteFail("granule", ev, ctx); - } else if (ShardOverloaded()) { - CSCounters.OnOverloadShard(data.size()); - OverloadWriteFail("shard", ev, ctx); + } else if (overloadStatus != EOverloadStatus::None) { + OverloadWriteFail(overloadStatus, ev, ctx); } else { if (record.HasLongTxId()) { // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index de16d09960..cc4ca9af31 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -161,8 +161,6 @@ class TColumnShard void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); - void OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx); - ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -212,6 +210,19 @@ class TColumnShard void ActivateTiering(const ui64 pathId, const TString& useTiering); +public: + enum class EOverloadStatus { + Shard /* "shard" */, + Granule /* "granule" */, + InsertTable /* "insert_table" */, + Disk /* "disk" */, + None + }; + +private: + void OverloadWriteFail(const EOverloadStatus& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx); + EOverloadStatus CheckOverloaded(const ui64 tableId, const ui64 dataSize) const; + protected: STFUNC(StateInit) { TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD); diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index f462ecbfad..f255c87ec3 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -158,8 +158,8 @@ bool TTablesManager::HasTable(const ui64 pathId) const { return true; } -bool TTablesManager::IsWritableTable(const ui64 pathId) const { - return HasTable(pathId); +bool TTablesManager::IsReadyForWrite(const ui64 pathId) const { + return HasPrimaryIndex() && HasTable(pathId); } bool TTablesManager::HasPreset(const ui32 presetId) const { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 6ec2344214..10a1825919 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -195,7 +195,7 @@ public: ui64 GetMemoryUsage() const; bool HasTable(const ui64 pathId) const; - bool IsWritableTable(const ui64 pathId) const; + bool IsReadyForWrite(const ui64 pathId) const; bool HasPreset(const ui32 presetId) const; void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db); diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 8ff6f8988e..34e0594f22 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -38,6 +38,7 @@ SRCS( ) GENERATE_ENUM_SERIALIZATION(columnshard.h) +GENERATE_ENUM_SERIALIZATION(columnshard_impl.h) PEERDIR( library/cpp/actors/core |