aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornusratbek <nusratbek@yandex-team.ru>2022-02-10 16:52:24 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:24 +0300
commit9ef43d18a5630a61d0f746fdbd86ac344db72829 (patch)
tree3c0c7f84262a5dbdc4d112d177301af7c1a423d9
parenta815b0df01710081be7ee69fbb91ca234f0af5dc (diff)
downloadydb-9ef43d18a5630a61d0f746fdbd86ac344db72829.tar.gz
Restoring authorship annotation for <nusratbek@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md58
-rw-r--r--ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp72
-rw-r--r--ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h96
-rw-r--r--ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make30
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h520
-rw-r--r--ydb/public/sdk/cpp/client/ydb_extension/extension.cpp44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_extension/extension.h44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp276
-rw-r--r--ydb/services/ydb/ut/ya.make18
-rw-r--r--ydb/services/ydb/ydb_stats_ut.cpp564
10 files changed, 861 insertions, 861 deletions
diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md b/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md
index 869678d8dd7..ac6f9e0bd42 100644
--- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md
+++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md
@@ -1,5 +1,5 @@
# YDB C++ SDK Metrics
-
+
You can plug in YDB C++ SDK extension to monitor how your application interacts with YDB. In particular you can monitor:
- Transport errors;
- Per host messages in fligh;
@@ -8,11 +8,11 @@ You can plug in YDB C++ SDK extension to monitor how your application interacts
- Number of sessions per host;
- Number of server endpoints available;
- Query size, latency, etc.
-
+
You can get these metrics via http server provided by YDB C++ SDK or implement your own MetricRegistry if it more convenient.
## Setting up Solomon Monitoring
-
+
> This is Yandex specific section for setting up internal monitoring called Solomon
### Setup Solomon Environment
@@ -20,45 +20,45 @@ TSolomonStatPullExtension class allows you to quickly setup you application moni
[Create project, cluster and service, and connect them.](https://wiki.yandex-team.ru/solomon/howtostart/).
Add hostnames which run your application to **Cluster hosts** (use hostnames without "http://"). Solomon's fetcher will use **Port** field for all added hosts.
Fill in the following params:
-- **Monitoring model** : **PULL**;
-- **URL Path** : **/stats**;
-- **Interval** : **10**;
+- **Monitoring model** : **PULL**;
+- **URL Path** : **/stats**;
+- **Interval** : **10**;
- **Port** : a port of http server YDB SDK runs.
-
+
### Setup TSolomonStatPullExtension in Your Application Code
-
+
After creating NYdb::TDriver you need to add Solomon Monitoring extension. If you set up incorrect hostname or port **TSystemError** exception will be thrown.
-
+
> **Important**: you must plug in monitoring before driver creation.
-```cl
+```cl
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h>
-
-...
-
-{
+
+...
+
+{
auto config = NYdb::TDriverConfig();
- NYdb::TDriver driver(config);
- try {
-
+ NYdb::TDriver driver(config);
+ try {
+
const TString host = ... // use hostname without http://
- const ui64 port = ...
+ const ui64 port = ...
const TString project = ... // Solomon's project id
const TString service = ... // Solomon's service id
const TString cluster = ... // Solomon's cluster id
- NSolomonStatExtension::TSolomonStatPullExtension::TParams params(host, port, project, service, cluster);
- driver.AddExtension<NSolomonStatExtension::TSolomonStatPullExtension>(params);
-
- } catch (TSystemError& error) {
- ...
- }
-}
-
-```
-
+ NSolomonStatExtension::TSolomonStatPullExtension::TParams params(host, port, project, service, cluster);
+ driver.AddExtension<NSolomonStatExtension::TSolomonStatPullExtension>(params);
+
+ } catch (TSystemError& error) {
+ ...
+ }
+}
+
+```
+
## Setup Monitoring of Your Choice
-
+
Implementing NMonitoring::IMetricRegistry provides more flexibility. You can deliver application metrics to Prometheus or any other system of your choice, just register your specific NMonitoring::IMetricRegistry implementation via AddMetricRegistry function.
> **Important**: you must plug in monitoring before driver creation.
diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp
index ec7b0f0db68..bdad901dfd4 100644
--- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp
+++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp
@@ -1,47 +1,47 @@
-#include "pull_client.h"
-
-namespace NSolomonStatExtension {
-
-TSolomonStatPullExtension::TParams::TParams(const TString& host
- , ui16 port
- , const TString& project
- , const TString& service
- , const TString& cluster
- , const TVector<std::pair<TString, TString>>& labels)
- : Host_(host), Port_(port), Labels_()
+#include "pull_client.h"
+
+namespace NSolomonStatExtension {
+
+TSolomonStatPullExtension::TParams::TParams(const TString& host
+ , ui16 port
+ , const TString& project
+ , const TString& service
+ , const TString& cluster
+ , const TVector<std::pair<TString, TString>>& labels)
+ : Host_(host), Port_(port), Labels_()
{
Labels_.Add("project", project);
Labels_.Add("service", service);
Labels_.Add("cluster", cluster);
for (const auto& label: labels) {
Labels_.Add(label.first, label.second);
- }
+ }
}
-
+
NMonitoring::TLabels TSolomonStatPullExtension::TParams::GetLabels() const {
- return Labels_;
-}
-
-
-TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const TString& title, const TString& path, IApi* api)
- : NMonitoring::IMonPage(title, path), Api_(api)
- { }
-
-void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) {
- request.Output() << NMonitoring::HTTPOKJSON;
- auto json = NMonitoring::EncoderJson(&request.Output());
- Api_->Accept(json.Get());
-}
-
-TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api)
+ return Labels_;
+}
+
+
+TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const TString& title, const TString& path, IApi* api)
+ : NMonitoring::IMonPage(title, path), Api_(api)
+ { }
+
+void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) {
+ request.Output() << NMonitoring::HTTPOKJSON;
+ auto json = NMonitoring::EncoderJson(&request.Output());
+ Api_->Accept(json.Get());
+}
+
+TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api)
: MetricRegistry_(new NMonitoring::TMetricRegistry(params.GetLabels()))
, MonService_(params.Port_, params.Host_, 0), Page_( new TSolomonStatPage("stats", "Statistics", api) ) {
api->SetMetricRegistry(MetricRegistry_.get());
- MonService_.Register(Page_);
- MonService_.StartOrThrow();
- }
-
-TSolomonStatPullExtension::~TSolomonStatPullExtension()
- { }
-
-} // NSolomonStatExtension
+ MonService_.Register(Page_);
+ MonService_.StartOrThrow();
+ }
+
+TSolomonStatPullExtension::~TSolomonStatPullExtension()
+ { }
+
+} // NSolomonStatExtension
diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h
index 8d410c2fa2e..ce1d9f994b9 100644
--- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h
+++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include <ydb/public/sdk/cpp/client/ydb_extension/extension.h>
#include <library/cpp/http/server/response.h>
@@ -8,53 +8,53 @@
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/monlib/service/pages/mon_page.h>
#include <library/cpp/monlib/service/monservice.h>
-
+
#include <util/generic/vector.h>
-namespace NSolomonStatExtension {
-
-class TSolomonStatPullExtension: public NYdb::IExtension {
-public:
- using IApi = NYdb::NSdkStats::IStatApi;
-
- class TParams {
- friend class TSolomonStatPullExtension;
-
- public:
- TParams(const TString& host
- , ui16 port
- , const TString& project
- , const TString& service
- , const TString& cluster
- , const TVector<std::pair<TString, TString>>& labels = {});
-
+namespace NSolomonStatExtension {
+
+class TSolomonStatPullExtension: public NYdb::IExtension {
+public:
+ using IApi = NYdb::NSdkStats::IStatApi;
+
+ class TParams {
+ friend class TSolomonStatPullExtension;
+
+ public:
+ TParams(const TString& host
+ , ui16 port
+ , const TString& project
+ , const TString& service
+ , const TString& cluster
+ , const TVector<std::pair<TString, TString>>& labels = {});
+
NMonitoring::TLabels GetLabels() const;
-
- private:
- const TString Host_;
- ui16 Port_;
- NMonitoring::TLabels Labels_;
- };
-
- TSolomonStatPullExtension(const TParams& params, IApi* api);
- ~TSolomonStatPullExtension();
-
-private:
- class TSolomonStatPage: public NMonitoring::IMonPage {
- friend class TSolomonStatPullExtension;
- public:
- TSolomonStatPage(const TString& title, const TString& path, IApi* api);
-
- void Output(NMonitoring::IMonHttpRequest& request) override ;
-
- private:
- IApi* Api_;
- };
-
-private:
+
+ private:
+ const TString Host_;
+ ui16 Port_;
+ NMonitoring::TLabels Labels_;
+ };
+
+ TSolomonStatPullExtension(const TParams& params, IApi* api);
+ ~TSolomonStatPullExtension();
+
+private:
+ class TSolomonStatPage: public NMonitoring::IMonPage {
+ friend class TSolomonStatPullExtension;
+ public:
+ TSolomonStatPage(const TString& title, const TString& path, IApi* api);
+
+ void Output(NMonitoring::IMonHttpRequest& request) override ;
+
+ private:
+ IApi* Api_;
+ };
+
+private:
std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry_;
- NMonitoring::TMonService2 MonService_;
- TIntrusivePtr<TSolomonStatPage> Page_;
-};
-
-} // namespace NSolomonStatExtension
+ NMonitoring::TMonService2 MonService_;
+ TIntrusivePtr<TSolomonStatPage> Page_;
+};
+
+} // namespace NSolomonStatExtension
diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make b/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make
index 18fa0b584fd..1f37285d583 100644
--- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make
+++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make
@@ -1,21 +1,21 @@
-LIBRARY()
-
-OWNER(
- dcherednik
- g:kikimr
-)
-
-SRCS(
- pull_client.cpp
+LIBRARY()
+
+OWNER(
+ dcherednik
+ g:kikimr
+)
+
+SRCS(
+ pull_client.cpp
pull_connector.cpp
-)
-
-PEERDIR(
+)
+
+PEERDIR(
library/cpp/monlib/encode/json
library/cpp/monlib/metrics
library/cpp/monlib/service
library/cpp/monlib/service/pages
ydb/public/sdk/cpp/client/ydb_extension
-)
-
-END()
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
index cae40996824..c9f31083ef4 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h>
@@ -7,15 +7,15 @@
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/monlib/metrics/histogram_collector.h>
-#include <util/string/builder.h>
-
+#include <util/string/builder.h>
+
#include <atomic>
#include <memory>
-namespace NYdb {
-
-namespace NSdkStats {
-
+namespace NYdb {
+
+namespace NSdkStats {
+
// works only for case normal (foo_bar) underscore
inline TStringType UnderscoreToUpperCamel(const TStringType& in) {
@@ -43,18 +43,18 @@ inline TStringType UnderscoreToUpperCamel(const TStringType& in) {
return result;
}
-template<typename TPointer>
-class TAtomicPointer {
-public:
-
- TAtomicPointer(TPointer* pointer = nullptr) {
- Set(pointer);
- }
-
+template<typename TPointer>
+class TAtomicPointer {
+public:
+
+ TAtomicPointer(TPointer* pointer = nullptr) {
+ Set(pointer);
+ }
+
TAtomicPointer(const TAtomicPointer& other) {
Set(other.Get());
- }
-
+ }
+
TAtomicPointer& operator=(const TAtomicPointer& other) {
Set(other.Get());
return *this;
@@ -64,202 +64,202 @@ public:
return Pointer_.load();
}
- void Set(TPointer* pointer) {
+ void Set(TPointer* pointer) {
Pointer_.store(pointer);
- }
-
-private:
+ }
+
+private:
std::atomic<TPointer*> Pointer_;
-};
-
-template<typename TPointer>
-class TAtomicCounter: public TAtomicPointer<TPointer> {
- public:
- void Add(ui64 value) {
- if (auto counter = this->Get()) {
- counter->Add(value);
- }
- }
-
- void Inc() {
- if (auto counter = this->Get()) {
- counter->Inc();
- }
- }
-
- void Dec() {
- if (auto counter = this->Get()) {
- counter->Dec();
- }
- }
-
- void SetValue(ui64 value) {
- if (auto counter = this->Get()) {
- counter->Set(value);
- }
- }
-};
-
-template<typename TCounter>
-class FastLocalCounter {
-public:
- FastLocalCounter(TAtomicCounter<TCounter>& counter)
- : Counter(counter), Value(0)
- { }
-
- ~FastLocalCounter() {
- Counter.Add(Value);
- }
-
- FastLocalCounter<TCounter>& operator++ () {
- ++Value;
- return *this;
- }
-
- TAtomicCounter<TCounter>& Counter;
- ui64 Value;
-};
-
-template<typename TPointer>
-class TAtomicHistogram: public TAtomicPointer<TPointer> {
-public:
-
- void Record(i64 value) {
- if (auto histogram = this->Get()) {
- histogram->Record(value);
- }
- }
-
- bool IsCollecting() {
- return this->Get() != nullptr;
- }
-};
-
-// Sessions count for all clients
-// Every client has 3 TSessionCounter for active, in session pool, in settler sessions
-// TSessionCounters in different clients with same role share one sensor
-class TSessionCounter: public TAtomicPointer<NMonitoring::TIntGauge> {
-public:
-
- // Call with mutex
- void Apply(i64 newValue) {
- if (auto gauge = this->Get()) {
- gauge->Add(newValue - oldValue);
- oldValue = newValue;
- }
- }
-
- ~TSessionCounter() {
- NMonitoring::TIntGauge* gauge = this->Get();
- if (gauge) {
- gauge->Add(-oldValue);
- }
- }
-
-private:
- i64 oldValue = 0;
-};
-
-struct TStatCollector {
+};
+
+template<typename TPointer>
+class TAtomicCounter: public TAtomicPointer<TPointer> {
+ public:
+ void Add(ui64 value) {
+ if (auto counter = this->Get()) {
+ counter->Add(value);
+ }
+ }
+
+ void Inc() {
+ if (auto counter = this->Get()) {
+ counter->Inc();
+ }
+ }
+
+ void Dec() {
+ if (auto counter = this->Get()) {
+ counter->Dec();
+ }
+ }
+
+ void SetValue(ui64 value) {
+ if (auto counter = this->Get()) {
+ counter->Set(value);
+ }
+ }
+};
+
+template<typename TCounter>
+class FastLocalCounter {
+public:
+ FastLocalCounter(TAtomicCounter<TCounter>& counter)
+ : Counter(counter), Value(0)
+ { }
+
+ ~FastLocalCounter() {
+ Counter.Add(Value);
+ }
+
+ FastLocalCounter<TCounter>& operator++ () {
+ ++Value;
+ return *this;
+ }
+
+ TAtomicCounter<TCounter>& Counter;
+ ui64 Value;
+};
+
+template<typename TPointer>
+class TAtomicHistogram: public TAtomicPointer<TPointer> {
+public:
+
+ void Record(i64 value) {
+ if (auto histogram = this->Get()) {
+ histogram->Record(value);
+ }
+ }
+
+ bool IsCollecting() {
+ return this->Get() != nullptr;
+ }
+};
+
+// Sessions count for all clients
+// Every client has 3 TSessionCounter for active, in session pool, in settler sessions
+// TSessionCounters in different clients with same role share one sensor
+class TSessionCounter: public TAtomicPointer<NMonitoring::TIntGauge> {
+public:
+
+ // Call with mutex
+ void Apply(i64 newValue) {
+ if (auto gauge = this->Get()) {
+ gauge->Add(newValue - oldValue);
+ oldValue = newValue;
+ }
+ }
+
+ ~TSessionCounter() {
+ NMonitoring::TIntGauge* gauge = this->Get();
+ if (gauge) {
+ gauge->Add(-oldValue);
+ }
+ }
+
+private:
+ i64 oldValue = 0;
+};
+
+struct TStatCollector {
using TMetricRegistry = NMonitoring::TMetricRegistry;
-
-public:
-
- struct TEndpointElectorStatCollector {
-
- TEndpointElectorStatCollector(NMonitoring::TIntGauge* endpointCount = nullptr
- , NMonitoring::TIntGauge* pessimizationRatio = nullptr
- , NMonitoring::TIntGauge* activeEndpoints = nullptr)
- : EndpointCount(endpointCount)
- , PessimizationRatio(pessimizationRatio)
- , EndpointActive(activeEndpoints)
- { }
-
- NMonitoring::TIntGauge* EndpointCount;
- NMonitoring::TIntGauge* PessimizationRatio;
- NMonitoring::TIntGauge* EndpointActive;
- };
-
- struct TSessionPoolStatCollector {
-
- enum class EStatCollectorType: size_t {
- SESSIONPOOL,
- SETTLERPOOL
- };
-
- TSessionPoolStatCollector(NMonitoring::TIntGauge* activeSessions = nullptr
- , NMonitoring::TIntGauge* inPoolSessions = nullptr
+
+public:
+
+ struct TEndpointElectorStatCollector {
+
+ TEndpointElectorStatCollector(NMonitoring::TIntGauge* endpointCount = nullptr
+ , NMonitoring::TIntGauge* pessimizationRatio = nullptr
+ , NMonitoring::TIntGauge* activeEndpoints = nullptr)
+ : EndpointCount(endpointCount)
+ , PessimizationRatio(pessimizationRatio)
+ , EndpointActive(activeEndpoints)
+ { }
+
+ NMonitoring::TIntGauge* EndpointCount;
+ NMonitoring::TIntGauge* PessimizationRatio;
+ NMonitoring::TIntGauge* EndpointActive;
+ };
+
+ struct TSessionPoolStatCollector {
+
+ enum class EStatCollectorType: size_t {
+ SESSIONPOOL,
+ SETTLERPOOL
+ };
+
+ TSessionPoolStatCollector(NMonitoring::TIntGauge* activeSessions = nullptr
+ , NMonitoring::TIntGauge* inPoolSessions = nullptr
, NMonitoring::TRate* fakeSessions = nullptr)
: ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions)
- { }
-
- NMonitoring::TIntGauge* ActiveSessions;
- NMonitoring::TIntGauge* InPoolSessions;
+ { }
+
+ NMonitoring::TIntGauge* ActiveSessions;
+ NMonitoring::TIntGauge* InPoolSessions;
NMonitoring::TRate* FakeSessions;
- };
-
- struct TClientRetryOperationStatCollector {
-
+ };
+
+ struct TClientRetryOperationStatCollector {
+
TClientRetryOperationStatCollector() : MetricRegistry_(), Database_() {}
-
+
TClientRetryOperationStatCollector(NMonitoring::TMetricRegistry* registry, const TStringType& database)
: MetricRegistry_(registry), Database_(database)
- { }
-
- void IncSyncRetryOperation(const EStatus& status) {
+ { }
+
+ void IncSyncRetryOperation(const EStatus& status) {
if (auto registry = MetricRegistry_.Get()) {
TString statusName = TStringBuilder() << status;
TString sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName);
registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc();
- }
- }
-
- void IncAsyncRetryOperation(const EStatus& status) {
+ }
+ }
+
+ void IncAsyncRetryOperation(const EStatus& status) {
if (auto registry = MetricRegistry_.Get()) {
TString statusName = TStringBuilder() << status;
TString sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName);
registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc();
- }
- }
-
- private:
+ }
+ }
+
+ private:
TAtomicPointer<NMonitoring::TMetricRegistry> MetricRegistry_;
TStringType Database_;
- };
-
- struct TClientStatCollector {
-
+ };
+
+ struct TClientStatCollector {
+
TClientStatCollector(NMonitoring::TRate* cacheMiss = nullptr
- , NMonitoring::THistogram* querySize = nullptr
- , NMonitoring::THistogram* paramsSize = nullptr
+ , NMonitoring::THistogram* querySize = nullptr
+ , NMonitoring::THistogram* paramsSize = nullptr
, NMonitoring::TRate* sessionRemoved = nullptr
, NMonitoring::TRate* requestMigrated = nullptr
- , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector())
+ , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector())
: CacheMiss(cacheMiss)
, QuerySize(querySize)
, ParamsSize(paramsSize)
, SessionRemovedDueBalancing(sessionRemoved)
, RequestMigrated(requestMigrated)
, RetryOperationStatCollector(retryOperationStatCollector)
- { }
-
+ { }
+
NMonitoring::TRate* CacheMiss;
- NMonitoring::THistogram* QuerySize;
- NMonitoring::THistogram* ParamsSize;
+ NMonitoring::THistogram* QuerySize;
+ NMonitoring::THistogram* ParamsSize;
NMonitoring::TRate* SessionRemovedDueBalancing;
NMonitoring::TRate* RequestMigrated;
- TClientRetryOperationStatCollector RetryOperationStatCollector;
- };
-
+ TClientRetryOperationStatCollector RetryOperationStatCollector;
+ };
+
TStatCollector(const TStringType& database, TMetricRegistry* sensorsRegistry)
: Database_(database)
, DatabaseLabel_({"database", database})
- {
- if (sensorsRegistry) {
+ {
+ if (sensorsRegistry) {
SetMetricRegistry(sensorsRegistry);
- }
- }
-
+ }
+ }
+
void SetMetricRegistry(TMetricRegistry* sensorsRegistry) {
Y_VERIFY(sensorsRegistry, "TMetricRegistry is null in stats collector.");
MetricRegistryPtr_.Set(sensorsRegistry);
@@ -280,53 +280,53 @@ public:
GRpcInFlight_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Grpc/InFlight"} }));
RequestLatency_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/Latency"} },
- NMonitoring::ExponentialHistogram(20, 2, 1)));
+ NMonitoring::ExponentialHistogram(20, 2, 1)));
QuerySize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/QuerySize"} },
- NMonitoring::ExponentialHistogram(20, 2, 32)));
+ NMonitoring::ExponentialHistogram(20, 2, 32)));
ParamsSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ParamsSize"} },
- NMonitoring::ExponentialHistogram(10, 2, 32)));
+ NMonitoring::ExponentialHistogram(10, 2, 32)));
ResultSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ResultSize"} },
- NMonitoring::ExponentialHistogram(20, 2, 32)));
- }
-
- void IncDiscoveryDuePessimization() {
- DiscoveryDuePessimization_.Inc();
- }
-
- void IncDiscoveryDueExpiration() {
- DiscoveryDueExpiration_.Inc();
- }
-
- void IncDiscoveryFailDueTransportError() {
- DiscoveryFailDueTransportError_.Inc();
- }
-
- void IncReqFailQueueOverflow() {
- RequestFailDueQueueOverflow_.Inc();
- }
-
- void IncReqFailNoEndpoint() {
- RequestFailDueNoEndpoint_.Inc();
- }
-
- void IncReqFailDueTransportError() {
- RequestFailDueTransportError_.Inc();
- }
-
- void IncRequestLatency(TDuration duration) {
- RequestLatency_.Record(duration.MilliSeconds());
- }
-
- void IncResultSize(const size_t& size) {
- ResultSize_.Record(size);
- }
-
+ NMonitoring::ExponentialHistogram(20, 2, 32)));
+ }
+
+ void IncDiscoveryDuePessimization() {
+ DiscoveryDuePessimization_.Inc();
+ }
+
+ void IncDiscoveryDueExpiration() {
+ DiscoveryDueExpiration_.Inc();
+ }
+
+ void IncDiscoveryFailDueTransportError() {
+ DiscoveryFailDueTransportError_.Inc();
+ }
+
+ void IncReqFailQueueOverflow() {
+ RequestFailDueQueueOverflow_.Inc();
+ }
+
+ void IncReqFailNoEndpoint() {
+ RequestFailDueNoEndpoint_.Inc();
+ }
+
+ void IncReqFailDueTransportError() {
+ RequestFailDueTransportError_.Inc();
+ }
+
+ void IncRequestLatency(TDuration duration) {
+ RequestLatency_.Record(duration.MilliSeconds());
+ }
+
+ void IncResultSize(const size_t& size) {
+ ResultSize_.Record(size);
+ }
+
void IncCounter(const TStringType& sensor) {
if (auto registry = MetricRegistryPtr_.Get()) {
- registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Inc();
- }
- }
-
+ registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Inc();
+ }
+ }
+
void SetSessionCV(ui32 cv) {
SessionCV_.SetValue(cv);
}
@@ -339,47 +339,47 @@ public:
GRpcInFlight_.Dec();
}
- TEndpointElectorStatCollector GetEndpointElectorStatCollector() {
+ TEndpointElectorStatCollector GetEndpointElectorStatCollector() {
if (auto registry = MetricRegistryPtr_.Get()) {
auto endpointCoint = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Total"} });
auto pessimizationRatio = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/BadRatio"} });
auto activeEndpoints = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Good"} });
- return TEndpointElectorStatCollector(endpointCoint, pessimizationRatio, activeEndpoints);
- } else {
- return TEndpointElectorStatCollector();
- }
- }
-
- TSessionPoolStatCollector GetSessionPoolStatCollector(TSessionPoolStatCollector::EStatCollectorType type) {
- if (!IsCollecting()) {
- return TSessionPoolStatCollector();
- }
-
- switch (type) {
+ return TEndpointElectorStatCollector(endpointCoint, pessimizationRatio, activeEndpoints);
+ } else {
+ return TEndpointElectorStatCollector();
+ }
+ }
+
+ TSessionPoolStatCollector GetSessionPoolStatCollector(TSessionPoolStatCollector::EStatCollectorType type) {
+ if (!IsCollecting()) {
+ return TSessionPoolStatCollector();
+ }
+
+ switch (type) {
case TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL:
return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get());
- case TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL:
+ case TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL:
return TSessionPoolStatCollector(nullptr, SettlerSessions_.Get(), nullptr);
- }
+ }
return TSessionPoolStatCollector();
- }
-
- TClientStatCollector GetClientStatCollector() {
- if (IsCollecting()) {
+ }
+
+ TClientStatCollector GetClientStatCollector() {
+ if (IsCollecting()) {
return TClientStatCollector(CacheMiss_.Get(), QuerySize_.Get(), ParamsSize_.Get(),
SessionRemovedDueBalancing_.Get(), RequestMigrated_.Get(),
TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_));
- } else {
- return TClientStatCollector();
- }
- }
-
- bool IsCollecting() {
+ } else {
+ return TClientStatCollector();
+ }
+ }
+
+ bool IsCollecting() {
return MetricRegistryPtr_.Get() != nullptr;
- }
-
+ }
+
void IncSessionsOnHost(const TStringType& host);
void DecSessionsOnHost(const TStringType& host);
@@ -389,7 +389,7 @@ public:
void DecGRpcInFlightByHost(const TStringType& host);
void DeleteHost(const TStringType& host);
-private:
+private:
const TStringType Database_;
const NMonitoring::TLabel DatabaseLabel_;
TAtomicPointer<TMetricRegistry> MetricRegistryPtr_;
@@ -399,21 +399,21 @@ private:
TAtomicCounter<NMonitoring::TRate> RequestFailDueNoEndpoint_;
TAtomicCounter<NMonitoring::TRate> RequestFailDueTransportError_;
TAtomicCounter<NMonitoring::TRate> DiscoveryFailDueTransportError_;
- TAtomicPointer<NMonitoring::TIntGauge> ActiveSessions_;
- TAtomicPointer<NMonitoring::TIntGauge> InPoolSessions_;
- TAtomicPointer<NMonitoring::TIntGauge> SettlerSessions_;
+ TAtomicPointer<NMonitoring::TIntGauge> ActiveSessions_;
+ TAtomicPointer<NMonitoring::TIntGauge> InPoolSessions_;
+ TAtomicPointer<NMonitoring::TIntGauge> SettlerSessions_;
TAtomicCounter<NMonitoring::TIntGauge> SessionCV_;
TAtomicCounter<NMonitoring::TRate> SessionRemovedDueBalancing_;
TAtomicCounter<NMonitoring::TRate> RequestMigrated_;
TAtomicCounter<NMonitoring::TRate> FakeSessions_;
TAtomicCounter<NMonitoring::TRate> CacheMiss_;
TAtomicCounter<NMonitoring::TIntGauge> GRpcInFlight_;
- TAtomicHistogram<NMonitoring::THistogram> RequestLatency_;
- TAtomicHistogram<NMonitoring::THistogram> QuerySize_;
- TAtomicHistogram<NMonitoring::THistogram> ParamsSize_;
- TAtomicHistogram<NMonitoring::THistogram> ResultSize_;
-};
-
-} // namespace NSdkStats
-
-} // namespace Nydb
+ TAtomicHistogram<NMonitoring::THistogram> RequestLatency_;
+ TAtomicHistogram<NMonitoring::THistogram> QuerySize_;
+ TAtomicHistogram<NMonitoring::THistogram> ParamsSize_;
+ TAtomicHistogram<NMonitoring::THistogram> ResultSize_;
+};
+
+} // namespace NSdkStats
+
+} // namespace Nydb
diff --git a/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp b/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp
index 4cb966a5ce8..9df96ed6fd3 100644
--- a/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp
@@ -1,28 +1,28 @@
#include "extension.h"
-
-#define INCLUDE_YDB_INTERNAL_H
+
+#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/stats_extractor/extractor.h>
-#undef INCLUDE_YDB_INTERNAL_H
-
-namespace NYdb {
-
-void IExtension::SelfRegister(TDriver driver) {
- CreateInternalInterface(driver)->RegisterExtension(this);
-}
-
-void IExtensionApi::SelfRegister(TDriver driver) {
- CreateInternalInterface(driver)->RegisterExtensionApi(this);
-}
-
-namespace NSdkStats {
-
+#undef INCLUDE_YDB_INTERNAL_H
+
+namespace NYdb {
+
+void IExtension::SelfRegister(TDriver driver) {
+ CreateInternalInterface(driver)->RegisterExtension(this);
+}
+
+void IExtensionApi::SelfRegister(TDriver driver) {
+ CreateInternalInterface(driver)->RegisterExtensionApi(this);
+}
+
+namespace NSdkStats {
+
IStatApi* IStatApi::Create(TDriver driver) {
return new TStatsExtractor(CreateInternalInterface(driver));
-}
-
-} // namespace YSdkStats
-
+}
+
+} // namespace YSdkStats
+
class TDiscoveryMutator : public IDiscoveryMutatorApi {
public:
TDiscoveryMutator(std::shared_ptr<TGRpcConnectionsImpl> driverImpl)
@@ -40,5 +40,5 @@ IDiscoveryMutatorApi* IDiscoveryMutatorApi::Create(TDriver driver) {
return new TDiscoveryMutator(CreateInternalInterface(driver));
}
-} // namespace NYdb
-
+} // namespace NYdb
+
diff --git a/ydb/public/sdk/cpp/client/ydb_extension/extension.h b/ydb/public/sdk/cpp/client/ydb_extension/extension.h
index 3371bffddc5..68ff0a6ce8b 100644
--- a/ydb/public/sdk/cpp/client/ydb_extension/extension.h
+++ b/ydb/public/sdk/cpp/client/ydb_extension/extension.h
@@ -1,9 +1,9 @@
-#pragma once
-
+#pragma once
+
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
-
+
namespace Ydb {
namespace Discovery {
@@ -12,8 +12,8 @@ class ListEndpointsResult;
}
}
-namespace NYdb {
-
+namespace NYdb {
+
class IExtensionApi {
public:
friend class TDriver;
@@ -31,20 +31,20 @@ private:
void SelfRegister(TDriver driver);
};
-namespace NSdkStats {
-
-class IStatApi: public IExtensionApi {
-public:
+namespace NSdkStats {
+
+class IStatApi: public IExtensionApi {
+public:
static IStatApi* Create(TDriver driver);
-public:
+public:
virtual void SetMetricRegistry(NMonitoring::IMetricRegistry* sensorsRegistry) = 0;
virtual void Accept(NMonitoring::IMetricConsumer* consumer) const = 0;
-};
-
-class DestroyedClientException: public yexception {};
-
-} // namespace NSdkStats
-
+};
+
+class DestroyedClientException: public yexception {};
+
+} // namespace NSdkStats
+
class IDiscoveryMutatorApi: public IExtensionApi {
public:
@@ -56,13 +56,13 @@ public:
};
-template<typename TExtension>
+template<typename TExtension>
void TDriver::AddExtension(typename TExtension::TParams params) {
typename TExtension::IApi* api = TExtension::IApi::Create(*this);
- auto extension = new TExtension(params, api);
- extension->SelfRegister(*this);
+ auto extension = new TExtension(params, api);
+ extension->SelfRegister(*this);
if (api)
api->SelfRegister(*this);
-}
-
-} // namespace NYdb
+}
+
+} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 610fec38847..af4a21c8725 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -1319,7 +1319,7 @@ class TSessionPoolImpl {
(std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings);
public:
using TKeepAliveCmd = std::function<void(TSession session)>;
- using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
+ using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>;
TSessionPoolImpl(ui32 maxActiveSessions);
// TAwareSessonProvider:
// function is called if session pool is empty,
@@ -1334,27 +1334,27 @@ public:
bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, const TString& endpoint);
// Returns true if session returned to pool successfully
bool ReturnSession(TSession::TImpl* impl, bool active);
- TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
+ TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate);
i64 GetActiveSessions() const;
i64 GetActiveSessionsLimit() const;
i64 GetCurrentPoolSize() const;
void DecrementActiveCounter();
void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close);
- void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
+ void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise,
std::shared_ptr<TTableClient::TImpl> client);
private:
- void UpdateStats();
-
+ void UpdateStats();
+
mutable std::mutex Mtx_;
TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_;
bool Closed_;
i64 ActiveSessions_;
const ui32 MaxActiveSessions_;
- NSdkStats::TSessionCounter ActiveSessionsCounter_;
- NSdkStats::TSessionCounter InPoolSessionsCounter_;
+ NSdkStats::TSessionCounter ActiveSessionsCounter_;
+ NSdkStats::TSessionCounter InPoolSessionsCounter_;
NSdkStats::TAtomicCounter<NMonitoring::TRate> FakeSessionsCounter_;
};
@@ -1471,19 +1471,19 @@ public:
, Settings_(settings)
, SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_)
, SettlerPool_(0)
- {
- if (!DbDriverState_->StatCollector.IsCollecting()) {
- return;
- }
-
- SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector());
- SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector(
- NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL
- ));
- SettlerPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector(
- NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL
- ));
- }
+ {
+ if (!DbDriverState_->StatCollector.IsCollecting()) {
+ return;
+ }
+
+ SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector());
+ SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector(
+ NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL
+ ));
+ SettlerPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector(
+ NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL
+ ));
+ }
~TImpl() {
RequestMigrator_.Wait();
@@ -1506,10 +1506,10 @@ public:
promise.SetException("no more client");
return promise.GetFuture();
}
- return strong->Drain();
+ return strong->Drain();
};
-
- DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP);
+
+ DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP);
}
NThreading::TFuture<void> Drain() {
@@ -1557,21 +1557,21 @@ public:
}
void StartPeriodicSessionPoolTask() {
-
- auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) {
-
- const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_;
- const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast();
-
- if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
- if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
- return true;
- }
- }
-
- return false;
- };
-
+
+ auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) {
+
+ const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_;
+ const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast();
+
+ if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
+ if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
+ return true;
+ }
+ }
+
+ return false;
+ };
+
auto keepAliveCmd = [](TSession session) {
Y_VERIFY(session.GetId());
@@ -1619,8 +1619,8 @@ public:
Connections_->AddPeriodicTask(
SessionPool_.CreatePeriodicTask(
weak,
- std::move(keepAliveCmd),
- std::move(deletePredicate)
+ std::move(keepAliveCmd),
+ std::move(deletePredicate)
), PERIODIC_ACTION_INTERVAL);
}
@@ -1679,10 +1679,10 @@ public:
void StartPeriodicSettlerTask() {
- auto deletePredicate = [](TSession::TImpl* , TTableClient::TImpl* , size_t) {
- return false;
- };
-
+ auto deletePredicate = [](TSession::TImpl* , TTableClient::TImpl* , size_t) {
+ return false;
+ };
+
auto ttl = Settings_.SettlerSessionPoolTTL_;
auto keepAliveCmd = [ttl](TSession session) {
Y_VERIFY(session.GetId());
@@ -1699,8 +1699,8 @@ public:
Connections_->AddPeriodicTask(
SettlerPool_.CreatePeriodicTask(
weak,
- std::move(keepAliveCmd),
- std::move(deletePredicate)
+ std::move(keepAliveCmd),
+ std::move(deletePredicate)
), SETTLER_PERIODIC_ACTION_INTERVAL);
}
@@ -1976,8 +1976,8 @@ public:
settings.ClientTimeout_,
preferedLocation);
- std::weak_ptr<TDbDriverState> state = DbDriverState_;
-
+ std::weak_ptr<TDbDriverState> state = DbDriverState_;
+
return createSessionPromise.GetFuture();
}
@@ -2149,8 +2149,8 @@ public:
return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true);
}
- CacheMissCounter.Inc();
-
+ CacheMissCounter.Inc();
+
return InjectSessionStatusInterception(session.SessionImpl_,
ExecuteDataQueryInternal(session, query, txControl, params, settings, false),
true, GetMinTimeToTouch(Settings_.SessionPoolSettings_));
@@ -2207,8 +2207,8 @@ public:
promise.SetValue(std::move(prepareQueryResult));
};
- CollectQuerySize(query, QuerySizeHistogram);
-
+ CollectQuerySize(query, QuerySizeHistogram);
+
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>(
std::move(request),
extractor,
@@ -2513,15 +2513,15 @@ public:
return Settings_.SessionPoolSettings_.RetryLimit_;
}
- void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) {
- CacheMissCounter.Set(collector.CacheMiss);
- QuerySizeHistogram.Set(collector.QuerySize);
- ParamsSizeHistogram.Set(collector.ParamsSize);
- RetryOperationStatCollector = collector.RetryOperationStatCollector;
+ void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) {
+ CacheMissCounter.Set(collector.CacheMiss);
+ QuerySizeHistogram.Set(collector.QuerySize);
+ ParamsSizeHistogram.Set(collector.ParamsSize);
+ RetryOperationStatCollector = collector.RetryOperationStatCollector;
SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
RequestMigrated.Set(collector.RequestMigrated);
- }
-
+ }
+
TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) {
auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
request.set_table(table);
@@ -2675,42 +2675,42 @@ private:
*request->mutable_parameters() = params;
}
- static void CollectParams(
- ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram)
- {
-
- if (params && histgoram.IsCollecting()) {
- size_t size = 0;
- for (auto& keyvalue: *params) {
- size += keyvalue.second.ByteSizeLong();
- }
- histgoram.Record(size);
- }
- }
-
- static void CollectParams(
- const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
- NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram)
- {
-
- if (histgoram.IsCollecting()) {
- size_t size = 0;
- for (auto& keyvalue: params) {
- size += keyvalue.second.ByteSizeLong();
- }
- histgoram.Record(size);
- }
- }
-
- static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>& querySizeHistogram) {
- if (querySizeHistogram.IsCollecting()) {
- querySizeHistogram.Record(query.size());
- }
- }
-
- static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>&) {}
-
+ static void CollectParams(
+ ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
+ NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram)
+ {
+
+ if (params && histgoram.IsCollecting()) {
+ size_t size = 0;
+ for (auto& keyvalue: *params) {
+ size += keyvalue.second.ByteSizeLong();
+ }
+ histgoram.Record(size);
+ }
+ }
+
+ static void CollectParams(
+ const ::google::protobuf::Map<TString, Ydb::TypedValue>& params,
+ NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram)
+ {
+
+ if (histgoram.IsCollecting()) {
+ size_t size = 0;
+ for (auto& keyvalue: params) {
+ size += keyvalue.second.ByteSizeLong();
+ }
+ histgoram.Record(size);
+ }
+ }
+
+ static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>& querySizeHistogram) {
+ if (querySizeHistogram.IsCollecting()) {
+ querySizeHistogram.Record(query.size());
+ }
+ }
+
+ static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>&) {}
+
template <typename TQueryType, typename TParamsType>
TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query,
const TTxControl& txControl, TParamsType params,
@@ -2729,10 +2729,10 @@ private:
request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
SetQuery(query, request.mutable_query());
- CollectQuerySize(query, QuerySizeHistogram);
+ CollectQuerySize(query, QuerySizeHistogram);
SetParams(params, &request);
- CollectParams(params, ParamsSizeHistogram);
+ CollectParams(params, ParamsSizeHistogram);
SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy());
@@ -2855,11 +2855,11 @@ private:
std::shared_ptr<TTableClient::TImpl> client,
const TCreateSessionSettings& settings);
-public:
+public:
NSdkStats::TAtomicCounter<NMonitoring::TRate> CacheMissCounter;
- NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector;
- NSdkStats::TAtomicHistogram<NMonitoring::THistogram> QuerySizeHistogram;
- NSdkStats::TAtomicHistogram<NMonitoring::THistogram> ParamsSizeHistogram;
+ NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector;
+ NSdkStats::TAtomicHistogram<NMonitoring::THistogram> QuerySizeHistogram;
+ NSdkStats::TAtomicHistogram<NMonitoring::THistogram> ParamsSizeHistogram;
NSdkStats::TAtomicCounter<NMonitoring::TRate> SessionRemovedDueBalancing;
NSdkStats::TAtomicCounter<NMonitoring::TRate> RequestMigrated;
@@ -2966,10 +2966,10 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession(
sessionImpl = std::move(it->second);
Sessions_.erase(it);
}
- UpdateStats();
+ UpdateStats();
}
if (returnFakeSession) {
- FakeSessionsCounter_.Inc();
+ FakeSessionsCounter_.Inc();
CreateFakeSession(createSessionPromise, client);
return createSessionPromise.GetFuture();
} else if (sessionImpl) {
@@ -3028,7 +3028,7 @@ bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) {
ActiveSessions_--;
impl->SetNeedUpdateActiveCounter(false);
}
- UpdateStats();
+ UpdateStats();
}
return true;
}
@@ -3053,9 +3053,9 @@ void TSessionPoolImpl::Drain(std::function<bool(std::unique_ptr<TSession::TImpl>
}
TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient,
- TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate)
+ TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate)
{
- auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) {
+ auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) {
if (status != EStatus::SUCCESS) {
return false;
}
@@ -3069,8 +3069,8 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE;
TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch;
sessionsToTouch.reserve(keepAliveBatchSize);
- TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
- sessionsToDelete.reserve(keepAliveBatchSize);
+ TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete;
+ sessionsToDelete.reserve(keepAliveBatchSize);
auto now = TInstant::Now();
{
std::lock_guard guard(Mtx_);
@@ -3080,15 +3080,15 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
while (it != sessions.end() && keepAliveBatchSize--) {
if (now < it->second->GetTimeToTouchFast())
break;
-
- if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
- sessionsToDelete.emplace_back(std::move(it->second));
- } else {
- sessionsToTouch.emplace_back(std::move(it->second));
- }
+
+ if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) {
+ sessionsToDelete.emplace_back(std::move(it->second));
+ } else {
+ sessionsToTouch.emplace_back(std::move(it->second));
+ }
sessions.erase(it++);
}
- UpdateStats();
+ UpdateStats();
}
for (auto& sessionImpl : sessionsToTouch) {
@@ -3101,14 +3101,14 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm
cmd(session);
}
}
-
- for (auto& sessionImpl : sessionsToDelete) {
- if (sessionImpl) {
- Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE ||
- sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED);
+
+ for (auto& sessionImpl : sessionsToDelete) {
+ if (sessionImpl) {
+ Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE ||
+ sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED);
TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient);
- }
- }
+ }
+ }
}
return true;
@@ -3171,17 +3171,17 @@ TSessionInspectorFn TSession::TImpl::GetSessionInspector(
};
}
-void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
- ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
- InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
- FakeSessionsCounter_.Set(statCollector.FakeSessions);
-}
-
-void TSessionPoolImpl::UpdateStats() {
- ActiveSessionsCounter_.Apply(ActiveSessions_);
- InPoolSessionsCounter_.Apply(Sessions_.size());
-}
-
+void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
+ ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
+ InPoolSessionsCounter_.Set(statCollector.InPoolSessions);
+ FakeSessionsCounter_.Set(statCollector.FakeSessions);
+}
+
+void TSessionPoolImpl::UpdateStats() {
+ ActiveSessionsCounter_.Apply(ActiveSessions_);
+ InPoolSessionsCounter_.Apply(Sessions_.size());
+}
+
TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings)
: Impl_(new TImpl(CreateInternalInterface(driver), settings)) {
Impl_->StartPeriodicSessionPoolTask();
@@ -3265,7 +3265,7 @@ protected:
, Promise(NThreading::NewPromise<TStatus>())
, RetryNumber(0)
{}
-
+
static void RunOp(TRetryContextPtr self) {
self->Execute();
}
@@ -3432,7 +3432,7 @@ TAsyncStatus TTableClient::RetryOperation(TOperationWithoutSessionFunc&& operati
TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings) {
TRetryState retryState;
TMaybe<NYdb::TStatus> status;
-
+
for (ui32 retryNumber = 0; retryNumber <= settings.MaxRetries_; ++retryNumber) {
status = operationWrapper(retryState);
@@ -3488,7 +3488,7 @@ TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc&
default:
return *status;
}
- Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus());
+ Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus());
}
return *status;
@@ -3905,8 +3905,8 @@ TAsyncPrepareQueryResult TSession::PrepareDataQuery(const TString& query, const
return MakeFuture(result);
}
- Client_->CacheMissCounter.Inc();
-
+ Client_->CacheMissCounter.Inc();
+
return InjectSessionStatusInterception(
SessionImpl_,
Client_->PrepareDataQuery(*this, query, settings),
diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make
index b7209faa5f9..3cab9f5607f 100644
--- a/ydb/services/ydb/ut/ya.make
+++ b/ydb/services/ydb/ut/ya.make
@@ -7,15 +7,15 @@ OWNER(
FORK_SUBTESTS()
-IF (SANITIZER_TYPE OR WITH_VALGRIND)
+IF (SANITIZER_TYPE OR WITH_VALGRIND)
SPLIT_FACTOR(60)
- TIMEOUT(3600)
- SIZE(LARGE)
- TAG(ya:fat)
-ELSE()
- TIMEOUT(300)
- SIZE(MEDIUM)
-ENDIF()
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(300)
+ SIZE(MEDIUM)
+ENDIF()
SRCS(
ydb_bulk_upsert_ut.cpp
@@ -28,7 +28,7 @@ SRCS(
ydb_scripting_ut.cpp
ydb_table_ut.cpp
ydb_table_split_ut.cpp
- ydb_stats_ut.cpp
+ ydb_stats_ut.cpp
ydb_long_tx_ut.cpp
ydb_logstore_ut.cpp
ydb_olapstore_ut.cpp
diff --git a/ydb/services/ydb/ydb_stats_ut.cpp b/ydb/services/ydb/ydb_stats_ut.cpp
index b40e22fc42d..16494823b80 100644
--- a/ydb/services/ydb/ydb_stats_ut.cpp
+++ b/ydb/services/ydb/ydb_stats_ut.cpp
@@ -10,329 +10,329 @@
#include <library/cpp/monlib/encode/json/json.h>
#include <util/generic/ptr.h>
-#include <util/system/valgrind.h>
-
-struct TStatCounters {
- i64 EndpointCount = 0;
- i64 EndpointActive = 0;
- i64 PessimizationRatio = 0;
-
- ui64 DiscoveryDuePessimization = 0;
- ui64 DiscoveryDueExpiration = 0;
- ui64 DiscoveryDueTransportError = 0;
-
- ui64 RequestFailDueNoEndpoint = 0;
- ui64 RequestFailDueQueueOverflow = 0;
- ui64 RequestFailDueTransportError = 0;
-
- i64 ActiveSessionsRatio = 0;
- i64 ReadySessionsRatio = 0;
- ui64 FakeSessions = 0;
- ui64 CacheMiss = 0;
+#include <util/system/valgrind.h>
+
+struct TStatCounters {
+ i64 EndpointCount = 0;
+ i64 EndpointActive = 0;
+ i64 PessimizationRatio = 0;
+
+ ui64 DiscoveryDuePessimization = 0;
+ ui64 DiscoveryDueExpiration = 0;
+ ui64 DiscoveryDueTransportError = 0;
+
+ ui64 RequestFailDueNoEndpoint = 0;
+ ui64 RequestFailDueQueueOverflow = 0;
+ ui64 RequestFailDueTransportError = 0;
+
+ i64 ActiveSessionsRatio = 0;
+ i64 ReadySessionsRatio = 0;
+ ui64 FakeSessions = 0;
+ ui64 CacheMiss = 0;
ui64 RetryOperationDueAborted = 0;
-};
-
+};
+
class TMetricEncoder: public NMonitoring::IMetricEncoder {
-public:
-
- enum class ECounterType: size_t {
-
- ENDPOINTCOUNT,
- ENDPOINTACTIVE,
- ENDPOINTPESSIMIZATIONRATIO,
-
- ACTIVESESSIONSRATIO,
- READYSESSIONSRATIO,
- FAKESESSIONS,
- CACHEMISS,
+public:
+
+ enum class ECounterType: size_t {
+
+ ENDPOINTCOUNT,
+ ENDPOINTACTIVE,
+ ENDPOINTPESSIMIZATIONRATIO,
+
+ ACTIVESESSIONSRATIO,
+ READYSESSIONSRATIO,
+ FAKESESSIONS,
+ CACHEMISS,
RETRYOPERATIONDUEABORTED,
-
- DISCOVERYDUEPESSIMIZATION,
- DISCOVERYDUEEXPIRATION,
- DISCOVERYFAILDUETRANSPORTERROR,
-
- REQUESTFAILDUEQUEUEOVERFLOW,
- REQUESTFAILDUENOENDPOINT,
- REQUESTFAILDUETRANSPORTERROR,
- REQUESTLATENCY,
-
- UNKNOWN
- };
-
- void Close() override { }
-
- void OnStreamBegin() override { }
+
+ DISCOVERYDUEPESSIMIZATION,
+ DISCOVERYDUEEXPIRATION,
+ DISCOVERYFAILDUETRANSPORTERROR,
+
+ REQUESTFAILDUEQUEUEOVERFLOW,
+ REQUESTFAILDUENOENDPOINT,
+ REQUESTFAILDUETRANSPORTERROR,
+ REQUESTLATENCY,
+
+ UNKNOWN
+ };
+
+ void Close() override { }
+
+ void OnStreamBegin() override { }
void OnStreamEnd() override { }
-
- void OnCommonTime(TInstant) override { }
-
+
+ void OnCommonTime(TInstant) override { }
+
void OnMetricBegin(NMonitoring::EMetricType) override { }
void OnMetricEnd() override { }
-
- void OnLabelsBegin() override { }
- void OnLabelsEnd() override { }
-
+
+ void OnLabelsBegin() override { }
+ void OnLabelsEnd() override { }
+
void OnLabel(const TStringBuf name, const TStringBuf value) override {
- if (name != "sensor") {
- return;
- }
-
+ if (name != "sensor") {
+ return;
+ }
+
if (value == "Discovery/TooManyBadEndpoints") {
- State = ECounterType::DISCOVERYDUEPESSIMIZATION;
+ State = ECounterType::DISCOVERYDUEPESSIMIZATION;
} else if (value == "Discovery/Regular") {
- State = ECounterType::DISCOVERYDUEEXPIRATION;
+ State = ECounterType::DISCOVERYDUEEXPIRATION;
} else if (value == "Request/FailedDiscoveryQueueOverflow") {
- State = ECounterType::REQUESTFAILDUEQUEUEOVERFLOW;
+ State = ECounterType::REQUESTFAILDUEQUEUEOVERFLOW;
} else if (value == "Request/FailedNoEndpoint") {
- State = ECounterType::REQUESTFAILDUENOENDPOINT;
+ State = ECounterType::REQUESTFAILDUENOENDPOINT;
} else if (value == "Request/FailedTransportError") {
- State = ECounterType::REQUESTFAILDUETRANSPORTERROR;
+ State = ECounterType::REQUESTFAILDUETRANSPORTERROR;
} else if (value == "Discovery/FailedTransportError") {
- State = ECounterType::DISCOVERYFAILDUETRANSPORTERROR;
+ State = ECounterType::DISCOVERYFAILDUETRANSPORTERROR;
} else if (value == "Endpoints/Total") {
- State = ECounterType::ENDPOINTCOUNT;
+ State = ECounterType::ENDPOINTCOUNT;
} else if (value == "Endpoints/BadRatio") {
- State = ECounterType::ENDPOINTPESSIMIZATIONRATIO;
+ State = ECounterType::ENDPOINTPESSIMIZATIONRATIO;
} else if (value == "Endpoints/Good") {
- State = ECounterType::ENDPOINTACTIVE;
+ State = ECounterType::ENDPOINTACTIVE;
} else if (value == "Sessions/InUse") {
- State = ECounterType::ACTIVESESSIONSRATIO;
- } else if (value == "ready sessions ratio") {
- State = ECounterType::READYSESSIONSRATIO;
+ State = ECounterType::ACTIVESESSIONSRATIO;
+ } else if (value == "ready sessions ratio") {
+ State = ECounterType::READYSESSIONSRATIO;
} else if (value == "Sessions/SessionsLimitExceeded") {
- State = ECounterType::FAKESESSIONS;
+ State = ECounterType::FAKESESSIONS;
} else if (value == "Request/ClientQueryCacheMiss") {
- State = ECounterType::CACHEMISS;
+ State = ECounterType::CACHEMISS;
} else if (value == "RetryOperation/Aborted") {
State = ECounterType::RETRYOPERATIONDUEABORTED;
- } else if (value == "request latency") {
- State = ECounterType::REQUESTLATENCY;
- } else {
- State = ECounterType::UNKNOWN;
- }
- }
-
- void OnDouble(TInstant, double) override { }
-
- void OnInt64(TInstant, i64 value) override {
- switch (State) {
- case ECounterType::ENDPOINTCOUNT: Counters.EndpointCount = value; break;
- case ECounterType::ENDPOINTACTIVE: Counters.EndpointActive = value; break;
- case ECounterType::ENDPOINTPESSIMIZATIONRATIO: Counters.PessimizationRatio = value; break;
-
- case ECounterType::ACTIVESESSIONSRATIO: Counters.ActiveSessionsRatio = value; break;
- case ECounterType::READYSESSIONSRATIO: Counters.ReadySessionsRatio = value; break;
- default: return;
- }
- }
-
- void OnUint64(TInstant, ui64 value) override {
- switch (State) {
- case ECounterType::DISCOVERYDUEPESSIMIZATION: Counters.DiscoveryDuePessimization = value; break;
- case ECounterType::DISCOVERYDUEEXPIRATION: Counters.DiscoveryDueExpiration = value; break;
- case ECounterType::DISCOVERYFAILDUETRANSPORTERROR: Counters.DiscoveryDueTransportError = value; break;
-
- case ECounterType::REQUESTFAILDUENOENDPOINT: Counters.RequestFailDueNoEndpoint = value; break;
- case ECounterType::REQUESTFAILDUEQUEUEOVERFLOW: Counters.RequestFailDueQueueOverflow = value; break;
- case ECounterType::REQUESTFAILDUETRANSPORTERROR: Counters.RequestFailDueTransportError = value; break;
-
- case ECounterType::FAKESESSIONS: Counters.FakeSessions = value; break;
- case ECounterType::CACHEMISS: Counters.CacheMiss = value; break;
+ } else if (value == "request latency") {
+ State = ECounterType::REQUESTLATENCY;
+ } else {
+ State = ECounterType::UNKNOWN;
+ }
+ }
+
+ void OnDouble(TInstant, double) override { }
+
+ void OnInt64(TInstant, i64 value) override {
+ switch (State) {
+ case ECounterType::ENDPOINTCOUNT: Counters.EndpointCount = value; break;
+ case ECounterType::ENDPOINTACTIVE: Counters.EndpointActive = value; break;
+ case ECounterType::ENDPOINTPESSIMIZATIONRATIO: Counters.PessimizationRatio = value; break;
+
+ case ECounterType::ACTIVESESSIONSRATIO: Counters.ActiveSessionsRatio = value; break;
+ case ECounterType::READYSESSIONSRATIO: Counters.ReadySessionsRatio = value; break;
+ default: return;
+ }
+ }
+
+ void OnUint64(TInstant, ui64 value) override {
+ switch (State) {
+ case ECounterType::DISCOVERYDUEPESSIMIZATION: Counters.DiscoveryDuePessimization = value; break;
+ case ECounterType::DISCOVERYDUEEXPIRATION: Counters.DiscoveryDueExpiration = value; break;
+ case ECounterType::DISCOVERYFAILDUETRANSPORTERROR: Counters.DiscoveryDueTransportError = value; break;
+
+ case ECounterType::REQUESTFAILDUENOENDPOINT: Counters.RequestFailDueNoEndpoint = value; break;
+ case ECounterType::REQUESTFAILDUEQUEUEOVERFLOW: Counters.RequestFailDueQueueOverflow = value; break;
+ case ECounterType::REQUESTFAILDUETRANSPORTERROR: Counters.RequestFailDueTransportError = value; break;
+
+ case ECounterType::FAKESESSIONS: Counters.FakeSessions = value; break;
+ case ECounterType::CACHEMISS: Counters.CacheMiss = value; break;
case ECounterType::RETRYOPERATIONDUEABORTED: Counters.RetryOperationDueAborted = value; break;
- default: return;
- }
- }
-
- void OnHistogram(TInstant, NMonitoring::IHistogramSnapshotPtr) override { }
-
+ default: return;
+ }
+ }
+
+ void OnHistogram(TInstant, NMonitoring::IHistogramSnapshotPtr) override { }
+
void OnLogHistogram(TInstant, NMonitoring::TLogHistogramSnapshotPtr) override { }
void OnSummaryDouble(TInstant, NMonitoring::ISummaryDoubleSnapshotPtr) override { }
- ECounterType State = ECounterType::UNKNOWN;
- TStatCounters Counters;
-};
-
-class TCountersExtractor;
-
-class TCountersExtractExtension: public NYdb::IExtension {
-public:
-
- class TParams {
- public:
-
- TParams& SetExtractor(TCountersExtractor* extractor) {
- Extractor = extractor;
- return *this;
- }
-
- NMonitoring::TLabels GetApiParams() const {
- return {};
- }
-
- TCountersExtractor* Extractor;
- };
-
- using IApi = NYdb::NSdkStats::IStatApi;
-
- TCountersExtractExtension(const TParams& params, IApi* api);
-
- TStatCounters Pull() {
+ ECounterType State = ECounterType::UNKNOWN;
+ TStatCounters Counters;
+};
+
+class TCountersExtractor;
+
+class TCountersExtractExtension: public NYdb::IExtension {
+public:
+
+ class TParams {
+ public:
+
+ TParams& SetExtractor(TCountersExtractor* extractor) {
+ Extractor = extractor;
+ return *this;
+ }
+
+ NMonitoring::TLabels GetApiParams() const {
+ return {};
+ }
+
+ TCountersExtractor* Extractor;
+ };
+
+ using IApi = NYdb::NSdkStats::IStatApi;
+
+ TCountersExtractExtension(const TParams& params, IApi* api);
+
+ TStatCounters Pull() {
TMetricEncoder extractor;
- Api_->Accept(&extractor);
- return extractor.Counters;
- }
-
-private:
+ Api_->Accept(&extractor);
+ return extractor.Counters;
+ }
+
+private:
std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry_;
- IApi* Api_ = nullptr;
-};
-
-class TCountersExtractor {
-public:
- TStatCounters Extract() {
- return Extension_->Pull();
- }
-
- void Register(TCountersExtractExtension* extension) {
- Extension_ = extension;
- }
-
-private:
- TCountersExtractExtension* Extension_ = nullptr;
-};
-
-TCountersExtractExtension::TCountersExtractExtension(const TParams& params, IApi* api)
+ IApi* Api_ = nullptr;
+};
+
+class TCountersExtractor {
+public:
+ TStatCounters Extract() {
+ return Extension_->Pull();
+ }
+
+ void Register(TCountersExtractExtension* extension) {
+ Extension_ = extension;
+ }
+
+private:
+ TCountersExtractExtension* Extension_ = nullptr;
+};
+
+TCountersExtractExtension::TCountersExtractExtension(const TParams& params, IApi* api)
: MetricRegistry_(new NMonitoring::TMetricRegistry())
, Api_(api)
{
api->SetMetricRegistry(MetricRegistry_.get());
params.Extractor->Register(this);
}
-
-using namespace NYdb::NTable;
-
-static const TDuration OPERATION_TIMEOUT = TDuration::Seconds(NValgrind::PlainOrUnderValgrind(5, 100));
-
-static NYdb::TStatus SimpleSelect(TSession session, const TString& query) {
- auto txControl = NYdb::NTable::TTxControl::BeginTx().CommitTx();
- auto settings = TExecDataQuerySettings().KeepInQueryCache(true).OperationTimeout(OPERATION_TIMEOUT);
- auto result = session.ExecuteDataQuery(query, txControl, settings).GetValueSync();
- return result;
-}
-
-Y_UNIT_TEST_SUITE(ClientStatsCollector) {
- Y_UNIT_TEST(CounterCacheMiss) {
- NYdb::TKikimrWithGrpcAndRootSchema server;
- auto endpoint = TStringBuilder() << "localhost:" << server.GetPort();
- NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint));
- TCountersExtractor extractor;
- driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor));
- NYdb::NTable::TTableClient client(driver);
-
- auto createSessionResult = client.GetSession(TCreateSessionSettings().ClientTimeout(OPERATION_TIMEOUT)).GetValueSync();
- UNIT_ASSERT(createSessionResult.IsSuccess());
-
- auto session = createSessionResult.GetSession();
- UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess());
-
- TStatCounters counters = extractor.Extract();
- UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1);
-
- UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess());
-
- counters = extractor.Extract();
- UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1);
-
- UNIT_ASSERT(SimpleSelect(session, "SELECT 2;").IsSuccess());
-
- counters = extractor.Extract();
- UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 2);
-
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(CounterRetryOperation) {
- NYdb::TKikimrWithGrpcAndRootSchema server;
- auto endpoint = TStringBuilder() << "localhost:" << server.GetPort();
- NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint));
- TCountersExtractor extractor;
- driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor));
- NYdb::NTable::TTableClient client(driver);
-
- auto retrySettings = TRetryOperationSettings().GetSessionClientTimeout(OPERATION_TIMEOUT);
-
- UNIT_ASSERT(client.RetryOperationSync([](TSession session){
- auto desc = TTableBuilder()
- .AddNullableColumn("id", NYdb::EPrimitiveType::Uint64)
- .AddNullableColumn("name", NYdb::EPrimitiveType::Utf8)
- .SetPrimaryKeyColumn("id")
- .Build();
-
- auto settings = NYdb::NTable::TCreateTableSettings().OperationTimeout(TDuration::Seconds(100));
- return session.CreateTable("Root/names", std::move(desc), settings).GetValueSync();
- }, retrySettings).IsSuccess());
-
- auto settings = TExecDataQuerySettings().OperationTimeout(OPERATION_TIMEOUT);
- auto txSettings = TBeginTxSettings().OperationTimeout(OPERATION_TIMEOUT);
-
- auto upsertOperation = [&client, settings, retrySettings] {
- UNIT_ASSERT(client.RetryOperationSync([settings](TSession session){
- auto query = Sprintf(R"(
- UPSERT into [Root/names] (id, name) VALUES (1, "Alex");
- )");
- return session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).GetValueSync();
- }, retrySettings).IsSuccess());
- };
-
- const ui64 retriesCount = 2;
- auto retrySelectSettings = TRetryOperationSettings().MaxRetries(retriesCount).GetSessionClientTimeout(OPERATION_TIMEOUT);
-
- UNIT_ASSERT(client.RetryOperationSync([&upsertOperation, settings, txSettings](TSession session){
- auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true);
-
- auto tx = beginResult.GetTransaction();
- auto query = Sprintf(R"(
- SELECT * FROM [Root/names];
+
+using namespace NYdb::NTable;
+
+static const TDuration OPERATION_TIMEOUT = TDuration::Seconds(NValgrind::PlainOrUnderValgrind(5, 100));
+
+static NYdb::TStatus SimpleSelect(TSession session, const TString& query) {
+ auto txControl = NYdb::NTable::TTxControl::BeginTx().CommitTx();
+ auto settings = TExecDataQuerySettings().KeepInQueryCache(true).OperationTimeout(OPERATION_TIMEOUT);
+ auto result = session.ExecuteDataQuery(query, txControl, settings).GetValueSync();
+ return result;
+}
+
+Y_UNIT_TEST_SUITE(ClientStatsCollector) {
+ Y_UNIT_TEST(CounterCacheMiss) {
+ NYdb::TKikimrWithGrpcAndRootSchema server;
+ auto endpoint = TStringBuilder() << "localhost:" << server.GetPort();
+ NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint));
+ TCountersExtractor extractor;
+ driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor));
+ NYdb::NTable::TTableClient client(driver);
+
+ auto createSessionResult = client.GetSession(TCreateSessionSettings().ClientTimeout(OPERATION_TIMEOUT)).GetValueSync();
+ UNIT_ASSERT(createSessionResult.IsSuccess());
+
+ auto session = createSessionResult.GetSession();
+ UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess());
+
+ TStatCounters counters = extractor.Extract();
+ UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1);
+
+ UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess());
+
+ counters = extractor.Extract();
+ UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1);
+
+ UNIT_ASSERT(SimpleSelect(session, "SELECT 2;").IsSuccess());
+
+ counters = extractor.Extract();
+ UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 2);
+
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(CounterRetryOperation) {
+ NYdb::TKikimrWithGrpcAndRootSchema server;
+ auto endpoint = TStringBuilder() << "localhost:" << server.GetPort();
+ NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint));
+ TCountersExtractor extractor;
+ driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor));
+ NYdb::NTable::TTableClient client(driver);
+
+ auto retrySettings = TRetryOperationSettings().GetSessionClientTimeout(OPERATION_TIMEOUT);
+
+ UNIT_ASSERT(client.RetryOperationSync([](TSession session){
+ auto desc = TTableBuilder()
+ .AddNullableColumn("id", NYdb::EPrimitiveType::Uint64)
+ .AddNullableColumn("name", NYdb::EPrimitiveType::Utf8)
+ .SetPrimaryKeyColumn("id")
+ .Build();
+
+ auto settings = NYdb::NTable::TCreateTableSettings().OperationTimeout(TDuration::Seconds(100));
+ return session.CreateTable("Root/names", std::move(desc), settings).GetValueSync();
+ }, retrySettings).IsSuccess());
+
+ auto settings = TExecDataQuerySettings().OperationTimeout(OPERATION_TIMEOUT);
+ auto txSettings = TBeginTxSettings().OperationTimeout(OPERATION_TIMEOUT);
+
+ auto upsertOperation = [&client, settings, retrySettings] {
+ UNIT_ASSERT(client.RetryOperationSync([settings](TSession session){
+ auto query = Sprintf(R"(
+ UPSERT into [Root/names] (id, name) VALUES (1, "Alex");
+ )");
+ return session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).GetValueSync();
+ }, retrySettings).IsSuccess());
+ };
+
+ const ui64 retriesCount = 2;
+ auto retrySelectSettings = TRetryOperationSettings().MaxRetries(retriesCount).GetSessionClientTimeout(OPERATION_TIMEOUT);
+
+ UNIT_ASSERT(client.RetryOperationSync([&upsertOperation, settings, txSettings](TSession session){
+ auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true);
+
+ auto tx = beginResult.GetTransaction();
+ auto query = Sprintf(R"(
+ SELECT * FROM [Root/names];
UPSERT INTO [Root/names] (id, name) VALUES (2, "Bob");
- )");
- auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true);
-
- upsertOperation();
-
- return tx.Commit().GetValueSync();
- }, retrySelectSettings).IsSuccess() == false);
-
- TStatCounters counters = extractor.Extract();
+ )");
+ auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true);
+
+ upsertOperation();
+
+ return tx.Commit().GetValueSync();
+ }, retrySelectSettings).IsSuccess() == false);
+
+ TStatCounters counters = extractor.Extract();
UNIT_ASSERT_VALUES_EQUAL(counters.RetryOperationDueAborted, retriesCount);
-
- UNIT_ASSERT(client.RetryOperation([&upsertOperation, settings, txSettings](TSession session){
- auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true);
-
- auto tx = beginResult.GetTransaction();
- auto query = Sprintf(R"(
- SELECT * FROM [Root/names];
+
+ UNIT_ASSERT(client.RetryOperation([&upsertOperation, settings, txSettings](TSession session){
+ auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true);
+
+ auto tx = beginResult.GetTransaction();
+ auto query = Sprintf(R"(
+ SELECT * FROM [Root/names];
UPSERT INTO [Root/names] (id, name) VALUES (2, "Bob");
- )");
- auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true);
-
- upsertOperation();
-
+ )");
+ auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true);
+
+ upsertOperation();
+
return tx.Commit().Apply([](const auto& future) {
return NThreading::MakeFuture<NYdb::TStatus>(future.GetValue());
});
- }, retrySelectSettings).GetValueSync().IsSuccess() == false);
-
- counters = extractor.Extract();
+ }, retrySelectSettings).GetValueSync().IsSuccess() == false);
+
+ counters = extractor.Extract();
// cumulative counter
UNIT_ASSERT_VALUES_EQUAL(counters.RetryOperationDueAborted, retriesCount * 2);
-
- driver.Stop(true);
- }
+
+ driver.Stop(true);
+ }
Y_UNIT_TEST(ExternalMetricRegistryByRawPtr) {
NMonitoring::TMetricRegistry sensorsRegistry;
@@ -382,4 +382,4 @@ Y_UNIT_TEST_SUITE(ClientStatsCollector) {
Y_UNIT_TEST(ExternalMetricRegistryStdSharedPtr) {
TestExternalMetricRegistry<std::shared_ptr>();
}
-}
+}