aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-06-30 13:53:02 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-06-30 13:53:02 +0300
commit000a0828a30804aaca971eb519441c85b85d51fb (patch)
tree93d78b849a88b3e789cc6697e8a9cfa7a15d1de4
parentbc0728ad8ee7a69eb70e90729bf79c94493fca9b (diff)
downloadydb-000a0828a30804aaca971eb519441c85b85d51fb.tar.gz
New DB Access interface
ref:b5d2fa8f94a8cb4453ba9d8a8b681921ae8e3fc6
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h16
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp11
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h112
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp135
-rw-r--r--ydb/core/yq/libs/events/event_ids.h1
-rw-r--r--ydb/core/yq/libs/events/events.h9
-rw-r--r--ydb/core/yq/libs/shared_resources/CMakeLists.txt2
-rw-r--r--ydb/core/yq/libs/shared_resources/db_exec.cpp9
-rw-r--r--ydb/core/yq/libs/shared_resources/db_exec.h241
-rw-r--r--ydb/core/yq/libs/shared_resources/db_pool.cpp35
-rw-r--r--ydb/core/yq/libs/shared_resources/db_pool.h8
12 files changed, 444 insertions, 137 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index 0257143f32..43a3151f03 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -252,6 +252,11 @@ struct TEvControlPlaneStorage {
{
}
+ explicit TControlPlaneResponse(const ProtoMessage& result, const NYql::TIssues& issues)
+ : Result(result), Issues(issues)
+ {
+ }
+
size_t GetByteSize() const {
return sizeof(*this)
+ Result.ByteSizeLong()
@@ -275,6 +280,11 @@ struct TEvControlPlaneStorage {
: TControlPlaneResponse<TControlPlaneNonAuditableResponse<ProtoMessage, EventType>, ProtoMessage, EventType>(issues)
{
}
+
+ explicit TControlPlaneNonAuditableResponse(const ProtoMessage& result, const NYql::TIssues& issues)
+ : TControlPlaneResponse<TControlPlaneNonAuditableResponse<ProtoMessage, EventType>, ProtoMessage, EventType>(result, issues)
+ {
+ }
};
template<typename ProtoMessage, typename AuditMessage, ui32 EventType>
@@ -291,6 +301,12 @@ struct TEvControlPlaneStorage {
{
}
+ explicit TControlPlaneAuditableResponse(const ProtoMessage& result, const NYql::TIssues& issues, const TAuditDetails<AuditMessage>& auditDetails)
+ : TControlPlaneResponse<TControlPlaneAuditableResponse<ProtoMessage, AuditMessage, EventType>, ProtoMessage, EventType>(result, issues)
+ , AuditDetails(auditDetails)
+ {
+ }
+
size_t GetByteSize() const {
return TControlPlaneResponse<TControlPlaneAuditableResponse<ProtoMessage, AuditMessage, EventType>, ProtoMessage, EventType>::GetByteSize()
+ AuditDetails.GetByteSize();
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index b5c14ced33..da842c919a 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -35,8 +35,8 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
CPS_LOG_I("Starting ydb control plane storage service. Actor id: " << SelfId());
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_CONTROL_PLANE_STORAGE_PROVIDER));
- DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10);
YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
+ DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix);
CreateDirectory();
CreateQueriesTable();
CreatePendingSmallTable();
@@ -252,7 +252,7 @@ bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user)
});
}
-void TYdbControlPlaneStorageActor::InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt) {
+void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt) {
if (idempotencyKey) {
builder.AddString("scope", scope);
builder.AddString("idempotency_key", idempotencyKey);
@@ -265,7 +265,7 @@ void TYdbControlPlaneStorageActor::InsertIdempotencyKey(TSqlQueryBuilder& builde
}
}
-void TYdbControlPlaneStorageActor::ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey) {
+void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey) {
if (idempotencyKey) {
builder.AddString("scope", scope);
builder.AddString("idempotency_key", idempotencyKey);
@@ -314,6 +314,11 @@ public:
}
};
+TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatus(NYdb::NTable::TSession&)> handler) {
+ TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>();
+ TActivationContext::Register(new TDbRequest(dbPool, promise, handler));
+ return promise.GetFuture();
+}
std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlPlaneStorageActor::Read(
const TString& query,
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index adaba3b58f..e7ed209cba 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -33,10 +33,12 @@
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h>
#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/events/events.h>
#include <ydb/core/yq/libs/quota_manager/events/events.h>
#include <ydb/core/yq/libs/ydb/util.h>
#include <ydb/core/yq/libs/ydb/ydb.h>
+
namespace NYq {
using namespace NActors;
@@ -87,10 +89,79 @@ inline static bool HasManageAccess(TPermissions permissions, YandexQuery::Acl::V
return HasAccessImpl(permissions, entityVisibility, entityUser, user, TPermissions::MANAGE_PRIVATE, TPermissions::MANAGE_PUBLIC);
}
+TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatus(NYdb::NTable::TSession&)> handler);
+
LWTRACE_USING(YQ_CONTROL_PLANE_STORAGE_PROVIDER);
using TRequestCountersPtr = TIntrusivePtr<TRequestCounters>;
+ template<typename T>
+ THashMap<TString, T> GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName)
+ {
+ THashMap<TString, T> entities;
+ TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ T entity;
+ Y_VERIFY(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString()));
+ const TString name = entity.content().name();
+ if (auto it = entities.find(name); it != entities.end()) {
+ const auto visibility = entity.content().acl().visibility();
+ if (visibility == YandexQuery::Acl::PRIVATE) {
+ entities[name] = std::move(entity);
+ }
+ } else {
+ entities[name] = std::move(entity);
+ }
+ }
+
+ return entities;
+ }
+
+ template<typename T>
+ TVector<T> GetEntities(const TResultSet& resultSet, const TString& columnName)
+ {
+ TVector<T> entities;
+ TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ Y_VERIFY(entities.emplace_back().ParseFromString(*parser.ColumnParser(columnName).GetOptionalString()));
+ }
+ return entities;
+ }
+
+void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt);
+
+void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey);
+
+class TRequestCountersScope {
+ TRequestCountersPtr Counters;
+public:
+ TRequestCountersScope(TRequestCountersPtr counters, ui64 requestSize) : Counters(counters) {
+ StartTime = TInstant::Now();
+ Counters->InFly->Inc();
+ Counters->RequestBytes->Add(requestSize);
+ }
+
+ void Reply(const NYql::TIssues& issues, ui64 resultSize) {
+ Delta = TInstant::Now() - StartTime;
+ Counters->ResponseBytes->Add(resultSize);
+ Counters->InFly->Dec();
+ Counters->LatencyMs->Collect(Delta.MilliSeconds());
+ if (issues) {
+ Counters->Error->Inc();
+ for (const auto& issue : issues) {
+ NYql::WalkThroughIssues(issue, true, [&counters=Counters](const NYql::TIssue& err, ui16 level) {
+ Y_UNUSED(level);
+ counters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
+ });
+ }
+ } else {
+ Counters->Ok->Inc();
+ }
+ }
+ TInstant StartTime;
+ TDuration Delta;
+};
+
class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor> {
enum ERequestTypeScope {
RTS_CREATE_QUERY,
@@ -240,7 +311,6 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
THashMap<TString, ui32> QueryQuotas;
THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests;
TInstant QuotasUpdatedAt = TInstant::Zero();
- ui32 QuotaGeneration = 0;
bool QuotasUpdating = false;
public:
@@ -290,7 +360,7 @@ public:
hFunc(TEvControlPlaneStorage::TEvNodesHealthCheckRequest, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
hFunc(TEvQuotaService::TQuotaUsageRequest, Handle);
- hFunc(TEvQuotaService::TQuotaUsageResponse, Handle);
+ hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
)
void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev);
@@ -323,7 +393,6 @@ public:
void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev);
void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev);
- void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev);
template<typename T>
NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true)
@@ -385,10 +454,6 @@ private:
*/
bool IsSuperUser(const TString& user);
- void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt);
-
- void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey);
-
std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read(
const TString& query,
const NYdb::TParams& params,
@@ -426,39 +491,6 @@ private:
TTxSettings transactionMode = TTxSettings::SerializableRW(),
bool retryOnTli = true);
- template<typename T>
- THashMap<TString, T> GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName)
- {
- THashMap<TString, T> entities;
- TResultSetParser parser(resultSet);
- while (parser.TryNextRow()) {
- T entity;
- Y_VERIFY(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString()));
- const TString name = entity.content().name();
- if (auto it = entities.find(name); it != entities.end()) {
- const auto visibility = entity.content().acl().visibility();
- if (visibility == YandexQuery::Acl::PRIVATE) {
- entities[name] = std::move(entity);
- }
- } else {
- entities[name] = std::move(entity);
- }
- }
-
- return entities;
- }
-
- template<typename T>
- TVector<T> GetEntities(const TResultSet& resultSet, const TString& columnName)
- {
- TVector<T> entities;
- TResultSetParser parser(resultSet);
- while (parser.TryNextRow()) {
- Y_VERIFY(entities.emplace_back().ParseFromString(*parser.ColumnParser(columnName).GetOptionalString()));
- }
- return entities;
- }
-
template<class ResponseEvent, class Result, class RequestEventPtr>
TFuture<bool> SendResponse(const TString& name,
NActors::TActorSystem* actorSystem,
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 9eec713d25..0b2ba99384 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -14,6 +14,8 @@
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+#include <ydb/core/yq/libs/shared_resources/db_exec.h>
+
#include <util/digest/multi.h>
namespace {
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
index 8cf0490da0..1dcfd05456 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp
@@ -17,6 +17,8 @@
#include <util/digest/multi.h>
+#include <ydb/core/yq/libs/shared_resources/db_exec.h>
+
namespace NYq {
struct TCloudAggregator {
@@ -24,6 +26,8 @@ struct TCloudAggregator {
TAtomic Finished = 0;
};
+using TQuotaCountExecuter = TDbExecuter<THashMap<TString, ui32>>;
+
void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev)
{
if (ev->Get()->SubjectType != SUBJECT_TYPE_CLOUD || ev->Get()->MetricName != QUOTA_COUNT_LIMIT) {
@@ -41,97 +45,60 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T
}
QuotasUpdating = true;
- QuotaGeneration++;
QueryQuotas.clear();
- TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTS_QUOTA_USAGE);
-
- TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CountPendingQueries");
-
- queryBuilder.AddText(
- "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
- "FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- "GROUP BY `" SCOPE_COLUMN_NAME "`\n"
- );
-
- const auto query = queryBuilder.Build();
- auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
-
- auto aggregator = std::make_shared<TCloudAggregator>();
-
- result.Apply([=, resultSets=resultSets, generation=QuotaGeneration](const auto& future) {
- try {
- TStatus status = future.GetValue();
- if (status.IsSuccess() && resultSets->size() == 1) {
- TResultSetParser parser(resultSets->front());
- while (parser.TryNextRow()) {
- auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
- auto count = parser.ColumnParser("PENDING_COUNT").GetUint64();
- TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetQueryCloudId");
-
- queryBuilder.AddText(
- "SELECT `" INTERNAL_COLUMN_NAME "`\n"
- "FROM `" QUERIES_TABLE_NAME "`\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n"
- );
-
- queryBuilder.AddString("scope", scope);
-
- const auto query = queryBuilder.Build();
- auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
-
- AtomicIncrement(aggregator->Started);
- auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo);
-
- result.Apply([=, resultSets=resultSets](const auto& future) {
- try {
- TStatus status = future.GetValue();
- if (status.IsSuccess() && resultSets->size() == 1) {
- TResultSetParser parser(resultSets->front());
- if (parser.TryNextRow()) {
- YandexQuery::Internal::QueryInternal internal;
- if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
- }
- Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, internal.cloud_id(), QUOTA_COUNT_LIMIT, count), 0, generation);
- }
- }
- AtomicIncrement(aggregator->Finished);
- if (AtomicGet(aggregator->Started) == AtomicGet(aggregator->Finished)) {
- Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation);
- }
- } catch (...) {
- // Cerr << "EX2 " << CurrentExceptionMessage() << Endl;
+ TDbExecutable::TPtr executable;
+ auto& executer = TQuotaCountExecuter::Create(executable, [](TQuotaCountExecuter& executer) { executer.State.clear(); } );
+
+ executer.Read(
+ [=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n"
+ "FROM `" PENDING_SMALL_TABLE_NAME "`\n"
+ "GROUP BY `" SCOPE_COLUMN_NAME "`\n"
+ );
+ },
+ [=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
+ TResultSetParser parser(resultSets.front());
+ while (parser.TryNextRow()) {
+ auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
+ auto count = parser.ColumnParser("PENDING_COUNT").GetUint64();
+ executer.Read(
+ [=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) {
+ builder.AddText(
+ "SELECT `" INTERNAL_COLUMN_NAME "`\n"
+ "FROM `" QUERIES_TABLE_NAME "`\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n"
+ );
+ builder.AddString("scope", scope);
+ },
+ [=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) {
+ TResultSetParser parser(resultSets.front());
+ if (parser.TryNextRow()) {
+ YandexQuery::Internal::QueryInternal internal;
+ ParseProto(executer, internal, parser, INTERNAL_COLUMN_NAME);
+ executer.State[internal.cloud_id()] += count;
}
- });
- }
- if (AtomicGet(aggregator->Started) == 0) {
- Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation);
- }
+ },
+ "GetScopeCloud_" + scope
+ );
}
- } catch (...) {
- // Cerr << "EX1 " << CurrentExceptionMessage() << Endl;
- }
- });
-}
-
-void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev)
-{
- if (ev->Cookie == QuotaGeneration) {
- auto subjectId = ev->Get()->SubjectId;
- if (subjectId == "") {
- QuotasUpdatedAt = Now();
- for (auto& it : QueryQuotaRequests) {
+ },
+ "GroupByPendingSmall"
+ ).Process(SelfId(),
+ [=, this](TQuotaCountExecuter& executer) {
+ this->QuotasUpdatedAt = Now();
+ this->QueryQuotas.swap(executer.State);
+ for (auto& it : this->QueryQuotaRequests) {
auto ev = it.second;
- Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, QueryQuotas.Value(it.first, 0)));
+ this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0)));
}
- QueryQuotaRequests.clear();
- QuotasUpdating = false;
- } else {
- QueryQuotas[subjectId] += ev->Get()->Usage;
+ this->QueryQuotaRequests.clear();
+ this->QuotasUpdating = false;
}
- }
+ );
+
+ Exec(DbPool, executable);
}
} // NYq
diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h
index 40e97d762b..150eef3ca9 100644
--- a/ydb/core/yq/libs/events/event_ids.h
+++ b/ydb/core/yq/libs/events/event_ids.h
@@ -44,6 +44,7 @@ struct TEventIds {
EvGraphParams,
EvRaiseTransientIssues,
EvSchemaCreated,
+ EvCallback,
// Special events
EvEnd
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index cc2013d827..7cdff0c590 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -231,6 +231,15 @@ struct TEvents {
NYdb::TStatus Result;
};
+
+ struct TEvCallback : public NActors::TEventLocal<TEvCallback, TEventIds::EvCallback> {
+ explicit TEvCallback(std::function<void()> callback)
+ : Callback(callback)
+ {
+ }
+
+ std::function<void()> Callback;
+ };
};
} // namespace NYq
diff --git a/ydb/core/yq/libs/shared_resources/CMakeLists.txt b/ydb/core/yq/libs/shared_resources/CMakeLists.txt
index f976580eb3..2aade5a890 100644
--- a/ydb/core/yq/libs/shared_resources/CMakeLists.txt
+++ b/ydb/core/yq/libs/shared_resources/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC
cpp-actors-core
cpp-monlib-dynamic_counters
ydb-core-protos
+ libs-control_plane_storage-proto
yq-libs-events
libs-shared_resources-interface
ydb-library-logger
@@ -25,6 +26,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC
cpp-client-ydb_table
)
target_sources(yq-libs-shared_resources PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/db_exec.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/db_pool.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/shared_resources.cpp
)
diff --git a/ydb/core/yq/libs/shared_resources/db_exec.cpp b/ydb/core/yq/libs/shared_resources/db_exec.cpp
new file mode 100644
index 0000000000..91af0f3a0e
--- /dev/null
+++ b/ydb/core/yq/libs/shared_resources/db_exec.cpp
@@ -0,0 +1,9 @@
+#include "db_exec.h"
+
+namespace NYq {
+
+void TDbExecutable::Throw(const TString& message) {
+ Y_UNUSED(message);
+}
+
+} /* namespace NYq */
diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h
new file mode 100644
index 0000000000..a2b4343544
--- /dev/null
+++ b/ydb/core/yq/libs/shared_resources/db_exec.h
@@ -0,0 +1,241 @@
+#pragma once
+
+#include "db_pool.h"
+
+#include <ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h>
+#include <ydb/core/yq/libs/db_schema/db_schema.h>
+
+namespace NYq {
+
+class TDbExecutable {
+public:
+ using TPtr = std::shared_ptr<TDbExecutable>;
+
+ TDbExecutable(bool collectDebugInfo = false) {
+ if (collectDebugInfo) {
+ DebugInfo = std::make_shared<TDebugInfo>();
+ }
+ }
+
+ virtual TAsyncStatus Execute(NYdb::NTable::TSession& session) = 0;
+ void Throw(const TString& message);
+
+ TDbPool::TPtr DbPool;
+ std::weak_ptr<TDbExecutable> SelfHolder;
+ NYql::TIssues Issues;
+ NYql::TIssues InternalIssues;
+ TDebugInfoPtr DebugInfo;
+};
+
+template <typename TProto>
+void ParseProto(TDbExecutable& executable, TProto& proto, TResultSetParser& parser, const TString& columnName) {
+ if (!proto.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString()))
+ {
+ executable.Throw(Sprintf("Error parsing proto message %s", proto.GetTypeName().c_str()));
+ }
+}
+
+inline TAsyncStatus Exec(TDbPool::TPtr dbPool, TDbExecutable::TPtr executable) {
+ executable->DbPool = dbPool;
+ executable->SelfHolder = executable;
+ return ExecDbRequest(dbPool, [=](NYdb::NTable::TSession& session) {
+ return executable->Execute(session);
+ });
+}
+
+/*
+ * 1. TDbExecuter must be create like this:
+ *
+ * TDbExecutable::TPtr executable;
+ * auto& executer = TDbExecuter<...>::Create(executable);
+ *
+ * 2. Template param is state struct. It's lifecycle matches executer. It's expected to keep all state
+ * between async db calls.
+ *
+ * 3. TDbExecutable::Read adds read operation to DB access pipeline, Write adds write operation (w/o result
+ * processing). All calls are serialized. There is no concurrency issues in access TDbExecutable::State
+ * from callbacks.
+ *
+ * 4. Final callback (passed in TDbExecutable::Process) is expected to be called from AS thread, so actor
+ * actorId must implement TEvents::TEvCallback handler, f.e.:
+ *
+ * hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } );
+ *
+ * it is safe to access TDbExecutable::State and actor class members from final callback w/o concurrency
+ * issus as well
+ *
+ * 5. All pipeline is retried automatically. If TDbExecutable::State need to be cleaned up from failed run,
+ * pass handler as stateInitCallback to TDbExecutable ctor.
+ */
+
+template <typename TState>
+class TDbExecuter : public TDbExecutable {
+
+ struct TExecStep {
+ std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> BuilderCallback;
+ std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> HandlerCallback;
+ std::function<void(TDbExecuter<TState>&)> ProcessCallback;
+ TString Name;
+ bool Commit = false;
+ };
+ std::vector<TExecStep> Steps;
+ ui32 CurrentStepIndex = 0;
+ ui32 InsertStepIndex = 0;
+ NActors::TActorId HandlerActorId;
+ TMaybe<TTransaction> Transaction;
+ NActors::TActorSystem* ActorSystem = nullptr;
+ std::function<void(TDbExecuter<TState>&)> HandlerCallback;
+ std::function<void(TDbExecuter<TState>&)> StateInitCallback;
+ bool skipStep = false;
+
+protected:
+ TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback = nullptr)
+ : TDbExecutable(collectDebugInfo), StateInitCallback(stateInitCallback) {
+ }
+
+ TDbExecuter(const TDbExecuter& other) = delete;
+
+public:
+ virtual ~TDbExecuter() {
+ };
+
+ static TDbExecuter& Create(TDbExecutable::TPtr& holder, bool collectDebugInfo = false) {
+ auto executer = new TDbExecuter(collectDebugInfo);
+ holder.reset(executer);
+ return *executer;
+ };
+
+ void SkipStep() {
+ skipStep = true;
+ }
+
+ TAsyncStatus NextStep(NYdb::NTable::TSession session) {
+
+ if (CurrentStepIndex == Steps.size()) {
+ if (Transaction) {
+ auto transaction = *Transaction;
+ Transaction.Clear();
+ return transaction.Commit()
+ .Apply([this, session=session](const TFuture<TCommitTransactionResult>& future) {
+
+ TCommitTransactionResult result = future.GetValue();
+ auto status = static_cast<TStatus>(result);
+
+ return this->NextStep(session);
+ });
+ }
+ if (HandlerActorId != NActors::TActorId{}) {
+ auto holder = SelfHolder.lock();
+ if (holder) {
+ ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([this, holder=holder, handlerCallback=HandlerCallback]() {
+ handlerCallback(*this);
+ }));
+ }
+ }
+ return MakeFuture(TStatus{EStatus::SUCCESS, NYql::TIssues{}});
+ } else {
+ TSqlQueryBuilder builder(DbPool->TablePathPrefix, Steps[CurrentStepIndex].Name);
+ skipStep = false;
+ Steps[CurrentStepIndex].BuilderCallback(*this, builder);
+
+ if (skipStep) { // TODO Refactor this
+ this->CurrentStepIndex++;
+ return this->NextStep(session);
+ }
+
+ const auto query = builder.Build();
+ auto transaction = Transaction ? TTxControl::Tx(*Transaction) : TTxControl::BeginTx(TTxSettings::SerializableRW());
+ if (Steps[CurrentStepIndex].Commit) {
+ transaction = transaction.CommitTx();
+ }
+
+ return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true))
+ .Apply([this, session=session](const TFuture<TDataQueryResult>& future) {
+
+ NYdb::NTable::TDataQueryResult result = future.GetValue();
+ auto status = static_cast<TStatus>(result);
+
+ if (status.GetStatus() == EStatus::SCHEME_ERROR) { // retry if table does not exist
+ this->Transaction.Clear();
+ return MakeFuture(TStatus{EStatus::UNAVAILABLE, NYql::TIssues{status.GetIssues()}});
+ }
+ if (!status.IsSuccess()) {
+ this->Transaction.Clear();
+ return MakeFuture(status);
+ }
+
+ if (this->Steps[CurrentStepIndex].Commit) {
+ this->Transaction.Clear();
+ } else if (!this->Transaction) {
+ this->Transaction = result.GetTransaction();
+ }
+
+ if (this->Steps[CurrentStepIndex].HandlerCallback) {
+ try {
+ this->Steps[CurrentStepIndex].HandlerCallback(*this, result.GetResultSets());
+ } catch (const TControlPlaneStorageException& exception) {
+ NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage());
+ Issues.AddIssue(issue);
+ NYql::TIssue internalIssue = MakeErrorIssue(exception.Code, CurrentExceptionMessage());
+ InternalIssues.AddIssue(internalIssue);
+ } catch (const std::exception& exception) {
+ NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, exception.what());
+ Issues.AddIssue(issue);
+ NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
+ InternalIssues.AddIssue(internalIssue);
+ } catch (...) {
+ NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
+ Issues.AddIssue(issue);
+ NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
+ InternalIssues.AddIssue(internalIssue);
+ }
+ }
+
+ this->CurrentStepIndex++;
+ return this->NextStep(session);
+ });
+ }
+ }
+
+ TAsyncStatus Execute(NYdb::NTable::TSession& session) override {
+ if (StateInitCallback) {
+ StateInitCallback(*this);
+ } else {
+ State = TState{};
+ }
+ return NextStep(session);
+ }
+
+ TDbExecuter& Read(
+ std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback
+ , std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> handlerCallback
+ , const TString& Name = "DefaultReadName"
+ , bool commit = false
+ ) {
+ Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{builderCallback, handlerCallback, nullptr, Name, commit});
+ InsertStepIndex++;
+ return *this;
+ }
+
+ TDbExecuter& Write(
+ std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback
+ , const TString& Name = "DefaultWriteName"
+ , bool commit = false
+ ) {
+ return Read(builderCallback, nullptr, Name, commit);
+ }
+
+ void Process(
+ NActors::TActorId actorId
+ , std::function<void(TDbExecuter<TState>&)> handlerCallback
+ ) {
+ Y_VERIFY(HandlerActorId == NActors::TActorId{}, "Handler must be empty");
+ ActorSystem = TActivationContext::ActorSystem();
+ HandlerActorId = actorId;
+ HandlerCallback = handlerCallback;
+ }
+
+ TState State;
+};
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/shared_resources/db_pool.cpp b/ydb/core/yq/libs/shared_resources/db_pool.cpp
index 372dfd72cd..1e2abadd6d 100644
--- a/ydb/core/yq/libs/shared_resources/db_pool.cpp
+++ b/ydb/core/yq/libs/shared_resources/db_pool.cpp
@@ -6,11 +6,20 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/yq/libs/actors/logging/log.h>
+
#include <util/stream/file.h>
#include <util/string/strip.h>
-#define LOG_E(stream) \
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, stream)
+#define LOG_F(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, EMERG, STREAMS, logRecordStream)
+#define LOG_A(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ALERT, STREAMS, logRecordStream)
+#define LOG_C(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, CRIT, STREAMS, logRecordStream)
+#define LOG_E(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ERROR, STREAMS, logRecordStream)
+#define LOG_W(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, WARN, STREAMS, logRecordStream)
+#define LOG_N(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, NOTICE, STREAMS, logRecordStream)
+#define LOG_I(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, INFO, STREAMS, logRecordStream)
+#define LOG_D(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, DEBUG, STREAMS, logRecordStream)
+#define LOG_T(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, TRACE, STREAMS, logRecordStream)
namespace NYq {
@@ -66,6 +75,8 @@ public:
RequestInProgressTimestamp = TInstant::Now();
const auto& requestVariant = Requests.front();
+ LOG_D(ctx, "TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size());
+
if (auto pRequest = std::get_if<TRequest>(&requestVariant)) {
auto& request = *pRequest;
auto actorSystem = ctx.ActorSystem();
@@ -86,6 +97,8 @@ public:
.Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem, cookie, selfId](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
if (state.lock()) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbResponse(statusFuture.GetValue(), *sharedResult), 0, cookie));
+ } else {
+ LOG_D(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
}
});
} else if (auto pRequest = std::get_if<TFunctionRequest>(&requestVariant)) {
@@ -99,12 +112,15 @@ public:
.Subscribe([state = std::weak_ptr<int>(State), actorSystem, selfId, cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) {
if (state.lock()) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbFunctionResponse(statusFuture.GetValue()), 0, cookie));
+ } else {
+ LOG_D(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed");
}
});
}
}
void HandleRequest(TEvents::TEvDbRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_D(ctx, "TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size());
auto request = ev->Get();
Requests.emplace_back(TRequest{ev->Sender, ev->Cookie, request->Sql, std::move(request->Params), request->Idempotent});
ProcessQueue(ctx);
@@ -119,18 +135,21 @@ public:
}
void HandleResponse(TEvents::TEvDbResponse::TPtr& ev, const TActorContext& ctx) {
+ LOG_D(ctx, "TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size());
const auto& request = Requests.front();
ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
PopFromQueueAndProcess(ctx);
}
void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_D(ctx, "TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size());
auto request = ev->Get();
Requests.emplace_back(TFunctionRequest{ev->Sender, ev->Cookie, std::move(request->Handler)});
ProcessQueue(ctx);
}
void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev, const TActorContext& ctx) {
+ LOG_D(ctx, "TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size());
const auto& request = Requests.front();
ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request)));
PopFromQueueAndProcess(ctx);
@@ -181,8 +200,10 @@ private:
TDbPool::TDbPool(
ui32 sessionsCount,
const NYdb::NTable::TTableClient& tableClient,
- const NMonitoring::TDynamicCounterPtr& counters)
+ const NMonitoring::TDynamicCounterPtr& counters,
+ const TString& tablePathPrefix)
{
+ TablePathPrefix = tablePathPrefix;
const auto& ctx = NActors::TActivationContext::AsActorContext();
auto parentId = ctx.SelfID;
Actors.reserve(sessionsCount);
@@ -264,11 +285,11 @@ void TDbPoolMap::Reset(const NYq::NConfig::TDbPoolConfig& config) {
TableClient = nullptr;
}
-TDbPool::TPtr TDbPoolHolder::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) {
- return Pools->GetOrCreate(dbPoolId, sessionsCount);
+TDbPool::TPtr TDbPoolHolder::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount, const TString& tablePathPrefix) {
+ return Pools->GetOrCreate(dbPoolId, sessionsCount, tablePathPrefix);
}
-TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) {
+TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount, const TString& tablePathPrefix) {
TGuard<TMutex> lock(Mutex);
auto it = Pools.find(dbPoolId);
if (it != Pools.end()) {
@@ -297,7 +318,7 @@ TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) {
TableClient = MakeHolder<NYdb::NTable::TTableClient>(Driver, clientSettings);
}
- TDbPool::TPtr dbPool = new TDbPool(sessionsCount, *TableClient, Counters);
+ TDbPool::TPtr dbPool = new TDbPool(sessionsCount, *TableClient, Counters, tablePathPrefix);
Pools.emplace(dbPoolId, dbPool);
return dbPool;
}
diff --git a/ydb/core/yq/libs/shared_resources/db_pool.h b/ydb/core/yq/libs/shared_resources/db_pool.h
index 68d9adc2fc..0c7c1242c8 100644
--- a/ydb/core/yq/libs/shared_resources/db_pool.h
+++ b/ydb/core/yq/libs/shared_resources/db_pool.h
@@ -21,10 +21,12 @@ public:
NActors::TActorId GetNextActor();
+ TString TablePathPrefix;
+
private:
friend class TDbPoolMap;
- TDbPool(ui32 sessionsCount, const NYdb::NTable::TTableClient& tableClient, const NMonitoring::TDynamicCounterPtr& counters);
+ TDbPool(ui32 sessionsCount, const NYdb::NTable::TTableClient& tableClient, const NMonitoring::TDynamicCounterPtr& counters, const TString& tablePathPrefix);
TMutex Mutex;
TVector<NActors::TActorId> Actors;
@@ -41,7 +43,7 @@ class TDbPoolMap: public TThrRefBase {
public:
using TPtr = TIntrusivePtr<TDbPoolMap>;
- TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount);
+ TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount, const TString& tablePathPrefix);
private:
friend class TDbPoolHolder;
@@ -73,7 +75,7 @@ public:
~TDbPoolHolder();
void Reset(const NYq::NConfig::TDbPoolConfig& config);
- TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount);
+ TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount, const TString& tablePathPrefix);
NYdb::TDriver& GetDriver();
TDbPoolMap::TPtr Get();