diff options
author | hcpp <hcpp@ydb.tech> | 2023-09-19 23:31:33 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-09-19 23:56:04 +0300 |
commit | 3c3e1bcfb4ad834b060cae2b12cb413a674fb915 (patch) | |
tree | d8093ee1416ad85b99677ab68a2d55ce7a34e274 | |
parent | a8bb6c3e287c1a638798f1fc315a222515f8943f (diff) | |
download | ydb-3c3e1bcfb4ad834b060cae2b12cb413a674fb915.tar.gz |
skip some connections and bindings
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/events/events.h | 6 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp | 66 |
2 files changed, 53 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 5dd538cddee..caa27ee3c88 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -351,6 +351,12 @@ struct TEvYdbCompute { , Status(status) {} + TEvSynchronizeResponse(const TString& scope, NYql::TIssues issues) + : Scope(scope) + , Issues(std::move(issues)) + , Status(NYdb::EStatus::SUCCESS) + {} + TString Scope; NYql::TIssues Issues; NYdb::EStatus Status; diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp index d6cde46c3db..a4f343fcb6d 100644 --- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp @@ -188,18 +188,24 @@ public: hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle); ) - void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) { - CreatedConnections++; - if (CreatedConnections == Connections.size()) { + void ProcessCreateConnection() { + ProcessedConnections++; + if (ProcessedConnections == Connections.size()) { LOG_I("Start create external tables stage for the scope " << Scope); Become(&TSynchronizeScopeActor::StateCreateExternalTablesFunc); CreateExternalTables(); } } + void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) { + SuccessfullyCreatedConnections++; + ProcessCreateConnection(); + } + void Handle(const TEvControlPlaneProxy::TEvCreateConnectionResponse::TPtr& ev) { - LOG_E("Create external data source response (error): " << CreatedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString()); - ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Сonnection creation error at the synchronization stage"); + LOG_E("Create external data source response (error): " << ProcessedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString()); + ProcessCreateConnection(); + Issues.AddIssues(ev.Get()->Get()->Issues); } STRICT_STFUNC(StateCreateExternalTablesFunc, @@ -208,16 +214,24 @@ public: hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle); ) - void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) { - CreatedBindings++; - if (CreatedBindings == Bindings.size()) { + void ProcessCreateBinding() { + ProcessedBindings++; + if (ProcessedBindings == Bindings.size()) { SendFinalModifyDatabase(); } } + void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) { + SuccessfullyCreatedBindings++; + ProcessCreateBinding(); + } + void Handle(const TEvControlPlaneProxy::TEvCreateBindingResponse::TPtr& ev) { - LOG_E("Create external table response (error): " << CreatedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString()); - ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Binding creation error at the synchronization stage"); + LOG_E("Create external table response (error): " << ProcessedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString()); + + ProcessCreateBinding(); + + Issues.AddIssues(ev.Get()->Get()->Issues); } void Handle(const TEvControlPlaneStorage::TEvModifyDatabaseResponse::TPtr& ev) { @@ -430,7 +444,7 @@ private: } void ReplyAndPassAway() { - Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope}); + Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope, Issues}); PassAway(); } @@ -492,6 +506,14 @@ private: } void SendFinalModifyDatabase() { + if (Issues) { + LOG_I("Synchronization has already completed with errors for scope: " << Scope); + Issues.AddIssue(TStringBuilder{} << "Connections created " << SuccessfullyCreatedConnections << " of " << ProcessedConnections); + Issues.AddIssue(TStringBuilder{} << "Bindings created " << SuccessfullyCreatedBindings << " of " << ProcessedBindings); + ReplyAndPassAway(); + return; + } + const auto& controlPlane = ComputeConfig.GetProto().GetYdb().GetControlPlane(); switch (controlPlane.type_case()) { case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: @@ -535,12 +557,15 @@ private: TString PageToken; TSet<TString> BindingIds; - uint64_t CreatedConnections = 0; - uint64_t CreatedBindings = 0; + uint64_t SuccessfullyCreatedConnections = 0; + uint64_t SuccessfullyCreatedBindings = 0; + uint64_t ProcessedConnections = 0; + uint64_t ProcessedBindings = 0; TMap<TString, FederatedQuery::Connection> Connections; TMap<TString, FederatedQuery::Binding> Bindings; NFq::NPrivate::TCounters Counters; std::shared_ptr<NYdb::NTable::TTableClient> Client; + NYql::TIssues Issues; }; class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchronizatinServiceActor> { @@ -556,9 +581,9 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron struct TSynchtonizationCounters { struct TCounters : public virtual TThrRefBase { - TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) - : SynchronizationOk(counters->GetCounter("Ok", true)) - , SynchronizationFailed(counters->GetCounter("Failed", true)) + TCounters(const ::NMonitoring::TDynamicCounterPtr& counters, bool derivative = true) + : SynchronizationOk(counters->GetCounter("Ok", derivative)) + , SynchronizationFailed(counters->GetCounter("Failed", derivative)) {} ::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationOk; ::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationFailed; @@ -566,7 +591,6 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron using TCountersPtr = TIntrusivePtr<TCounters>; - TSynchtonizationCounters(const ::NMonitoring::TDynamicCounterPtr& counters) : Counters(counters) , SubgroupCounters(Counters->GetSubgroup("step", "Synchronization")) @@ -589,7 +613,7 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron if (it != CountersByScope.end()) { return it->second; } - return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope)); + return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope), false); } public: @@ -669,7 +693,11 @@ public: it->second.Requests.clear(); - if (ev->Get()->Status == NYdb::EStatus::SUCCESS) { + if (ev->Get()->Status == NYdb::EStatus::SUCCESS && ev->Get()->Issues) { + LOG_E("Synchronization failed (skipped some bindings and connections) for " << ev->Get()->Scope << " with issues " << ev->Get()->Issues.ToOneLineString()); + Counters.IncFailed(ev->Get()->Scope); + it->second.Status = EScopeStatus::SYNCHRONIZED; + } else if (ev->Get()->Status == NYdb::EStatus::SUCCESS) { Counters.IncOk(ev->Get()->Scope); it->second.Status = EScopeStatus::SYNCHRONIZED; } else { |