about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author nsofya <nsofya@yandex-team.com> 2023-07-16 16:33:42 +0300
committer nsofya <nsofya@yandex-team.com> 2023-07-16 16:33:42 +0300
commit e2d90629d3845752f212957a12f10d9fe37f1253 (patch)
tree c92ee558e8d5ec71ad87614d8b0c80e05ccaa310
parent 2a34c2363c9fe7aaf201df9bb8de005a1a8fbaef (diff)
download ydb-e2d90629d3845752f212957a12f10d9fe37f1253.tar.gz
KIKIMR-18343: Check overloaded function
Rename IsWritableTable -> IsReadyForWrite; move the overload checks into a CheckOverloaded function
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt 11
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt 11
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt 11
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt 11
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 150
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 15
-rw-r--r-- ydb/core/tx/columnshard/tables_manager.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/tables_manager.h 2
-rw-r--r-- ydb/core/tx/columnshard/ya.make 1
9 files changed, 138 insertions, 78 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index f11d99ca83..3efff5739f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -19,6 +19,12 @@ get_built_tool_path(
tools/enum_parser/enum_parser
enum_parser
)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(core-tx-columnshard)
target_compile_options(core-tx-columnshard PRIVATE
@@ -91,3 +97,8 @@ generate_enum_serilization(core-tx-columnshard
INCLUDE_HEADERS
ydb/core/tx/columnshard/columnshard.h
)
+generate_enum_serilization(core-tx-columnshard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/columnshard_impl.h
+)
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 589bd3ab85..9908953b5d 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,12 @@ get_built_tool_path(
tools/enum_parser/enum_parser
enum_parser
)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(core-tx-columnshard)
target_compile_options(core-tx-columnshard PRIVATE
@@ -92,3 +98,8 @@ generate_enum_serilization(core-tx-columnshard
INCLUDE_HEADERS
ydb/core/tx/columnshard/columnshard.h
)
+generate_enum_serilization(core-tx-columnshard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/columnshard_impl.h
+)
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 589bd3ab85..9908953b5d 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -19,6 +19,12 @@ get_built_tool_path(
tools/enum_parser/enum_parser
enum_parser
)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(core-tx-columnshard)
target_compile_options(core-tx-columnshard PRIVATE
@@ -92,3 +98,8 @@ generate_enum_serilization(core-tx-columnshard
INCLUDE_HEADERS
ydb/core/tx/columnshard/columnshard.h
)
+generate_enum_serilization(core-tx-columnshard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/columnshard_impl.h
+)
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index f11d99ca83..3efff5739f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -19,6 +19,12 @@ get_built_tool_path(
tools/enum_parser/enum_parser
enum_parser
)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(core-tx-columnshard)
target_compile_options(core-tx-columnshard PRIVATE
@@ -91,3 +97,8 @@ generate_enum_serilization(core-tx-columnshard
INCLUDE_HEADERS
ydb/core/tx/columnshard/columnshard.h
)
+generate_enum_serilization(core-tx-columnshard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.h
+ INCLUDE_HEADERS
+ ydb/core/tx/columnshard/columnshard_impl.h
+)
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 6e76150f8a..170d90151d 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -59,62 +59,59 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(logoBlobId.IsValid());
bool ok = false;
- if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) {
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- } else {
- if (record.HasLongTxId()) {
- Y_VERIFY(metaShard == 0);
- auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
- writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId());
- }
+ Y_VERIFY(Self->TablesManager.IsReadyForWrite(tableId));
+ if (record.HasLongTxId()) {
+ Y_VERIFY(metaShard == 0);
+ auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
+ writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId());
+ }
- ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
- TInstant time = TInstant::Seconds(writeUnixTime);
-
- // First write wins
- TBlobGroupSelector dsGroupSelector(Self->Info());
- NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
-
- const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
- NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot());
- ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
- if (ok) {
- THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
- Self->TryAbortWrites(db, dbTable, std::move(writesToAbort));
-
- // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them.
- // It's not optimal but correct.
- TBlobManagerDb blobManagerDb(txc.DB);
- auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle)
- for (auto& [abortedWriteId, abortedData] : allAborted) {
- Self->InsertTable->EraseAborted(dbTable, abortedData);
- Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
- }
+ ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
+ TInstant time = TInstant::Seconds(writeUnixTime);
+
+ // First write wins
+ TBlobGroupSelector dsGroupSelector(Self->Info());
+ NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+
+ const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
+ NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot());
+ ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
+ if (ok) {
+ THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
+ Self->TryAbortWrites(db, dbTable, std::move(writesToAbort));
+
+ // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them.
+ // It's not optimal but correct.
+ TBlobManagerDb blobManagerDb(txc.DB);
+ auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle)
+ for (auto& [abortedWriteId, abortedData] : allAborted) {
+ Self->InsertTable->EraseAborted(dbTable, abortedData);
+ Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
+ }
- // Put new data into blob cache
- Y_VERIFY(logoBlobId.BlobSize() == data.size());
- NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data);
+ // Put new data into blob cache
+ Y_VERIFY(logoBlobId.BlobSize() == data.size());
+ NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data);
- // Put new data into batch cache
- Y_VERIFY(Ev->Get()->WrittenBatch);
- Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch);
+ // Put new data into batch cache
+ Y_VERIFY(Ev->Get()->WrittenBatch);
+ Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch);
- Self->UpdateInsertTableCounters();
+ Self->UpdateInsertTableCounters();
- ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount();
- ui64 bytesWritten = Ev->Get()->BlobBatch.GetTotalSize();
- Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
- Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes());
- Self->IncCounter(COUNTER_WRITE_SUCCESS);
+ ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount();
+ ui64 bytesWritten = Ev->Get()->BlobBatch.GetTotalSize();
+ Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
+ Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
+ Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes());
+ Self->IncCounter(COUNTER_WRITE_SUCCESS);
- Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb);
- } else {
- LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix());
+ Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb);
+ } else {
+ LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix());
- // Return EResultStatus::SUCCESS for dups
- Self->IncCounter(COUNTER_WRITE_DUPLICATE);
- }
+ // Return EResultStatus::SUCCESS for dups
+ Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) {
@@ -134,9 +131,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
ctx.Send(Ev->Get()->GetSource(), Result.release());
}
-void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ Y_VERIFY(overloadReason != EOverloadStatus::None);
+
IncCounter(COUNTER_WRITE_FAIL);
- IncCounter(COUNTER_WRITE_OVERLOAD);
+ if (overloadReason == EOverloadStatus::Disk) {
+ IncCounter(COUNTER_OUT_OF_SPACE);
+ } else {
+ IncCounter(COUNTER_WRITE_OVERLOAD);
+ }
const auto& record = Proto(ev->Get());
const auto& data = record.GetData();
@@ -154,6 +157,24 @@ void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnSha
ctx.Send(ev->Get()->GetSource(), result.release());
}
+// Reports which limit, if any, currently prevents accepting a write of
+// `dataSize` bytes into `tableId`. Checks run in a fixed order and the first
+// hit wins: yellow-stop channel (Disk, i.e. out of disk space), insert table,
+// granule, whole shard. Returns EOverloadStatus::None when no limit is hit.
+//
+// NOTE: this is not a pure query. For the InsertTable, Granule and Shard
+// outcomes the matching CSCounters.OnOverload* hook is called with `dataSize`
+// on every invocation, so callers should only call it for writes they are
+// actually going to reject on an overloaded status.
+TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId, const ui64 dataSize) const {
+    if (IsAnyChannelYellowStop()) {
+        return EOverloadStatus::Disk;
+    }
+
+    // InsertTable may be unset, hence the null check before asking it.
+    if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) {
+        CSCounters.OnOverloadInsertTable(dataSize);
+        return EOverloadStatus::InsertTable;
+    }
+    if (TablesManager.IsOverloaded(tableId)) {
+        CSCounters.OnOverloadGranule(dataSize);
+        return EOverloadStatus::Granule;
+    }
+    if (ShardOverloaded()) {
+        CSCounters.OnOverloadShard(dataSize);
+        return EOverloadStatus::Shard;
+    }
+    return EOverloadStatus::None;
+}
+
// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
LastAccessTime = TAppData::TimeProvider->Now();
@@ -168,15 +189,14 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
const TString dedupId = record.GetDedupId();
const auto putStatus = ev->Get()->GetPutStatus();
- bool isWritable = TablesManager.IsWritableTable(tableId);
- bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable;
+ bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.IsReadyForWrite(tableId);
bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN);
- bool isOutOfSpace = IsAnyChannelYellowStop();
+ auto overloadStatus = CheckOverloaded(tableId, data.size());
if (error || errorReturned) {
LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId
<< ", status " << putStatus
- << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro")
+ << (TablesManager.HasPrimaryIndex()? "": ", no index")
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
@@ -206,24 +226,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize;
Y_VERIFY(putStatus == NKikimrProto::OK);
Execute(new TTxWrite(this, ev), ctx);
- } else if (isOutOfSpace) {
- IncCounter(COUNTER_WRITE_FAIL);
- IncCounter(COUNTER_OUT_OF_SPACE);
- LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId
- << " at tablet " << TabletID());
-
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
- TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
- ctx.Send(ev->Get()->GetSource(), result.release());
- } else if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) {
- CSCounters.OnOverloadInsertTable(data.size());
- OverloadWriteFail("insert_table", ev, ctx);
- } else if (TablesManager.IsOverloaded(tableId)) {
- CSCounters.OnOverloadGranule(data.size());
- OverloadWriteFail("granule", ev, ctx);
- } else if (ShardOverloaded()) {
- CSCounters.OnOverloadShard(data.size());
- OverloadWriteFail("shard", ev, ctx);
+ } else if (overloadStatus != EOverloadStatus::None) {
+ OverloadWriteFail(overloadStatus, ev, ctx);
} else {
if (record.HasLongTxId()) {
// TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index de16d09960..cc4ca9af31 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -161,8 +161,6 @@ class TColumnShard
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
- void OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx);
-
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -212,6 +210,19 @@ class TColumnShard
void ActivateTiering(const ui64 pathId, const TString& useTiering);
+public:
+ // Why a write is being rejected as overloaded; None means no limit was hit
+ // (see CheckOverloaded). Enum serialization is generated for this header
+ // (GENERATE_ENUM_SERIALIZATION); the quoted comments on the enumerators are
+ // presumably the serialized names -- TODO confirm against enum_parser.
+ enum class EOverloadStatus {
+ Shard /* "shard" */,
+ Granule /* "granule" */,
+ InsertTable /* "insert_table" */,
+ Disk /* "disk" */,
+ None
+ };
+
+private:
+ void OverloadWriteFail(const EOverloadStatus& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx);
+ EOverloadStatus CheckOverloaded(const ui64 tableId, const ui64 dataSize) const;
+
protected:
STFUNC(StateInit) {
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index f462ecbfad..f255c87ec3 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -158,8 +158,8 @@ bool TTablesManager::HasTable(const ui64 pathId) const {
return true;
}
-bool TTablesManager::IsWritableTable(const ui64 pathId) const {
- return HasTable(pathId);
+// True when writes into `pathId` can be accepted: the primary index must
+// exist and HasTable(pathId) must hold. The index is checked first, so
+// HasTable is not consulted at all while there is no primary index.
+bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
+    if (!HasPrimaryIndex()) {
+        return false;
+    }
+    return HasTable(pathId);
+}
bool TTablesManager::HasPreset(const ui32 presetId) const {
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 6ec2344214..10a1825919 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -195,7 +195,7 @@ public:
ui64 GetMemoryUsage() const;
bool HasTable(const ui64 pathId) const;
- bool IsWritableTable(const ui64 pathId) const;
+ bool IsReadyForWrite(const ui64 pathId) const;
bool HasPreset(const ui32 presetId) const;
void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db);
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 8ff6f8988e..34e0594f22 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -38,6 +38,7 @@ SRCS(
)
GENERATE_ENUM_SERIALIZATION(columnshard.h)
+GENERATE_ENUM_SERIALIZATION(columnshard_impl.h)
PEERDIR(
library/cpp/actors/core