aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2023-10-02 18:15:55 +0300
committernsofya <nsofya@ydb.tech>2023-10-02 19:16:11 +0300
commit3244b37d549dcd72f6f588c1aa01406f33ad1dc2 (patch)
tree3feeebc8a6b0c1724b91ff4b9c97f806a207a136
parent711558a70897b63bda46b043bd23b3a576b8a074 (diff)
downloadydb-3244b37d549dcd72f6f588c1aa01406f33ad1dc2.tar.gz
Add BulkUpsert counters
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp15
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h43
2 files changed, 58 insertions, 0 deletions
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
index 3e0a6401fc..865bade5eb 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
@@ -1 +1,16 @@
#include "upload_rows_common_impl.h"
+
+
+namespace NKikimr {
+
+ TUploadCounters::TUploadCounters()
+ : TBase("BulkUpsert")
+ {
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+ RepliesCount = TBase::GetDeriviative("Replies/Count");
+ ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1));
+
+ RowsCount = TBase::GetDeriviative("Rows/Count");
+ PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10));
+ }
+}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index dc73cc85e0..c49663c8ae 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -17,6 +17,9 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/formats/arrow/size_calcer.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_value.pb.h>
#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h>
@@ -33,6 +36,40 @@
namespace NKikimr {
+class TUploadCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
+ NMonitoring::THistogramPtr ReplyDuration;
+
+ NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
+ NMonitoring::THistogramPtr PackageSize;
+
+ NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
+ NMonitoring::THistogramPtr FailDuration;
+
+public:
+ TUploadCounters();
+
+ void OnRequest(const ui64 rowsCount) const {
+ RequestsCount->Add(1);
+ RowsCount->Add(rowsCount);
+ PackageSize->Collect(rowsCount);
+ }
+
+ void OnReply( const TDuration d) const {
+ RepliesCount->Add(1);
+ ReplyDuration->Collect(d.MilliSeconds());
+ }
+
+ void OnFail(const TDuration d) const {
+ FailsCount->Add(1);
+ FailDuration->Collect(d.MilliSeconds());
+ }
+};
+
+
using namespace NActors;
struct TUpsertCost {
@@ -132,6 +169,7 @@ private:
std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
NLongTxService::TLongTxId LongTxId;
NThreading::TFuture<Ydb::LongTx::WriteResponse> WriteBatchResult;
+ TUploadCounters UploadCounters;
protected:
enum class EUploadSource {
@@ -653,6 +691,10 @@ private:
}
}
+ if (Batch) {
+ UploadCounters.OnRequest(Batch->num_rows());
+ }
+
if (TableKind == NSchemeCache::TSchemeCacheNavigate::KindTable) {
ResolveShards(ctx);
} else if (isColumnTable) {
@@ -1180,6 +1222,7 @@ private:
}
void ReplyWithResult(::Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
+ UploadCounters.OnReply(TAppData::TimeProvider->Now() - StartTime);
SendResult(ctx, status);
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "completed with status " << status);