aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-09-19 23:31:33 +0300
committerhcpp <hcpp@ydb.tech>2023-09-19 23:56:04 +0300
commit3c3e1bcfb4ad834b060cae2b12cb413a674fb915 (patch)
treed8093ee1416ad85b99677ab68a2d55ce7a34e274
parenta8bb6c3e287c1a638798f1fc315a222515f8943f (diff)
downloadydb-3c3e1bcfb4ad834b060cae2b12cb413a674fb915.tar.gz
skip some connections and bindings
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h6
-rw-r--r--ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp66
2 files changed, 53 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index 5dd538cddee..caa27ee3c88 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -351,6 +351,12 @@ struct TEvYdbCompute {
, Status(status)
{}
+ TEvSynchronizeResponse(const TString& scope, NYql::TIssues issues)
+ : Scope(scope)
+ , Issues(std::move(issues))
+ , Status(NYdb::EStatus::SUCCESS)
+ {}
+
TString Scope;
NYql::TIssues Issues;
NYdb::EStatus Status;
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
index d6cde46c3db..a4f343fcb6d 100644
--- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
@@ -188,18 +188,24 @@ public:
hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle);
)
- void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) {
- CreatedConnections++;
- if (CreatedConnections == Connections.size()) {
+ void ProcessCreateConnection() {
+ ProcessedConnections++;
+ if (ProcessedConnections == Connections.size()) {
LOG_I("Start create external tables stage for the scope " << Scope);
Become(&TSynchronizeScopeActor::StateCreateExternalTablesFunc);
CreateExternalTables();
}
}
+ void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) {
+ SuccessfullyCreatedConnections++;
+ ProcessCreateConnection();
+ }
+
void Handle(const TEvControlPlaneProxy::TEvCreateConnectionResponse::TPtr& ev) {
- LOG_E("Create external data source response (error): " << CreatedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
- ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Сonnection creation error at the synchronization stage");
+ LOG_E("Create external data source response (error): " << ProcessedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
+ ProcessCreateConnection();
+ Issues.AddIssues(ev.Get()->Get()->Issues);
}
STRICT_STFUNC(StateCreateExternalTablesFunc,
@@ -208,16 +214,24 @@ public:
hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle);
)
- void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) {
- CreatedBindings++;
- if (CreatedBindings == Bindings.size()) {
+ void ProcessCreateBinding() {
+ ProcessedBindings++;
+ if (ProcessedBindings == Bindings.size()) {
SendFinalModifyDatabase();
}
}
+ void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) {
+ SuccessfullyCreatedBindings++;
+ ProcessCreateBinding();
+ }
+
void Handle(const TEvControlPlaneProxy::TEvCreateBindingResponse::TPtr& ev) {
- LOG_E("Create external table response (error): " << CreatedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
- ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Binding creation error at the synchronization stage");
+ LOG_E("Create external table response (error): " << ProcessedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
+
+ ProcessCreateBinding();
+
+ Issues.AddIssues(ev.Get()->Get()->Issues);
}
void Handle(const TEvControlPlaneStorage::TEvModifyDatabaseResponse::TPtr& ev) {
@@ -430,7 +444,7 @@ private:
}
void ReplyAndPassAway() {
- Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope});
+ Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope, Issues});
PassAway();
}
@@ -492,6 +506,14 @@ private:
}
void SendFinalModifyDatabase() {
+ if (Issues) {
+ LOG_I("Synchronization has already completed with errors for scope: " << Scope);
+ Issues.AddIssue(TStringBuilder{} << "Connections created " << SuccessfullyCreatedConnections << " of " << ProcessedConnections);
+ Issues.AddIssue(TStringBuilder{} << "Bindings created " << SuccessfullyCreatedBindings << " of " << ProcessedBindings);
+ ReplyAndPassAway();
+ return;
+ }
+
const auto& controlPlane = ComputeConfig.GetProto().GetYdb().GetControlPlane();
switch (controlPlane.type_case()) {
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
@@ -535,12 +557,15 @@ private:
TString PageToken;
TSet<TString> BindingIds;
- uint64_t CreatedConnections = 0;
- uint64_t CreatedBindings = 0;
+ uint64_t SuccessfullyCreatedConnections = 0;
+ uint64_t SuccessfullyCreatedBindings = 0;
+ uint64_t ProcessedConnections = 0;
+ uint64_t ProcessedBindings = 0;
TMap<TString, FederatedQuery::Connection> Connections;
TMap<TString, FederatedQuery::Binding> Bindings;
NFq::NPrivate::TCounters Counters;
std::shared_ptr<NYdb::NTable::TTableClient> Client;
+ NYql::TIssues Issues;
};
class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchronizatinServiceActor> {
@@ -556,9 +581,9 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
struct TSynchtonizationCounters {
struct TCounters : public virtual TThrRefBase {
- TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
- : SynchronizationOk(counters->GetCounter("Ok", true))
- , SynchronizationFailed(counters->GetCounter("Failed", true))
+ TCounters(const ::NMonitoring::TDynamicCounterPtr& counters, bool derivative = true)
+ : SynchronizationOk(counters->GetCounter("Ok", derivative))
+ , SynchronizationFailed(counters->GetCounter("Failed", derivative))
{}
::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationOk;
::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationFailed;
@@ -566,7 +591,6 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
using TCountersPtr = TIntrusivePtr<TCounters>;
-
TSynchtonizationCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
: Counters(counters)
, SubgroupCounters(Counters->GetSubgroup("step", "Synchronization"))
@@ -589,7 +613,7 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
if (it != CountersByScope.end()) {
return it->second;
}
- return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope));
+ return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope), false);
}
public:
@@ -669,7 +693,11 @@ public:
it->second.Requests.clear();
- if (ev->Get()->Status == NYdb::EStatus::SUCCESS) {
+ if (ev->Get()->Status == NYdb::EStatus::SUCCESS && ev->Get()->Issues) {
+ LOG_E("Synchronization failed (skipped some bindings and connections) for " << ev->Get()->Scope << " with issues " << ev->Get()->Issues.ToOneLineString());
+ Counters.IncFailed(ev->Get()->Scope);
+ it->second.Status = EScopeStatus::SYNCHRONIZED;
+ } else if (ev->Get()->Status == NYdb::EStatus::SUCCESS) {
Counters.IncOk(ev->Get()->Scope);
it->second.Status = EScopeStatus::SYNCHRONIZED;
} else {