diff options
author | Semyon <yentsovsemyon@ydb.tech> | 2024-11-22 14:08:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-22 11:08:46 +0000 |
commit | 36d42e52673b3cd4cd2e9ce8e0a7fa0c41f35e16 (patch) | |
tree | efb382cd8a7a35f42fae2f7090e6df8919b370fe | |
parent | b13bd4af6109e62064aef918bbf593cc549cafa6 (diff) | |
download | ydb-36d42e52673b3cd4cd2e9ce8e0a7fa0c41f35e16.tar.gz |
add subcodes to tx-proxy reply code counters (#11862)
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/owner.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 236 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_counters.cpp | 99 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_counters.h | 136 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/ya.make | 2 |
6 files changed, 307 insertions, 197 deletions
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h index 456699c80d..aa2920b63c 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.h +++ b/ydb/core/tx/columnshard/counters/common/owner.h @@ -56,6 +56,10 @@ public: NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr); + + TCommonCountersOwner(TCommonCountersOwner&& other) + : TCommonCountersOwner(other) { + } }; class TValueGuard { diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp index 281d8bf05b..41a5433a77 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp @@ -1,28 +1,3 @@ #include "upload_rows_common_impl.h" - -namespace NKikimr { - - TUploadCounters::TUploadCounters() - : TBase("BulkUpsert") - { - RequestsCount = TBase::GetDeriviative("Requests/Count"); - ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 10)); - - RowsCount = TBase::GetDeriviative("Rows/Count"); - PackageSizeRecordsByRecords = TBase::GetHistogram("ByRecords/PackageSize/Records", NMonitoring::ExponentialHistogram(15, 2, 10)); - PackageSizeCountByRecords = TBase::GetHistogram("ByRecords/PackageSize/Count", NMonitoring::ExponentialHistogram(15, 2, 10)); - - PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); - WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); - CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); - PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); - - const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor(); - for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) { - auto vDescription = descriptor->value(i); - CodesCount.emplace(vDescription->name(), CreateSubGroup("reply_code", vDescription->name()).GetDeriviative("Replies/Count")); - } - } - -} +namespace NKikimr {} diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 4d6bb057ad..d92f7a0503 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -15,6 +15,7 @@ #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/tx_proxy/upload_rows_counters.h> #include <ydb/core/formats/arrow/size_calcer.h> #include <library/cpp/monlib/dynamic_counters/counters.h> @@ -31,93 +32,13 @@ #include <ydb/library/wilson_ids/wilson.h> #include <ydb/library/ydb_issue/issue_helpers.h> +#include <boost/container_hash/hash_fwd.hpp> +#include <util/generic/size_literals.h> #include <util/string/join.h> #include <util/string/vector.h> -#include <util/generic/size_literals.h> namespace NKikimr { -class TUploadCounters: public NColumnShard::TCommonCountersOwner { -private: - using TBase = NColumnShard::TCommonCountersOwner; - NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; - NMonitoring::THistogramPtr ReplyDuration; - - NMonitoring::TDynamicCounters::TCounterPtr RowsCount; - NMonitoring::THistogramPtr PackageSizeRecordsByRecords; - NMonitoring::THistogramPtr PackageSizeCountByRecords; - - NMonitoring::THistogramPtr PreparingDuration; - NMonitoring::THistogramPtr WritingDuration; - NMonitoring::THistogramPtr CommitDuration; - NMonitoring::THistogramPtr PrepareReplyDuration; - - THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount; -public: - TUploadCounters(); - - class TGuard: TMoveOnly { - private: - TMonotonic Start = TMonotonic::Now(); - std::optional<TMonotonic> WritingStarted; - std::optional<TMonotonic> CommitStarted; - std::optional<TMonotonic> CommitFinished; - std::optional<TMonotonic> ReplyFinished; - TUploadCounters& Owner; - public: - TGuard(const TMonotonic start, TUploadCounters& owner) - : Start(start) - , Owner(owner) - { - - } - - void OnWritingStarted() { - WritingStarted = TMonotonic::Now(); - Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds()); - } - - void OnCommitStarted() { - CommitStarted = TMonotonic::Now(); - AFL_VERIFY(WritingStarted); - Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds()); - } - - void OnCommitFinished() { - CommitFinished = TMonotonic::Now(); - AFL_VERIFY(CommitStarted); - Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds()); - } - - void OnReply(const ::Ydb::StatusIds::StatusCode code) { - ReplyFinished = TMonotonic::Now(); - if (CommitFinished) { - Owner.PrepareReplyDuration->Collect((*ReplyFinished - *CommitFinished).MilliSeconds()); - } - Owner.ReplyDuration->Collect((*ReplyFinished - Start).MilliSeconds()); - - const TString name = ::Ydb::StatusIds::StatusCode_Name(code); - auto it = Owner.CodesCount.find(name); - Y_ABORT_UNLESS(it != Owner.CodesCount.end()); - it->second->Add(1); - } - }; - - TGuard BuildGuard(const TMonotonic start) { - return TGuard(start, *this); - } - - void OnRequest(const ui64 rowsCount) const { - RequestsCount->Add(1); - RowsCount->Add(rowsCount); - PackageSizeRecordsByRecords->Collect((i64)rowsCount, rowsCount); - PackageSizeCountByRecords->Collect(rowsCount); - } - - void OnReply(const TDuration dFull, const TDuration dDelta, const ::Ydb::StatusIds::StatusCode code) const; -}; - - using namespace NActors; struct TUpsertCost { @@ -219,8 +140,7 @@ private: NSchemeCache::TSchemeCacheNavigate::EKind TableKind = NSchemeCache::TSchemeCacheNavigate::KindUnknown; THashSet<TTabletId> ShardRepliesLeft; THashMap<TTabletId, TShardUploadRetryState> ShardUploadRetryStates; - Ydb::StatusIds::StatusCode Status; - TString ErrorMessage; + TUploadStatus Status; std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>(); NLongTxService::TLongTxId LongTxId; TUploadCounters UploadCounters; @@ -648,8 +568,8 @@ private: NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = ::NKikimr::SplitPath(table); if (entry.Path.empty()) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() - << "Bulk upsert. Invalid table path specified: '" << table << "'", ctx); + return ReplyWithError( + Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Bulk upsert. Invalid table path specified: '" << table << "'", ctx); } entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; entry.SyncVersion = true; @@ -666,8 +586,9 @@ private: void HandleTimeout(const TActorContext& ctx) { ShardRepliesLeft.clear(); return ReplyWithError(Ydb::StatusIds::TIMEOUT, - LogPrefix() << "longTx " << LongTxId.ToString() - << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec", ctx); + TStringBuilder() << "longTx " << LongTxId.ToString() + << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec", + ctx); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { @@ -676,37 +597,22 @@ private: Y_ABORT_UNLESS(request.ResultSet.size() == 1); const NSchemeCache::TSchemeCacheNavigate::TEntry& entry = request.ResultSet.front(); - switch (entry.Status) { - case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: - break; - case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError: - case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError: - return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, LogPrefix() << "table unavaliable", ctx); - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath: - case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown table", ctx); - case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown database", ctx); - case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied: - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << "access denied", ctx); - case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown: - return ReplyWithError(Ydb::StatusIds::GENERIC_ERROR, LogPrefix() << "unknown error", ctx); + if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + return ReplyWithError(entry.Status, ctx); } TableKind = entry.Kind; bool isColumnTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable); if (entry.TableId.IsSystemView()) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, - LogPrefix() << "is not supported. Table is a system view", ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "is not supported. Table is a system view", ctx); } // TODO: fast fail for all tables? if (isColumnTable && DiskQuotaExceeded) { - return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, - LogPrefix() << "cannot perform writes: database is out of disk space", ctx); + return ReplyWithError(TUploadStatus(Ydb::StatusIds::UNAVAILABLE, TUploadStatus::ECustomSubcode::DISK_QUOTA_EXCEEDED, + "cannot perform writes: database is out of disk space"), + ctx); } ResolveNamesResult.reset(ev->Get()->Request.Release()); @@ -714,20 +620,20 @@ private: bool makeYdbSchema = isColumnTable || (GetSourceType() != EUploadSource::ProtoValues); TString errorMessage; if (!BuildSchema(ctx, errorMessage, makeYdbSchema)) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage, ctx); } switch (GetSourceType()) { case EUploadSource::ProtoValues: { if (!ExtractRows(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } if (isColumnTable) { // TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DynNumbers, ... if (!ExtractBatch(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } } else { FindMinMaxKeys(); @@ -740,12 +646,13 @@ private: if (isColumnTable) { // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ... if (!ExtractBatch(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } if (!ColumnsToConvertInplace.empty()) { auto convertResult = NArrow::InplaceConvertColumns(Batch, ColumnsToConvertInplace); if (!convertResult.ok()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << "Cannot convert arrow batch inplace:" << convertResult.status().ToString(), ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Cannot convert arrow batch inplace:" << convertResult.status().ToString(), ctx); } Batch = *convertResult; } @@ -753,18 +660,19 @@ private: if (!ColumnsToConvert.empty()) { auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert); if (!convertResult.ok()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx); } Batch = *convertResult; } } else { // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ... if (!ExtractBatch(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } // Implicit types conversion inside ExtractRows(), in TArrowToYdbConverter if (!ExtractRows(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } FindMinMaxKeys(); } @@ -792,7 +700,7 @@ private: // Batch is already converted WriteToColumnTable(ctx); } else { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "is not supported", ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "is not supported", ctx); } } @@ -800,10 +708,10 @@ private: UploadCountersGuard.OnWritingStarted(); TString accessCheckError; if (!CheckAccess(accessCheckError)) { - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx); + return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx); } - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "starting LongTx"); + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "starting LongTx"); // Begin Long Tx for writing a batch into OLAP table TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()); @@ -834,24 +742,23 @@ private: LongTxId = msg->GetLongTxId(); - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "started LongTx '" << LongTxId.ToString() << "'"); + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "started LongTx '" << LongTxId.ToString() << "'"); auto outputColumns = GetOutputColumns(ctx); if (!outputColumns.empty()) { if (!Batch) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, - LogPrefix() << "no data or conversion error", ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, "no data or conversion error", ctx); } auto batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(Batch, outputColumns); if (!batch) { for (auto& columnName : outputColumns) { if (Batch->schema()->GetFieldIndex(columnName) < 0) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, - LogPrefix() << "no expected column '" << columnName << "' in data", ctx); + return ReplyWithError( + Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "no expected column '" << columnName << "' in data", ctx); } } - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "cannot prepare data", ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "cannot prepare data", ctx); } Y_ABORT_UNLESS(batch); @@ -859,9 +766,8 @@ private: #if 1 // TODO: check we call ValidateFull() once over pipeline (upsert -> long tx -> shard insert) auto validationInfo = batch->ValidateFull(); if (!validationInfo.ok()) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() - << "bad batch in data: " + validationInfo.message() - << "; order:" + JoinSeq(", ", outputColumns), ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, + TStringBuilder() << "bad batch in data: " + validationInfo.message() << "; order:" + JoinSeq(", ", outputColumns), ctx); } #endif @@ -875,19 +781,19 @@ private: Y_ABORT_UNLESS(ResolveNamesResult); if (ResolveNamesResult->ErrorCount > 0) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "failed to get table schema", ctx); + ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "failed to get table schema", ctx); return {}; } auto& entry = ResolveNamesResult->ResultSet[0]; if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "specified path is not a column table", ctx); + ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "specified path is not a column table", ctx); return {}; } if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSchema()) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "column table has no schema", ctx); + ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "column table has no schema", ctx); return {}; } @@ -922,8 +828,7 @@ private: } void RollbackLongTx(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, - LogPrefix() << "rolling back LongTx '" << LongTxId.ToString() << "'"); + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "rolling back LongTx '" << LongTxId.ToString() << "'"); TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()); ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId), 0, 0, Span.GetTraceId()); @@ -1056,12 +961,12 @@ private: ResolvePartitionsResult = msg->Request; if (ResolvePartitionsResult->ErrorCount > 0) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown table", ctx); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "unknown table", ctx); } TString accessCheckError; if (!CheckAccess(accessCheckError)) { - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx); + return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx); } auto getShardsString = [] (const TVector<TKeyDesc::TPartitionInfo>& partitions) { @@ -1194,7 +1099,8 @@ private: void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); - SetError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly"); + SetError(TUploadStatus( + Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly")); ShardRepliesLeft.clear(); @@ -1204,7 +1110,8 @@ private: void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) { ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId()); - SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId)); + SetError(TUploadStatus(Ydb::StatusIds::UNAVAILABLE, TUploadStatus::ECustomSubcode::DELIVERY_PROBLEM, + Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId))); ShardRepliesLeft.erase(ev->Get()->TabletId); return ReplyIfDone(ctx); @@ -1234,28 +1141,10 @@ private: << " from shard " << shardResponse.GetTabletID()); if (shardResponse.GetStatus() != NKikimrTxDataShard::TError::OK) { - ::Ydb::StatusIds::StatusCode status = Ydb::StatusIds::GENERIC_ERROR; - - switch (shardResponse.GetStatus()) { - case NKikimrTxDataShard::TError::WRONG_SHARD_STATE: - case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED: + if (shardResponse.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE || + shardResponse.GetStatus() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED) { ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId()); - status = Ydb::StatusIds::OVERLOADED; - break; - case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED: - case NKikimrTxDataShard::TError::OUT_OF_SPACE: - status = Ydb::StatusIds::UNAVAILABLE; - break; - case NKikimrTxDataShard::TError::SCHEME_ERROR: - status = Ydb::StatusIds::SCHEME_ERROR; - break; - case NKikimrTxDataShard::TError::BAD_ARGUMENT: - status = Ydb::StatusIds::BAD_REQUEST; - break; - case NKikimrTxDataShard::TError::EXECUTION_CANCELLED: - status = Ydb::StatusIds::TIMEOUT; - break; - }; + } if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) { if (!shardResponse.HasOverloadSubscribed()) { @@ -1268,7 +1157,8 @@ private: } } - SetError(status, shardResponse.GetErrorDescription()); + SetError( + TUploadStatus(static_cast<NKikimrTxDataShard::TError::EKind>(shardResponse.GetStatus()), shardResponse.GetErrorDescription())); } // Notify the cache that we are done with the pipe @@ -1292,13 +1182,12 @@ private: } } - void SetError(::Ydb::StatusIds::StatusCode status, const TString& message) { - if (Status != ::Ydb::StatusIds::SUCCESS) { + void SetError(const TUploadStatus& status) { + if (Status.GetCode() != ::Ydb::StatusIds::SUCCESS) { return; } Status = status; - ErrorMessage = message; } void ReplyIfDone(const NActors::TActorContext& ctx) { @@ -1307,27 +1196,32 @@ private: return; } - if (!ErrorMessage.empty()) { - RaiseIssue(NYql::TIssue(ErrorMessage)); + if (Status.GetErrorMessage()) { + RaiseIssue(NYql::TIssue(LogPrefix() << *Status.GetErrorMessage())); } return ReplyWithResult(Status, ctx); } - void ReplyWithError(::Ydb::StatusIds::StatusCode status, const TString& message, const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::RPC_REQUEST, message); + void ReplyWithError(const Ydb::StatusIds::StatusCode code, const TString& errorMessage, const TActorContext& ctx) { + return ReplyWithError(TUploadStatus(code, errorMessage), ctx); + } + + void ReplyWithError(const TUploadStatus& status, const TActorContext& ctx) { + AFL_VERIFY(status.GetCode() != Ydb::StatusIds::SUCCESS); + LOG_NOTICE_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << status.GetErrorMessage()); - SetError(status, message); + SetError(status); Y_DEBUG_ABORT_UNLESS(ShardRepliesLeft.empty()); return ReplyIfDone(ctx); } - void ReplyWithResult(::Ydb::StatusIds::StatusCode status, const TActorContext& ctx) { + void ReplyWithResult(const TUploadStatus& status, const TActorContext& ctx) { UploadCountersGuard.OnReply(status); - SendResult(ctx, status); + SendResult(ctx, status.GetCode()); - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "completed with status " << status); + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "completed with status " << status.GetCode()); if (LongTxId != NLongTxService::TLongTxId()) { // LongTxId is reset after successful commit diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp new file mode 100644 index 0000000000..22ee7263e3 --- /dev/null +++ b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp @@ -0,0 +1,99 @@ +#include "upload_rows_counters.h" + + +namespace NKikimr { + +TUploadStatus::TUploadStatus(const NSchemeCache::TSchemeCacheNavigate::EStatus status) { + switch (status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + Code = Ydb::StatusIds::SUCCESS; + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError: + case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError: + Code = Ydb::StatusIds::UNAVAILABLE; + ErrorMessage = "table unavaliable"; + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath: + case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + Code = Ydb::StatusIds::SCHEME_ERROR; + ErrorMessage = "unknown table"; + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: + Code = Ydb::StatusIds::SCHEME_ERROR; + ErrorMessage = "unknown database"; + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied: + Code = Ydb::StatusIds::UNAUTHORIZED; + ErrorMessage = "access denied"; + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown: + Code = Ydb::StatusIds::GENERIC_ERROR; + ErrorMessage = "unknown error"; + break; + } +} + +TUploadStatus::TUploadStatus(const NKikimrTxDataShard::TError::EKind status, const TString& errorDescription) { + Subcode = NKikimrTxDataShard::TError::EKind_Name(status); + if (status != NKikimrTxDataShard::TError::OK) { + ErrorMessage = errorDescription; + } + switch (status) { + case NKikimrTxDataShard::TError::OK: + Code = Ydb::StatusIds::SUCCESS; + break; + case NKikimrTxDataShard::TError::WRONG_SHARD_STATE: + case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED: + Code = Ydb::StatusIds::OVERLOADED; + break; + case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED: + case NKikimrTxDataShard::TError::OUT_OF_SPACE: + Code = Ydb::StatusIds::UNAVAILABLE; + break; + case NKikimrTxDataShard::TError::SCHEME_ERROR: + Code = Ydb::StatusIds::SCHEME_ERROR; + break; + case NKikimrTxDataShard::TError::BAD_ARGUMENT: + Code = Ydb::StatusIds::BAD_REQUEST; + break; + case NKikimrTxDataShard::TError::EXECUTION_CANCELLED: + Code = Ydb::StatusIds::TIMEOUT; + break; + default: + Code = Ydb::StatusIds::GENERIC_ERROR; + break; + }; +} + +NMonitoring::TDynamicCounters::TCounterPtr TUploadCounters::GetCodeCounter(const TUploadStatus& status) { + auto it = CodesCount.FindPtr(status); + if (it) { + return *it; + } + const auto counters = [this, &status]() { + auto groupByCode = CreateSubGroup("reply_code", status.GetCodeString()); + if (status.GetSubcode()) { + return groupByCode.CreateSubGroup("subcode", *status.GetSubcode()); + } + return groupByCode; + }(); + return CodesCount.emplace(status, counters.GetDeriviative("Replies/Count")).first->second; +} + +TUploadCounters::TUploadCounters() + : TBase("BulkUpsert") { + RequestsCount = TBase::GetDeriviative("Requests/Count"); + ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 10)); + + RowsCount = TBase::GetDeriviative("Rows/Count"); + PackageSizeRecordsByRecords = TBase::GetHistogram("ByRecords/PackageSize/Records", NMonitoring::ExponentialHistogram(15, 2, 10)); + PackageSizeCountByRecords = TBase::GetHistogram("ByRecords/PackageSize/Count", NMonitoring::ExponentialHistogram(15, 2, 10)); + + PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); +} +} diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.h b/ydb/core/tx/tx_proxy/upload_rows_counters.h new file mode 100644 index 0000000000..4839e76da1 --- /dev/null +++ b/ydb/core/tx/tx_proxy/upload_rows_counters.h @@ -0,0 +1,136 @@ +#pragma once + +#include <ydb/library/actors/core/log.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/core/protos/tx_datashard.pb.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NKikimr { + +class TUploadStatus { +private: + YDB_READONLY_DEF(Ydb::StatusIds::StatusCode, Code); + YDB_READONLY_DEF(std::optional<TString>, Subcode); + YDB_READONLY_DEF(std::optional<TString>, ErrorMessage); + +public: + enum class ECustomSubcode { + DISK_QUOTA_EXCEEDED, + DELIVERY_PROBLEM, + }; + +public: + TUploadStatus(const Ydb::StatusIds::StatusCode code) + : Code(code) { + } + TUploadStatus(const Ydb::StatusIds::StatusCode code, const TString& errorMessage) + : Code(code) + , ErrorMessage(errorMessage) { + AFL_VERIFY(code != Ydb::StatusIds::SUCCESS); + } + TUploadStatus(const Ydb::StatusIds::StatusCode code, const ECustomSubcode& subcode, const TString& errorMessage) + : Code(code) + , Subcode(ToString(subcode)) + , ErrorMessage(errorMessage) { + AFL_VERIFY(code != Ydb::StatusIds::SUCCESS); + } + TUploadStatus(const NSchemeCache::TSchemeCacheNavigate::EStatus status); + TUploadStatus(const NKikimrTxDataShard::TError::EKind status, const TString& errorDescription); + + struct THasher { + ui64 operator()(const TUploadStatus& object) const { + return MultiHash(object.GetCode(), object.GetSubcode().has_value(), object.GetSubcode().value_or("")); + } + }; + + bool operator==(const TUploadStatus& other) const { + return Code == other.Code && Subcode == other.Subcode; + } + + TString GetCodeString() const { + return Ydb::StatusIds::StatusCode_Name(Code); + } +}; + +class TUploadCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; + NMonitoring::THistogramPtr ReplyDuration; + + NMonitoring::TDynamicCounters::TCounterPtr RowsCount; + NMonitoring::THistogramPtr PackageSizeRecordsByRecords; + NMonitoring::THistogramPtr PackageSizeCountByRecords; + + NMonitoring::THistogramPtr PreparingDuration; + NMonitoring::THistogramPtr WritingDuration; + NMonitoring::THistogramPtr CommitDuration; + NMonitoring::THistogramPtr PrepareReplyDuration; + + THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount; + + NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status); + +public: + TUploadCounters(); + + class TGuard: TMoveOnly { + private: + TMonotonic Start = TMonotonic::Now(); + std::optional<TMonotonic> WritingStarted; + std::optional<TMonotonic> CommitStarted; + std::optional<TMonotonic> CommitFinished; + std::optional<TMonotonic> ReplyFinished; + TUploadCounters& Owner; + + public: + TGuard(const TMonotonic start, TUploadCounters& owner) + : Start(start) + , Owner(owner) { + } + + void OnWritingStarted() { + WritingStarted = TMonotonic::Now(); + Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds()); + } + + void OnCommitStarted() { + CommitStarted = TMonotonic::Now(); + AFL_VERIFY(WritingStarted); + Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds()); + } + + void OnCommitFinished() { + CommitFinished = TMonotonic::Now(); + AFL_VERIFY(CommitStarted); + Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds()); + } + + void OnReply(const TUploadStatus& status) { + ReplyFinished = TMonotonic::Now(); + if (CommitFinished) { + Owner.PrepareReplyDuration->Collect((*ReplyFinished - *CommitFinished).MilliSeconds()); + } + Owner.ReplyDuration->Collect((*ReplyFinished - Start).MilliSeconds()); + Owner.GetCodeCounter(status)->Add(1); + } + }; + + TGuard BuildGuard(const TMonotonic start) { + return TGuard(start, *this); + } + + void OnRequest(const ui64 rowsCount) const { + RequestsCount->Add(1); + RowsCount->Add(rowsCount); + PackageSizeRecordsByRecords->Collect((i64)rowsCount, rowsCount); + PackageSizeCountByRecords->Collect(rowsCount); + } +}; + +} // namespace NKikimr diff --git a/ydb/core/tx/tx_proxy/ya.make b/ydb/core/tx/tx_proxy/ya.make index d592810a65..a5042ac58b 100644 --- a/ydb/core/tx/tx_proxy/ya.make +++ b/ydb/core/tx/tx_proxy/ya.make @@ -12,12 +12,14 @@ SRCS( rpc_long_tx.cpp snapshotreq.cpp commitreq.cpp + upload_rows_counters.cpp upload_rows_common_impl.cpp upload_rows.cpp global.cpp ) GENERATE_ENUM_SERIALIZATION(read_table_impl.h) +GENERATE_ENUM_SERIALIZATION(upload_rows_counters.h) PEERDIR( ydb/library/actors/core |