diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-30 13:53:02 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-30 13:53:02 +0300 |
commit | 000a0828a30804aaca971eb519441c85b85d51fb (patch) | |
tree | 93d78b849a88b3e789cc6697e8a9cfa7a15d1de4 | |
parent | bc0728ad8ee7a69eb70e90729bf79c94493fca9b (diff) | |
download | ydb-000a0828a30804aaca971eb519441c85b85d51fb.tar.gz |
New DB Access interface
ref:b5d2fa8f94a8cb4453ba9d8a8b681921ae8e3fc6
12 files changed, 444 insertions, 137 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index 0257143f32..43a3151f03 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -252,6 +252,11 @@ struct TEvControlPlaneStorage { { } + explicit TControlPlaneResponse(const ProtoMessage& result, const NYql::TIssues& issues) + : Result(result), Issues(issues) + { + } + size_t GetByteSize() const { return sizeof(*this) + Result.ByteSizeLong() @@ -275,6 +280,11 @@ struct TEvControlPlaneStorage { : TControlPlaneResponse<TControlPlaneNonAuditableResponse<ProtoMessage, EventType>, ProtoMessage, EventType>(issues) { } + + explicit TControlPlaneNonAuditableResponse(const ProtoMessage& result, const NYql::TIssues& issues) + : TControlPlaneResponse<TControlPlaneNonAuditableResponse<ProtoMessage, EventType>, ProtoMessage, EventType>(result, issues) + { + } }; template<typename ProtoMessage, typename AuditMessage, ui32 EventType> @@ -291,6 +301,12 @@ struct TEvControlPlaneStorage { { } + explicit TControlPlaneAuditableResponse(const ProtoMessage& result, const NYql::TIssues& issues, const TAuditDetails<AuditMessage>& auditDetails) + : TControlPlaneResponse<TControlPlaneAuditableResponse<ProtoMessage, AuditMessage, EventType>, ProtoMessage, EventType>(result, issues) + , AuditDetails(auditDetails) + { + } + size_t GetByteSize() const { return TControlPlaneResponse<TControlPlaneAuditableResponse<ProtoMessage, AuditMessage, EventType>, ProtoMessage, EventType>::GetByteSize() + AuditDetails.GetByteSize(); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index b5c14ced33..da842c919a 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -35,8 +35,8 @@ void TYdbControlPlaneStorageActor::Bootstrap() { CPS_LOG_I("Starting ydb control plane storage service. Actor id: " << SelfId()); NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_CONTROL_PLANE_STORAGE_PROVIDER)); - DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10); YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver); + DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix); CreateDirectory(); CreateQueriesTable(); CreatePendingSmallTable(); @@ -252,7 +252,7 @@ bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user) }); } -void TYdbControlPlaneStorageActor::InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt) { +void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt) { if (idempotencyKey) { builder.AddString("scope", scope); builder.AddString("idempotency_key", idempotencyKey); @@ -265,7 +265,7 @@ void TYdbControlPlaneStorageActor::InsertIdempotencyKey(TSqlQueryBuilder& builde } } -void TYdbControlPlaneStorageActor::ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey) { +void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey) { if (idempotencyKey) { builder.AddString("scope", scope); builder.AddString("idempotency_key", idempotencyKey); @@ -314,6 +314,11 @@ public: } }; +TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatus(NYdb::NTable::TSession&)> handler) { + TPromise<NYdb::TStatus> promise = NewPromise<NYdb::TStatus>(); + TActivationContext::Register(new TDbRequest(dbPool, promise, handler)); + return promise.GetFuture(); +} std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> TYdbControlPlaneStorageActor::Read( const TString& query, diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index adaba3b58f..e7ed209cba 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -33,10 +33,12 @@ #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h> #include <ydb/core/yq/libs/db_schema/db_schema.h> +#include <ydb/core/yq/libs/events/events.h> #include <ydb/core/yq/libs/quota_manager/events/events.h> #include <ydb/core/yq/libs/ydb/util.h> #include <ydb/core/yq/libs/ydb/ydb.h> + namespace NYq { using namespace NActors; @@ -87,10 +89,79 @@ inline static bool HasManageAccess(TPermissions permissions, YandexQuery::Acl::V return HasAccessImpl(permissions, entityVisibility, entityUser, user, TPermissions::MANAGE_PRIVATE, TPermissions::MANAGE_PUBLIC); } +TAsyncStatus ExecDbRequest(TDbPool::TPtr dbPool, std::function<NYdb::TAsyncStatus(NYdb::NTable::TSession&)> handler); + LWTRACE_USING(YQ_CONTROL_PLANE_STORAGE_PROVIDER); using TRequestCountersPtr = TIntrusivePtr<TRequestCounters>; + template<typename T> + THashMap<TString, T> GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName) + { + THashMap<TString, T> entities; + TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + T entity; + Y_VERIFY(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); + const TString name = entity.content().name(); + if (auto it = entities.find(name); it != entities.end()) { + const auto visibility = entity.content().acl().visibility(); + if (visibility == YandexQuery::Acl::PRIVATE) { + entities[name] = std::move(entity); + } + } else { + entities[name] = std::move(entity); + } + } + + return entities; + } + + template<typename T> + TVector<T> GetEntities(const TResultSet& resultSet, const TString& columnName) + { + TVector<T> entities; + TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + Y_VERIFY(entities.emplace_back().ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); + } + return entities; + } + +void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt); + +void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey); + +class TRequestCountersScope { + TRequestCountersPtr Counters; +public: + TRequestCountersScope(TRequestCountersPtr counters, ui64 requestSize) : Counters(counters) { + StartTime = TInstant::Now(); + Counters->InFly->Inc(); + Counters->RequestBytes->Add(requestSize); + } + + void Reply(const NYql::TIssues& issues, ui64 resultSize) { + Delta = TInstant::Now() - StartTime; + Counters->ResponseBytes->Add(resultSize); + Counters->InFly->Dec(); + Counters->LatencyMs->Collect(Delta.MilliSeconds()); + if (issues) { + Counters->Error->Inc(); + for (const auto& issue : issues) { + NYql::WalkThroughIssues(issue, true, [&counters=Counters](const NYql::TIssue& err, ui16 level) { + Y_UNUSED(level); + counters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); + }); + } + } else { + Counters->Ok->Inc(); + } + } + TInstant StartTime; + TDuration Delta; +}; + class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbControlPlaneStorageActor> { enum ERequestTypeScope { RTS_CREATE_QUERY, @@ -240,7 +311,6 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont THashMap<TString, ui32> QueryQuotas; THashMap<TString, TEvQuotaService::TQuotaUsageRequest::TPtr> QueryQuotaRequests; TInstant QuotasUpdatedAt = TInstant::Zero(); - ui32 QuotaGeneration = 0; bool QuotasUpdating = false; public: @@ -290,7 +360,7 @@ public: hFunc(TEvControlPlaneStorage::TEvNodesHealthCheckRequest, Handle); hFunc(NMon::TEvHttpInfo, Handle); hFunc(TEvQuotaService::TQuotaUsageRequest, Handle); - hFunc(TEvQuotaService::TQuotaUsageResponse, Handle); + hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); ) void Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev); @@ -323,7 +393,6 @@ public: void Handle(TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr& ev); void Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev); - void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev); template<typename T> NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true) @@ -385,10 +454,6 @@ private: */ bool IsSuperUser(const TString& user); - void InsertIdempotencyKey(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey, const TString& response, const TInstant& expireAt); - - void ReadIdempotencyKeyQuery(TSqlQueryBuilder& builder, const TString& scope, const TString& idempotencyKey); - std::pair<TAsyncStatus, std::shared_ptr<TVector<NYdb::TResultSet>>> Read( const TString& query, const NYdb::TParams& params, @@ -426,39 +491,6 @@ private: TTxSettings transactionMode = TTxSettings::SerializableRW(), bool retryOnTli = true); - template<typename T> - THashMap<TString, T> GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName) - { - THashMap<TString, T> entities; - TResultSetParser parser(resultSet); - while (parser.TryNextRow()) { - T entity; - Y_VERIFY(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); - const TString name = entity.content().name(); - if (auto it = entities.find(name); it != entities.end()) { - const auto visibility = entity.content().acl().visibility(); - if (visibility == YandexQuery::Acl::PRIVATE) { - entities[name] = std::move(entity); - } - } else { - entities[name] = std::move(entity); - } - } - - return entities; - } - - template<typename T> - TVector<T> GetEntities(const TResultSet& resultSet, const TString& columnName) - { - TVector<T> entities; - TResultSetParser parser(resultSet); - while (parser.TryNextRow()) { - Y_VERIFY(entities.emplace_back().ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); - } - return entities; - } - template<class ResponseEvent, class Result, class RequestEventPtr> TFuture<bool> SendResponse(const TString& name, NActors::TActorSystem* actorSystem, diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 9eec713d25..0b2ba99384 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -14,6 +14,8 @@ #include <ydb/public/api/protos/yq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> +#include <ydb/core/yq/libs/shared_resources/db_exec.h> + #include <util/digest/multi.h> namespace { diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp index 8cf0490da0..1dcfd05456 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_quotas.cpp @@ -17,6 +17,8 @@ #include <util/digest/multi.h> +#include <ydb/core/yq/libs/shared_resources/db_exec.h> + namespace NYq { struct TCloudAggregator { @@ -24,6 +26,8 @@ struct TCloudAggregator { TAtomic Finished = 0; }; +using TQuotaCountExecuter = TDbExecuter<THashMap<TString, ui32>>; + void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) { if (ev->Get()->SubjectType != SUBJECT_TYPE_CLOUD || ev->Get()->MetricName != QUOTA_COUNT_LIMIT) { @@ -41,97 +45,60 @@ void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageRequest::T } QuotasUpdating = true; - QuotaGeneration++; QueryQuotas.clear(); - TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTS_QUOTA_USAGE); - - TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "CountPendingQueries"); - - queryBuilder.AddText( - "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n" - "FROM `" PENDING_SMALL_TABLE_NAME "`\n" - "GROUP BY `" SCOPE_COLUMN_NAME "`\n" - ); - - const auto query = queryBuilder.Build(); - auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - - auto aggregator = std::make_shared<TCloudAggregator>(); - - result.Apply([=, resultSets=resultSets, generation=QuotaGeneration](const auto& future) { - try { - TStatus status = future.GetValue(); - if (status.IsSuccess() && resultSets->size() == 1) { - TResultSetParser parser(resultSets->front()); - while (parser.TryNextRow()) { - auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString(); - auto count = parser.ColumnParser("PENDING_COUNT").GetUint64(); - TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetQueryCloudId"); - - queryBuilder.AddText( - "SELECT `" INTERNAL_COLUMN_NAME "`\n" - "FROM `" QUERIES_TABLE_NAME "`\n" - "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n" - ); - - queryBuilder.AddString("scope", scope); - - const auto query = queryBuilder.Build(); - auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - - AtomicIncrement(aggregator->Started); - auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - - result.Apply([=, resultSets=resultSets](const auto& future) { - try { - TStatus status = future.GetValue(); - if (status.IsSuccess() && resultSets->size() == 1) { - TResultSetParser parser(resultSets->front()); - if (parser.TryNextRow()) { - YandexQuery::Internal::QueryInternal internal; - if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; - } - Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, internal.cloud_id(), QUOTA_COUNT_LIMIT, count), 0, generation); - } - } - AtomicIncrement(aggregator->Finished); - if (AtomicGet(aggregator->Started) == AtomicGet(aggregator->Finished)) { - Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation); - } - } catch (...) { - // Cerr << "EX2 " << CurrentExceptionMessage() << Endl; + TDbExecutable::TPtr executable; + auto& executer = TQuotaCountExecuter::Create(executable, [](TQuotaCountExecuter& executer) { executer.State.clear(); } ); + + executer.Read( + [=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) { + builder.AddText( + "SELECT `" SCOPE_COLUMN_NAME "`, COUNT(`" SCOPE_COLUMN_NAME "`) AS PENDING_COUNT\n" + "FROM `" PENDING_SMALL_TABLE_NAME "`\n" + "GROUP BY `" SCOPE_COLUMN_NAME "`\n" + ); + }, + [=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) { + TResultSetParser parser(resultSets.front()); + while (parser.TryNextRow()) { + auto scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString(); + auto count = parser.ColumnParser("PENDING_COUNT").GetUint64(); + executer.Read( + [=](TQuotaCountExecuter&, TSqlQueryBuilder& builder) { + builder.AddText( + "SELECT `" INTERNAL_COLUMN_NAME "`\n" + "FROM `" QUERIES_TABLE_NAME "`\n" + "WHERE `" SCOPE_COLUMN_NAME "` = $scope LIMIT 1;\n" + ); + builder.AddString("scope", scope); + }, + [=](TQuotaCountExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) { + TResultSetParser parser(resultSets.front()); + if (parser.TryNextRow()) { + YandexQuery::Internal::QueryInternal internal; + ParseProto(executer, internal, parser, INTERNAL_COLUMN_NAME); + executer.State[internal.cloud_id()] += count; } - }); - } - if (AtomicGet(aggregator->Started) == 0) { - Send(SelfId(), new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, "", QUOTA_COUNT_LIMIT, 0), 0, generation); - } + }, + "GetScopeCloud_" + scope + ); } - } catch (...) { - // Cerr << "EX1 " << CurrentExceptionMessage() << Endl; - } - }); -} - -void TYdbControlPlaneStorageActor::Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev) -{ - if (ev->Cookie == QuotaGeneration) { - auto subjectId = ev->Get()->SubjectId; - if (subjectId == "") { - QuotasUpdatedAt = Now(); - for (auto& it : QueryQuotaRequests) { + }, + "GroupByPendingSmall" + ).Process(SelfId(), + [=, this](TQuotaCountExecuter& executer) { + this->QuotasUpdatedAt = Now(); + this->QueryQuotas.swap(executer.State); + for (auto& it : this->QueryQuotaRequests) { auto ev = it.second; - Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, QueryQuotas.Value(it.first, 0))); + this->Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(SUBJECT_TYPE_CLOUD, it.first, QUOTA_COUNT_LIMIT, this->QueryQuotas.Value(it.first, 0))); } - QueryQuotaRequests.clear(); - QuotasUpdating = false; - } else { - QueryQuotas[subjectId] += ev->Get()->Usage; + this->QueryQuotaRequests.clear(); + this->QuotasUpdating = false; } - } + ); + + Exec(DbPool, executable); } } // NYq diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h index 40e97d762b..150eef3ca9 100644 --- a/ydb/core/yq/libs/events/event_ids.h +++ b/ydb/core/yq/libs/events/event_ids.h @@ -44,6 +44,7 @@ struct TEventIds { EvGraphParams, EvRaiseTransientIssues, EvSchemaCreated, + EvCallback, // Special events EvEnd diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index cc2013d827..7cdff0c590 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -231,6 +231,15 @@ struct TEvents { NYdb::TStatus Result; }; + + struct TEvCallback : public NActors::TEventLocal<TEvCallback, TEventIds::EvCallback> { + explicit TEvCallback(std::function<void()> callback) + : Callback(callback) + { + } + + std::function<void()> Callback; + }; }; } // namespace NYq diff --git a/ydb/core/yq/libs/shared_resources/CMakeLists.txt b/ydb/core/yq/libs/shared_resources/CMakeLists.txt index f976580eb3..2aade5a890 100644 --- a/ydb/core/yq/libs/shared_resources/CMakeLists.txt +++ b/ydb/core/yq/libs/shared_resources/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC cpp-actors-core cpp-monlib-dynamic_counters ydb-core-protos + libs-control_plane_storage-proto yq-libs-events libs-shared_resources-interface ydb-library-logger @@ -25,6 +26,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC cpp-client-ydb_table ) target_sources(yq-libs-shared_resources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/db_exec.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/db_pool.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/shared_resources/shared_resources.cpp ) diff --git a/ydb/core/yq/libs/shared_resources/db_exec.cpp b/ydb/core/yq/libs/shared_resources/db_exec.cpp new file mode 100644 index 0000000000..91af0f3a0e --- /dev/null +++ b/ydb/core/yq/libs/shared_resources/db_exec.cpp @@ -0,0 +1,9 @@ +#include "db_exec.h" + +namespace NYq { + +void TDbExecutable::Throw(const TString& message) { + Y_UNUSED(message); +} + +} /* namespace NYq */ diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h new file mode 100644 index 0000000000..a2b4343544 --- /dev/null +++ b/ydb/core/yq/libs/shared_resources/db_exec.h @@ -0,0 +1,241 @@ +#pragma once + +#include "db_pool.h" + +#include <ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h> +#include <ydb/core/yq/libs/db_schema/db_schema.h> + +namespace NYq { + +class TDbExecutable { +public: + using TPtr = std::shared_ptr<TDbExecutable>; + + TDbExecutable(bool collectDebugInfo = false) { + if (collectDebugInfo) { + DebugInfo = std::make_shared<TDebugInfo>(); + } + } + + virtual TAsyncStatus Execute(NYdb::NTable::TSession& session) = 0; + void Throw(const TString& message); + + TDbPool::TPtr DbPool; + std::weak_ptr<TDbExecutable> SelfHolder; + NYql::TIssues Issues; + NYql::TIssues InternalIssues; + TDebugInfoPtr DebugInfo; +}; + +template <typename TProto> +void ParseProto(TDbExecutable& executable, TProto& proto, TResultSetParser& parser, const TString& columnName) { + if (!proto.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())) + { + executable.Throw(Sprintf("Error parsing proto message %s", proto.GetTypeName().c_str())); + } +} + +inline TAsyncStatus Exec(TDbPool::TPtr dbPool, TDbExecutable::TPtr executable) { + executable->DbPool = dbPool; + executable->SelfHolder = executable; + return ExecDbRequest(dbPool, [=](NYdb::NTable::TSession& session) { + return executable->Execute(session); + }); +} + +/* + * 1. TDbExecuter must be create like this: + * + * TDbExecutable::TPtr executable; + * auto& executer = TDbExecuter<...>::Create(executable); + * + * 2. Template param is state struct. It's lifecycle matches executer. It's expected to keep all state + * between async db calls. + * + * 3. TDbExecutable::Read adds read operation to DB access pipeline, Write adds write operation (w/o result + * processing). All calls are serialized. There is no concurrency issues in access TDbExecutable::State + * from callbacks. + * + * 4. Final callback (passed in TDbExecutable::Process) is expected to be called from AS thread, so actor + * actorId must implement TEvents::TEvCallback handler, f.e.: + * + * hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); + * + * it is safe to access TDbExecutable::State and actor class members from final callback w/o concurrency + * issus as well + * + * 5. All pipeline is retried automatically. If TDbExecutable::State need to be cleaned up from failed run, + * pass handler as stateInitCallback to TDbExecutable ctor. + */ + +template <typename TState> +class TDbExecuter : public TDbExecutable { + + struct TExecStep { + std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> BuilderCallback; + std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> HandlerCallback; + std::function<void(TDbExecuter<TState>&)> ProcessCallback; + TString Name; + bool Commit = false; + }; + std::vector<TExecStep> Steps; + ui32 CurrentStepIndex = 0; + ui32 InsertStepIndex = 0; + NActors::TActorId HandlerActorId; + TMaybe<TTransaction> Transaction; + NActors::TActorSystem* ActorSystem = nullptr; + std::function<void(TDbExecuter<TState>&)> HandlerCallback; + std::function<void(TDbExecuter<TState>&)> StateInitCallback; + bool skipStep = false; + +protected: + TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback = nullptr) + : TDbExecutable(collectDebugInfo), StateInitCallback(stateInitCallback) { + } + + TDbExecuter(const TDbExecuter& other) = delete; + +public: + virtual ~TDbExecuter() { + }; + + static TDbExecuter& Create(TDbExecutable::TPtr& holder, bool collectDebugInfo = false) { + auto executer = new TDbExecuter(collectDebugInfo); + holder.reset(executer); + return *executer; + }; + + void SkipStep() { + skipStep = true; + } + + TAsyncStatus NextStep(NYdb::NTable::TSession session) { + + if (CurrentStepIndex == Steps.size()) { + if (Transaction) { + auto transaction = *Transaction; + Transaction.Clear(); + return transaction.Commit() + .Apply([this, session=session](const TFuture<TCommitTransactionResult>& future) { + + TCommitTransactionResult result = future.GetValue(); + auto status = static_cast<TStatus>(result); + + return this->NextStep(session); + }); + } + if (HandlerActorId != NActors::TActorId{}) { + auto holder = SelfHolder.lock(); + if (holder) { + ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([this, holder=holder, handlerCallback=HandlerCallback]() { + handlerCallback(*this); + })); + } + } + return MakeFuture(TStatus{EStatus::SUCCESS, NYql::TIssues{}}); + } else { + TSqlQueryBuilder builder(DbPool->TablePathPrefix, Steps[CurrentStepIndex].Name); + skipStep = false; + Steps[CurrentStepIndex].BuilderCallback(*this, builder); + + if (skipStep) { // TODO Refactor this + this->CurrentStepIndex++; + return this->NextStep(session); + } + + const auto query = builder.Build(); + auto transaction = Transaction ? TTxControl::Tx(*Transaction) : TTxControl::BeginTx(TTxSettings::SerializableRW()); + if (Steps[CurrentStepIndex].Commit) { + transaction = transaction.CommitTx(); + } + + return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true)) + .Apply([this, session=session](const TFuture<TDataQueryResult>& future) { + + NYdb::NTable::TDataQueryResult result = future.GetValue(); + auto status = static_cast<TStatus>(result); + + if (status.GetStatus() == EStatus::SCHEME_ERROR) { // retry if table does not exist + this->Transaction.Clear(); + return MakeFuture(TStatus{EStatus::UNAVAILABLE, NYql::TIssues{status.GetIssues()}}); + } + if (!status.IsSuccess()) { + this->Transaction.Clear(); + return MakeFuture(status); + } + + if (this->Steps[CurrentStepIndex].Commit) { + this->Transaction.Clear(); + } else if (!this->Transaction) { + this->Transaction = result.GetTransaction(); + } + + if (this->Steps[CurrentStepIndex].HandlerCallback) { + try { + this->Steps[CurrentStepIndex].HandlerCallback(*this, result.GetResultSets()); + } catch (const TControlPlaneStorageException& exception) { + NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage()); + Issues.AddIssue(issue); + NYql::TIssue internalIssue = MakeErrorIssue(exception.Code, CurrentExceptionMessage()); + InternalIssues.AddIssue(internalIssue); + } catch (const std::exception& exception) { + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, exception.what()); + Issues.AddIssue(issue); + NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); + InternalIssues.AddIssue(internalIssue); + } catch (...) { + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); + Issues.AddIssue(issue); + NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); + InternalIssues.AddIssue(internalIssue); + } + } + + this->CurrentStepIndex++; + return this->NextStep(session); + }); + } + } + + TAsyncStatus Execute(NYdb::NTable::TSession& session) override { + if (StateInitCallback) { + StateInitCallback(*this); + } else { + State = TState{}; + } + return NextStep(session); + } + + TDbExecuter& Read( + std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback + , std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> handlerCallback + , const TString& Name = "DefaultReadName" + , bool commit = false + ) { + Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{builderCallback, handlerCallback, nullptr, Name, commit}); + InsertStepIndex++; + return *this; + } + + TDbExecuter& Write( + std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback + , const TString& Name = "DefaultWriteName" + , bool commit = false + ) { + return Read(builderCallback, nullptr, Name, commit); + } + + void Process( + NActors::TActorId actorId + , std::function<void(TDbExecuter<TState>&)> handlerCallback + ) { + Y_VERIFY(HandlerActorId == NActors::TActorId{}, "Handler must be empty"); + ActorSystem = TActivationContext::ActorSystem(); + HandlerActorId = actorId; + HandlerCallback = handlerCallback; + } + + TState State; +}; + +} /* NYq */ diff --git a/ydb/core/yq/libs/shared_resources/db_pool.cpp b/ydb/core/yq/libs/shared_resources/db_pool.cpp index 372dfd72cd..1e2abadd6d 100644 --- a/ydb/core/yq/libs/shared_resources/db_pool.cpp +++ b/ydb/core/yq/libs/shared_resources/db_pool.cpp @@ -6,11 +6,20 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/yq/libs/actors/logging/log.h> + #include <util/stream/file.h> #include <util/string/strip.h> -#define LOG_E(stream) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, stream) +#define LOG_F(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, EMERG, STREAMS, logRecordStream) +#define LOG_A(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ALERT, STREAMS, logRecordStream) +#define LOG_C(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, CRIT, STREAMS, logRecordStream) +#define LOG_E(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, ERROR, STREAMS, logRecordStream) +#define LOG_W(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, WARN, STREAMS, logRecordStream) +#define LOG_N(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, NOTICE, STREAMS, logRecordStream) +#define LOG_I(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, INFO, STREAMS, logRecordStream) +#define LOG_D(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, DEBUG, STREAMS, logRecordStream) +#define LOG_T(ctx, logRecordStream) LOG_STREAMS_IMPL_AS(ctx, TRACE, STREAMS, logRecordStream) namespace NYq { @@ -66,6 +75,8 @@ public: RequestInProgressTimestamp = TInstant::Now(); const auto& requestVariant = Requests.front(); + LOG_D(ctx, "TDbPoolActor: ProcessQueue " << SelfId() << " Queue size = " << Requests.size()); + if (auto pRequest = std::get_if<TRequest>(&requestVariant)) { auto& request = *pRequest; auto actorSystem = ctx.ActorSystem(); @@ -86,6 +97,8 @@ public: .Subscribe([state = std::weak_ptr<int>(State), sharedResult, actorSystem, cookie, selfId](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { if (state.lock()) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbResponse(statusFuture.GetValue(), *sharedResult), 0, cookie)); + } else { + LOG_D(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); } }); } else if (auto pRequest = std::get_if<TFunctionRequest>(&requestVariant)) { @@ -99,12 +112,15 @@ public: .Subscribe([state = std::weak_ptr<int>(State), actorSystem, selfId, cookie](const NThreading::TFuture<NYdb::TStatus>& statusFuture) { if (state.lock()) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvents::TEvDbFunctionResponse(statusFuture.GetValue()), 0, cookie)); + } else { + LOG_D(*actorSystem, "TDbPoolActor: ProcessQueue " << selfId << " State destroyed"); } }); } } void HandleRequest(TEvents::TEvDbRequest::TPtr& ev, const TActorContext& ctx) { + LOG_D(ctx, "TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size()); auto request = ev->Get(); Requests.emplace_back(TRequest{ev->Sender, ev->Cookie, request->Sql, std::move(request->Params), request->Idempotent}); ProcessQueue(ctx); @@ -119,18 +135,21 @@ public: } void HandleResponse(TEvents::TEvDbResponse::TPtr& ev, const TActorContext& ctx) { + LOG_D(ctx, "TDbPoolActor: TEvDbResponse " << SelfId() << " Queue size = " << Requests.size()); const auto& request = Requests.front(); ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); PopFromQueueAndProcess(ctx); } void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev, const TActorContext& ctx) { + LOG_D(ctx, "TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size()); auto request = ev->Get(); Requests.emplace_back(TFunctionRequest{ev->Sender, ev->Cookie, std::move(request->Handler)}); ProcessQueue(ctx); } void HandleResponse(TEvents::TEvDbFunctionResponse::TPtr& ev, const TActorContext& ctx) { + LOG_D(ctx, "TDbPoolActor: TEvDbFunctionResponse " << SelfId() << " Queue size = " << Requests.size()); const auto& request = Requests.front(); ctx.Send(ev->Forward(std::visit([](const auto& arg) { return arg.Sender; }, request))); PopFromQueueAndProcess(ctx); @@ -181,8 +200,10 @@ private: TDbPool::TDbPool( ui32 sessionsCount, const NYdb::NTable::TTableClient& tableClient, - const NMonitoring::TDynamicCounterPtr& counters) + const NMonitoring::TDynamicCounterPtr& counters, + const TString& tablePathPrefix) { + TablePathPrefix = tablePathPrefix; const auto& ctx = NActors::TActivationContext::AsActorContext(); auto parentId = ctx.SelfID; Actors.reserve(sessionsCount); @@ -264,11 +285,11 @@ void TDbPoolMap::Reset(const NYq::NConfig::TDbPoolConfig& config) { TableClient = nullptr; } -TDbPool::TPtr TDbPoolHolder::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) { - return Pools->GetOrCreate(dbPoolId, sessionsCount); +TDbPool::TPtr TDbPoolHolder::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount, const TString& tablePathPrefix) { + return Pools->GetOrCreate(dbPoolId, sessionsCount, tablePathPrefix); } -TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) { +TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount, const TString& tablePathPrefix) { TGuard<TMutex> lock(Mutex); auto it = Pools.find(dbPoolId); if (it != Pools.end()) { @@ -297,7 +318,7 @@ TDbPool::TPtr TDbPoolMap::GetOrCreate(EDbPoolId dbPoolId, ui32 sessionsCount) { TableClient = MakeHolder<NYdb::NTable::TTableClient>(Driver, clientSettings); } - TDbPool::TPtr dbPool = new TDbPool(sessionsCount, *TableClient, Counters); + TDbPool::TPtr dbPool = new TDbPool(sessionsCount, *TableClient, Counters, tablePathPrefix); Pools.emplace(dbPoolId, dbPool); return dbPool; } diff --git a/ydb/core/yq/libs/shared_resources/db_pool.h b/ydb/core/yq/libs/shared_resources/db_pool.h index 68d9adc2fc..0c7c1242c8 100644 --- a/ydb/core/yq/libs/shared_resources/db_pool.h +++ b/ydb/core/yq/libs/shared_resources/db_pool.h @@ -21,10 +21,12 @@ public: NActors::TActorId GetNextActor(); + TString TablePathPrefix; + private: friend class TDbPoolMap; - TDbPool(ui32 sessionsCount, const NYdb::NTable::TTableClient& tableClient, const NMonitoring::TDynamicCounterPtr& counters); + TDbPool(ui32 sessionsCount, const NYdb::NTable::TTableClient& tableClient, const NMonitoring::TDynamicCounterPtr& counters, const TString& tablePathPrefix); TMutex Mutex; TVector<NActors::TActorId> Actors; @@ -41,7 +43,7 @@ class TDbPoolMap: public TThrRefBase { public: using TPtr = TIntrusivePtr<TDbPoolMap>; - TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount); + TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount, const TString& tablePathPrefix); private: friend class TDbPoolHolder; @@ -73,7 +75,7 @@ public: ~TDbPoolHolder(); void Reset(const NYq::NConfig::TDbPoolConfig& config); - TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount); + TDbPool::TPtr GetOrCreate(EDbPoolId poolId, ui32 sessionsCount, const TString& tablePathPrefix); NYdb::TDriver& GetDriver(); TDbPoolMap::TPtr Get(); |