aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <yentsovsemyon@ydb.tech>2024-11-22 14:08:46 +0300
committerGitHub <noreply@github.com>2024-11-22 11:08:46 +0000
commit36d42e52673b3cd4cd2e9ce8e0a7fa0c41f35e16 (patch)
treeefb382cd8a7a35f42fae2f7090e6df8919b370fe
parentb13bd4af6109e62064aef918bbf593cc549cafa6 (diff)
downloadydb-36d42e52673b3cd4cd2e9ce8e0a7fa0c41f35e16.tar.gz
add subcodes to tx-proxy reply code counters (#11862)
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.h4
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp27
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h236
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_counters.cpp99
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_counters.h136
-rw-r--r--ydb/core/tx/tx_proxy/ya.make2
6 files changed, 307 insertions, 197 deletions
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h
index 456699c80d..aa2920b63c 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.h
+++ b/ydb/core/tx/columnshard/counters/common/owner.h
@@ -56,6 +56,10 @@ public:
NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr);
+
+ TCommonCountersOwner(TCommonCountersOwner&& other)
+ : TCommonCountersOwner(other) {
+ }
};
class TValueGuard {
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
index 281d8bf05b..41a5433a77 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
@@ -1,28 +1,3 @@
#include "upload_rows_common_impl.h"
-
-namespace NKikimr {
-
- TUploadCounters::TUploadCounters()
- : TBase("BulkUpsert")
- {
- RequestsCount = TBase::GetDeriviative("Requests/Count");
- ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 10));
-
- RowsCount = TBase::GetDeriviative("Rows/Count");
- PackageSizeRecordsByRecords = TBase::GetHistogram("ByRecords/PackageSize/Records", NMonitoring::ExponentialHistogram(15, 2, 10));
- PackageSizeCountByRecords = TBase::GetHistogram("ByRecords/PackageSize/Count", NMonitoring::ExponentialHistogram(15, 2, 10));
-
- PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
- WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
- CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
- PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
-
- const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
- for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
- auto vDescription = descriptor->value(i);
- CodesCount.emplace(vDescription->name(), CreateSubGroup("reply_code", vDescription->name()).GetDeriviative("Replies/Count"));
- }
- }
-
-}
+namespace NKikimr {}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 4d6bb057ad..d92f7a0503 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -15,6 +15,7 @@
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/tx_proxy/upload_rows_counters.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
@@ -31,93 +32,13 @@
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
+#include <boost/container_hash/hash_fwd.hpp>
+#include <util/generic/size_literals.h>
#include <util/string/join.h>
#include <util/string/vector.h>
-#include <util/generic/size_literals.h>
namespace NKikimr {
-class TUploadCounters: public NColumnShard::TCommonCountersOwner {
-private:
- using TBase = NColumnShard::TCommonCountersOwner;
- NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
- NMonitoring::THistogramPtr ReplyDuration;
-
- NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
- NMonitoring::THistogramPtr PackageSizeRecordsByRecords;
- NMonitoring::THistogramPtr PackageSizeCountByRecords;
-
- NMonitoring::THistogramPtr PreparingDuration;
- NMonitoring::THistogramPtr WritingDuration;
- NMonitoring::THistogramPtr CommitDuration;
- NMonitoring::THistogramPtr PrepareReplyDuration;
-
- THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount;
-public:
- TUploadCounters();
-
- class TGuard: TMoveOnly {
- private:
- TMonotonic Start = TMonotonic::Now();
- std::optional<TMonotonic> WritingStarted;
- std::optional<TMonotonic> CommitStarted;
- std::optional<TMonotonic> CommitFinished;
- std::optional<TMonotonic> ReplyFinished;
- TUploadCounters& Owner;
- public:
- TGuard(const TMonotonic start, TUploadCounters& owner)
- : Start(start)
- , Owner(owner)
- {
-
- }
-
- void OnWritingStarted() {
- WritingStarted = TMonotonic::Now();
- Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds());
- }
-
- void OnCommitStarted() {
- CommitStarted = TMonotonic::Now();
- AFL_VERIFY(WritingStarted);
- Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds());
- }
-
- void OnCommitFinished() {
- CommitFinished = TMonotonic::Now();
- AFL_VERIFY(CommitStarted);
- Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds());
- }
-
- void OnReply(const ::Ydb::StatusIds::StatusCode code) {
- ReplyFinished = TMonotonic::Now();
- if (CommitFinished) {
- Owner.PrepareReplyDuration->Collect((*ReplyFinished - *CommitFinished).MilliSeconds());
- }
- Owner.ReplyDuration->Collect((*ReplyFinished - Start).MilliSeconds());
-
- const TString name = ::Ydb::StatusIds::StatusCode_Name(code);
- auto it = Owner.CodesCount.find(name);
- Y_ABORT_UNLESS(it != Owner.CodesCount.end());
- it->second->Add(1);
- }
- };
-
- TGuard BuildGuard(const TMonotonic start) {
- return TGuard(start, *this);
- }
-
- void OnRequest(const ui64 rowsCount) const {
- RequestsCount->Add(1);
- RowsCount->Add(rowsCount);
- PackageSizeRecordsByRecords->Collect((i64)rowsCount, rowsCount);
- PackageSizeCountByRecords->Collect(rowsCount);
- }
-
- void OnReply(const TDuration dFull, const TDuration dDelta, const ::Ydb::StatusIds::StatusCode code) const;
-};
-
-
using namespace NActors;
struct TUpsertCost {
@@ -219,8 +140,7 @@ private:
NSchemeCache::TSchemeCacheNavigate::EKind TableKind = NSchemeCache::TSchemeCacheNavigate::KindUnknown;
THashSet<TTabletId> ShardRepliesLeft;
THashMap<TTabletId, TShardUploadRetryState> ShardUploadRetryStates;
- Ydb::StatusIds::StatusCode Status;
- TString ErrorMessage;
+ TUploadStatus Status;
std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
NLongTxService::TLongTxId LongTxId;
TUploadCounters UploadCounters;
@@ -648,8 +568,8 @@ private:
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = ::NKikimr::SplitPath(table);
if (entry.Path.empty()) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder()
- << "Bulk upsert. Invalid table path specified: '" << table << "'", ctx);
+ return ReplyWithError(
+ Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Bulk upsert. Invalid table path specified: '" << table << "'", ctx);
}
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
entry.SyncVersion = true;
@@ -666,8 +586,9 @@ private:
void HandleTimeout(const TActorContext& ctx) {
ShardRepliesLeft.clear();
return ReplyWithError(Ydb::StatusIds::TIMEOUT,
- LogPrefix() << "longTx " << LongTxId.ToString()
- << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec", ctx);
+ TStringBuilder() << "longTx " << LongTxId.ToString()
+ << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec",
+ ctx);
}
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
@@ -676,37 +597,22 @@ private:
Y_ABORT_UNLESS(request.ResultSet.size() == 1);
const NSchemeCache::TSchemeCacheNavigate::TEntry& entry = request.ResultSet.front();
- switch (entry.Status) {
- case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
- break;
- case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
- return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, LogPrefix() << "table unavaliable", ctx);
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown table", ctx);
- case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown database", ctx);
- case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << "access denied", ctx);
- case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown:
- return ReplyWithError(Ydb::StatusIds::GENERIC_ERROR, LogPrefix() << "unknown error", ctx);
+ if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ return ReplyWithError(entry.Status, ctx);
}
TableKind = entry.Kind;
bool isColumnTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable);
if (entry.TableId.IsSystemView()) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
- LogPrefix() << "is not supported. Table is a system view", ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "is not supported. Table is a system view", ctx);
}
// TODO: fast fail for all tables?
if (isColumnTable && DiskQuotaExceeded) {
- return ReplyWithError(Ydb::StatusIds::UNAVAILABLE,
- LogPrefix() << "cannot perform writes: database is out of disk space", ctx);
+ return ReplyWithError(TUploadStatus(Ydb::StatusIds::UNAVAILABLE, TUploadStatus::ECustomSubcode::DISK_QUOTA_EXCEEDED,
+ "cannot perform writes: database is out of disk space"),
+ ctx);
}
ResolveNamesResult.reset(ev->Get()->Request.Release());
@@ -714,20 +620,20 @@ private:
bool makeYdbSchema = isColumnTable || (GetSourceType() != EUploadSource::ProtoValues);
TString errorMessage;
if (!BuildSchema(ctx, errorMessage, makeYdbSchema)) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage, ctx);
}
switch (GetSourceType()) {
case EUploadSource::ProtoValues:
{
if (!ExtractRows(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
if (isColumnTable) {
// TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DynNumbers, ...
if (!ExtractBatch(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
} else {
FindMinMaxKeys();
@@ -740,12 +646,13 @@ private:
if (isColumnTable) {
// TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ...
if (!ExtractBatch(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
if (!ColumnsToConvertInplace.empty()) {
auto convertResult = NArrow::InplaceConvertColumns(Batch, ColumnsToConvertInplace);
if (!convertResult.ok()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << "Cannot convert arrow batch inplace:" << convertResult.status().ToString(), ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Cannot convert arrow batch inplace:" << convertResult.status().ToString(), ctx);
}
Batch = *convertResult;
}
@@ -753,18 +660,19 @@ private:
if (!ColumnsToConvert.empty()) {
auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert);
if (!convertResult.ok()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx);
}
Batch = *convertResult;
}
} else {
// TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ...
if (!ExtractBatch(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
// Implicit types conversion inside ExtractRows(), in TArrowToYdbConverter
if (!ExtractRows(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, LogPrefix() << errorMessage, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
FindMinMaxKeys();
}
@@ -792,7 +700,7 @@ private:
// Batch is already converted
WriteToColumnTable(ctx);
} else {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "is not supported", ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "is not supported", ctx);
}
}
@@ -800,10 +708,10 @@ private:
UploadCountersGuard.OnWritingStarted();
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
+ return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx);
}
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "starting LongTx");
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "starting LongTx");
// Begin Long Tx for writing a batch into OLAP table
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
@@ -834,24 +742,23 @@ private:
LongTxId = msg->GetLongTxId();
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "started LongTx '" << LongTxId.ToString() << "'");
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "started LongTx '" << LongTxId.ToString() << "'");
auto outputColumns = GetOutputColumns(ctx);
if (!outputColumns.empty()) {
if (!Batch) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
- LogPrefix() << "no data or conversion error", ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, "no data or conversion error", ctx);
}
auto batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(Batch, outputColumns);
if (!batch) {
for (auto& columnName : outputColumns) {
if (Batch->schema()->GetFieldIndex(columnName) < 0) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
- LogPrefix() << "no expected column '" << columnName << "' in data", ctx);
+ return ReplyWithError(
+ Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "no expected column '" << columnName << "' in data", ctx);
}
}
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "cannot prepare data", ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "cannot prepare data", ctx);
}
Y_ABORT_UNLESS(batch);
@@ -859,9 +766,8 @@ private:
#if 1 // TODO: check we call ValidateFull() once over pipeline (upsert -> long tx -> shard insert)
auto validationInfo = batch->ValidateFull();
if (!validationInfo.ok()) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix()
- << "bad batch in data: " + validationInfo.message()
- << "; order:" + JoinSeq(", ", outputColumns), ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
+ TStringBuilder() << "bad batch in data: " + validationInfo.message() << "; order:" + JoinSeq(", ", outputColumns), ctx);
}
#endif
@@ -875,19 +781,19 @@ private:
Y_ABORT_UNLESS(ResolveNamesResult);
if (ResolveNamesResult->ErrorCount > 0) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "failed to get table schema", ctx);
+ ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "failed to get table schema", ctx);
return {};
}
auto& entry = ResolveNamesResult->ResultSet[0];
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "specified path is not a column table", ctx);
+ ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "specified path is not a column table", ctx);
return {};
}
if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSchema()) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "column table has no schema", ctx);
+ ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "column table has no schema", ctx);
return {};
}
@@ -922,8 +828,7 @@ private:
}
void RollbackLongTx(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST,
- LogPrefix() << "rolling back LongTx '" << LongTxId.ToString() << "'");
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "rolling back LongTx '" << LongTxId.ToString() << "'");
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId), 0, 0, Span.GetTraceId());
@@ -1056,12 +961,12 @@ private:
ResolvePartitionsResult = msg->Request;
if (ResolvePartitionsResult->ErrorCount > 0) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, LogPrefix() << "unknown table", ctx);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "unknown table", ctx);
}
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
+ return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx);
}
auto getShardsString = [] (const TVector<TKeyDesc::TPartitionInfo>& partitions) {
@@ -1194,7 +1099,8 @@ private:
void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
- SetError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly");
+ SetError(TUploadStatus(
+ Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly"));
ShardRepliesLeft.clear();
@@ -1204,7 +1110,8 @@ private:
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
- SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId));
+ SetError(TUploadStatus(Ydb::StatusIds::UNAVAILABLE, TUploadStatus::ECustomSubcode::DELIVERY_PROBLEM,
+ Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId)));
ShardRepliesLeft.erase(ev->Get()->TabletId);
return ReplyIfDone(ctx);
@@ -1234,28 +1141,10 @@ private:
<< " from shard " << shardResponse.GetTabletID());
if (shardResponse.GetStatus() != NKikimrTxDataShard::TError::OK) {
- ::Ydb::StatusIds::StatusCode status = Ydb::StatusIds::GENERIC_ERROR;
-
- switch (shardResponse.GetStatus()) {
- case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
- case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
+ if (shardResponse.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE ||
+ shardResponse.GetStatus() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED) {
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
- status = Ydb::StatusIds::OVERLOADED;
- break;
- case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
- case NKikimrTxDataShard::TError::OUT_OF_SPACE:
- status = Ydb::StatusIds::UNAVAILABLE;
- break;
- case NKikimrTxDataShard::TError::SCHEME_ERROR:
- status = Ydb::StatusIds::SCHEME_ERROR;
- break;
- case NKikimrTxDataShard::TError::BAD_ARGUMENT:
- status = Ydb::StatusIds::BAD_REQUEST;
- break;
- case NKikimrTxDataShard::TError::EXECUTION_CANCELLED:
- status = Ydb::StatusIds::TIMEOUT;
- break;
- };
+ }
if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) {
if (!shardResponse.HasOverloadSubscribed()) {
@@ -1268,7 +1157,8 @@ private:
}
}
- SetError(status, shardResponse.GetErrorDescription());
+ SetError(
+ TUploadStatus(static_cast<NKikimrTxDataShard::TError::EKind>(shardResponse.GetStatus()), shardResponse.GetErrorDescription()));
}
// Notify the cache that we are done with the pipe
@@ -1292,13 +1182,12 @@ private:
}
}
- void SetError(::Ydb::StatusIds::StatusCode status, const TString& message) {
- if (Status != ::Ydb::StatusIds::SUCCESS) {
+ void SetError(const TUploadStatus& status) {
+ if (Status.GetCode() != ::Ydb::StatusIds::SUCCESS) {
return;
}
Status = status;
- ErrorMessage = message;
}
void ReplyIfDone(const NActors::TActorContext& ctx) {
@@ -1307,27 +1196,32 @@ private:
return;
}
- if (!ErrorMessage.empty()) {
- RaiseIssue(NYql::TIssue(ErrorMessage));
+ if (Status.GetErrorMessage()) {
+ RaiseIssue(NYql::TIssue(LogPrefix() << *Status.GetErrorMessage()));
}
return ReplyWithResult(Status, ctx);
}
- void ReplyWithError(::Ydb::StatusIds::StatusCode status, const TString& message, const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::RPC_REQUEST, message);
+ void ReplyWithError(const Ydb::StatusIds::StatusCode code, const TString& errorMessage, const TActorContext& ctx) {
+ return ReplyWithError(TUploadStatus(code, errorMessage), ctx);
+ }
+
+ void ReplyWithError(const TUploadStatus& status, const TActorContext& ctx) {
+ AFL_VERIFY(status.GetCode() != Ydb::StatusIds::SUCCESS);
+ LOG_NOTICE_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << status.GetErrorMessage());
- SetError(status, message);
+ SetError(status);
Y_DEBUG_ABORT_UNLESS(ShardRepliesLeft.empty());
return ReplyIfDone(ctx);
}
- void ReplyWithResult(::Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
+ void ReplyWithResult(const TUploadStatus& status, const TActorContext& ctx) {
UploadCountersGuard.OnReply(status);
- SendResult(ctx, status);
+ SendResult(ctx, status.GetCode());
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "completed with status " << status);
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << "completed with status " << status.GetCode());
if (LongTxId != NLongTxService::TLongTxId()) {
// LongTxId is reset after successful commit
diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp
new file mode 100644
index 0000000000..22ee7263e3
--- /dev/null
+++ b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp
@@ -0,0 +1,99 @@
+#include "upload_rows_counters.h"
+
+
+namespace NKikimr {
+
+TUploadStatus::TUploadStatus(const NSchemeCache::TSchemeCacheNavigate::EStatus status) {
+ switch (status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ Code = Ydb::StatusIds::SUCCESS;
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
+ Code = Ydb::StatusIds::UNAVAILABLE;
+ ErrorMessage = "table unavaliable";
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ Code = Ydb::StatusIds::SCHEME_ERROR;
+ ErrorMessage = "unknown table";
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
+ Code = Ydb::StatusIds::SCHEME_ERROR;
+ ErrorMessage = "unknown database";
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
+ Code = Ydb::StatusIds::UNAUTHORIZED;
+ ErrorMessage = "access denied";
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown:
+ Code = Ydb::StatusIds::GENERIC_ERROR;
+ ErrorMessage = "unknown error";
+ break;
+ }
+}
+
+TUploadStatus::TUploadStatus(const NKikimrTxDataShard::TError::EKind status, const TString& errorDescription) {
+ Subcode = NKikimrTxDataShard::TError::EKind_Name(status);
+ if (status != NKikimrTxDataShard::TError::OK) {
+ ErrorMessage = errorDescription;
+ }
+ switch (status) {
+ case NKikimrTxDataShard::TError::OK:
+ Code = Ydb::StatusIds::SUCCESS;
+ break;
+ case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
+ case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
+ Code = Ydb::StatusIds::OVERLOADED;
+ break;
+ case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
+ case NKikimrTxDataShard::TError::OUT_OF_SPACE:
+ Code = Ydb::StatusIds::UNAVAILABLE;
+ break;
+ case NKikimrTxDataShard::TError::SCHEME_ERROR:
+ Code = Ydb::StatusIds::SCHEME_ERROR;
+ break;
+ case NKikimrTxDataShard::TError::BAD_ARGUMENT:
+ Code = Ydb::StatusIds::BAD_REQUEST;
+ break;
+ case NKikimrTxDataShard::TError::EXECUTION_CANCELLED:
+ Code = Ydb::StatusIds::TIMEOUT;
+ break;
+ default:
+ Code = Ydb::StatusIds::GENERIC_ERROR;
+ break;
+ };
+}
+
+NMonitoring::TDynamicCounters::TCounterPtr TUploadCounters::GetCodeCounter(const TUploadStatus& status) {
+ auto it = CodesCount.FindPtr(status);
+ if (it) {
+ return *it;
+ }
+ const auto counters = [this, &status]() {
+ auto groupByCode = CreateSubGroup("reply_code", status.GetCodeString());
+ if (status.GetSubcode()) {
+ return groupByCode.CreateSubGroup("subcode", *status.GetSubcode());
+ }
+ return groupByCode;
+ }();
+ return CodesCount.emplace(status, counters.GetDeriviative("Replies/Count")).first->second;
+}
+
+TUploadCounters::TUploadCounters()
+ : TBase("BulkUpsert") {
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+ ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 10));
+
+ RowsCount = TBase::GetDeriviative("Rows/Count");
+ PackageSizeRecordsByRecords = TBase::GetHistogram("ByRecords/PackageSize/Records", NMonitoring::ExponentialHistogram(15, 2, 10));
+ PackageSizeCountByRecords = TBase::GetHistogram("ByRecords/PackageSize/Count", NMonitoring::ExponentialHistogram(15, 2, 10));
+
+ PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+ WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+ CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+ PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+}
+}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.h b/ydb/core/tx/tx_proxy/upload_rows_counters.h
new file mode 100644
index 0000000000..4839e76da1
--- /dev/null
+++ b/ydb/core/tx/tx_proxy/upload_rows_counters.h
@@ -0,0 +1,136 @@
+#pragma once
+
+#include <ydb/library/actors/core/log.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/core/protos/tx_datashard.pb.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr {
+
+class TUploadStatus {
+private:
+ YDB_READONLY_DEF(Ydb::StatusIds::StatusCode, Code);
+ YDB_READONLY_DEF(std::optional<TString>, Subcode);
+ YDB_READONLY_DEF(std::optional<TString>, ErrorMessage);
+
+public:
+ enum class ECustomSubcode {
+ DISK_QUOTA_EXCEEDED,
+ DELIVERY_PROBLEM,
+ };
+
+public:
+ TUploadStatus(const Ydb::StatusIds::StatusCode code)
+ : Code(code) {
+ }
+ TUploadStatus(const Ydb::StatusIds::StatusCode code, const TString& errorMessage)
+ : Code(code)
+ , ErrorMessage(errorMessage) {
+ AFL_VERIFY(code != Ydb::StatusIds::SUCCESS);
+ }
+ TUploadStatus(const Ydb::StatusIds::StatusCode code, const ECustomSubcode& subcode, const TString& errorMessage)
+ : Code(code)
+ , Subcode(ToString(subcode))
+ , ErrorMessage(errorMessage) {
+ AFL_VERIFY(code != Ydb::StatusIds::SUCCESS);
+ }
+ TUploadStatus(const NSchemeCache::TSchemeCacheNavigate::EStatus status);
+ TUploadStatus(const NKikimrTxDataShard::TError::EKind status, const TString& errorDescription);
+
+ struct THasher {
+ ui64 operator()(const TUploadStatus& object) const {
+ return MultiHash(object.GetCode(), object.GetSubcode().has_value(), object.GetSubcode().value_or(""));
+ }
+ };
+
+ bool operator==(const TUploadStatus& other) const {
+ return Code == other.Code && Subcode == other.Subcode;
+ }
+
+ TString GetCodeString() const {
+ return Ydb::StatusIds::StatusCode_Name(Code);
+ }
+};
+
+class TUploadCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
+ NMonitoring::THistogramPtr ReplyDuration;
+
+ NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
+ NMonitoring::THistogramPtr PackageSizeRecordsByRecords;
+ NMonitoring::THistogramPtr PackageSizeCountByRecords;
+
+ NMonitoring::THistogramPtr PreparingDuration;
+ NMonitoring::THistogramPtr WritingDuration;
+ NMonitoring::THistogramPtr CommitDuration;
+ NMonitoring::THistogramPtr PrepareReplyDuration;
+
+ THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;
+
+ NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
+
+public:
+ TUploadCounters();
+
+ class TGuard: TMoveOnly {
+ private:
+ TMonotonic Start = TMonotonic::Now();
+ std::optional<TMonotonic> WritingStarted;
+ std::optional<TMonotonic> CommitStarted;
+ std::optional<TMonotonic> CommitFinished;
+ std::optional<TMonotonic> ReplyFinished;
+ TUploadCounters& Owner;
+
+ public:
+ TGuard(const TMonotonic start, TUploadCounters& owner)
+ : Start(start)
+ , Owner(owner) {
+ }
+
+ void OnWritingStarted() {
+ WritingStarted = TMonotonic::Now();
+ Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds());
+ }
+
+ void OnCommitStarted() {
+ CommitStarted = TMonotonic::Now();
+ AFL_VERIFY(WritingStarted);
+ Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds());
+ }
+
+ void OnCommitFinished() {
+ CommitFinished = TMonotonic::Now();
+ AFL_VERIFY(CommitStarted);
+ Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds());
+ }
+
+ void OnReply(const TUploadStatus& status) {
+ ReplyFinished = TMonotonic::Now();
+ if (CommitFinished) {
+ Owner.PrepareReplyDuration->Collect((*ReplyFinished - *CommitFinished).MilliSeconds());
+ }
+ Owner.ReplyDuration->Collect((*ReplyFinished - Start).MilliSeconds());
+ Owner.GetCodeCounter(status)->Add(1);
+ }
+ };
+
+ TGuard BuildGuard(const TMonotonic start) {
+ return TGuard(start, *this);
+ }
+
+ void OnRequest(const ui64 rowsCount) const {
+ RequestsCount->Add(1);
+ RowsCount->Add(rowsCount);
+ PackageSizeRecordsByRecords->Collect((i64)rowsCount, rowsCount);
+ PackageSizeCountByRecords->Collect(rowsCount);
+ }
+};
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/tx_proxy/ya.make b/ydb/core/tx/tx_proxy/ya.make
index d592810a65..a5042ac58b 100644
--- a/ydb/core/tx/tx_proxy/ya.make
+++ b/ydb/core/tx/tx_proxy/ya.make
@@ -12,12 +12,14 @@ SRCS(
rpc_long_tx.cpp
snapshotreq.cpp
commitreq.cpp
+ upload_rows_counters.cpp
upload_rows_common_impl.cpp
upload_rows.cpp
global.cpp
)
GENERATE_ENUM_SERIALIZATION(read_table_impl.h)
+GENERATE_ENUM_SERIALIZATION(upload_rows_counters.h)
PEERDIR(
ydb/library/actors/core