diff options
author | hor911 <hor911@ydb.tech> | 2022-08-05 09:32:37 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-08-05 09:32:37 +0300 |
commit | 7e04ccaa05215d096368b8807ea6ab05fe1da1da (patch) | |
tree | 5fb97146156e65cdbba6af611f8aaca82afd38a2 | |
parent | a8d863a231b00ef5c1cc13f73c2e53f286fa878c (diff) | |
download | ydb-7e04ccaa05215d096368b8807ea6ab05fe1da1da.tar.gz |
Fix DB Write Race/TLI
-rw-r--r-- | ydb/core/yq/libs/quota_manager/quota_manager.cpp | 30 | ||||
-rw-r--r-- | ydb/core/yq/libs/shared_resources/db_exec.h | 1 |
2 files changed, 23 insertions, 8 deletions
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp index bb703cf2571..a440314840d 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp @@ -23,6 +23,17 @@ #define LOG_T(stream) \ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_AS_E(actorSystem, stream) \ + LOG_ERROR_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_AS_W(actorSystem, stream) \ + LOG_WARN_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_AS_I(actorSystem, stream) \ + LOG_INFO_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_AS_D(actorSystem, stream) \ + LOG_DEBUG_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_AS_T(actorSystem, stream) \ + LOG_TRACE_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream) + namespace NYq { NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId) { @@ -340,7 +351,7 @@ private: if (cached.Usage.Limit.Value != ev->Get()->Limit) { cached.Usage.Limit.Value = ev->Get()->Limit; cached.Usage.Limit.UpdatedAt = Now(); - LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed"); + LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Change Accepted"); SyncQuota(subjectType, subjectId, metricName, cached); } } @@ -508,6 +519,7 @@ private: cached.ChangedAfterSync = false; cached.SyncInProgress = true; + executer.Read( [](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) { builder.AddText( @@ -569,7 +581,7 @@ private: ).Process(SelfId(), [this](TSyncQuotaExecuter& executer) { if (executer.State.Refreshed) { - UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage); + this->UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage); } else { this->NotifyClusterNodes(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage); } @@ -580,10 +592,11 @@ private: auto& cache = it->second; auto itQ = cache.UsageMap.find(executer.State.MetricName); if (itQ != cache.UsageMap.end()) { - itQ->second.SyncInProgress = false; - if (itQ->second.ChangedAfterSync) { - LOG_T(itQ->second.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC"); - SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, itQ->second); + auto& cached = itQ->second; + cached.SyncInProgress = false; + if (cached.ChangedAfterSync) { + LOG_T(cached.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC"); + this->SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, cached); } } } @@ -600,8 +613,9 @@ private: auto& cache = it->second; auto itQ = cache.UsageMap.find(metricName); if (itQ != cache.UsageMap.end()) { - itQ->second.Usage.Merge(usage); - LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " MERGED"); + auto& cached = itQ->second; + cached.Usage.Merge(usage); + LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " MERGED " << reinterpret_cast<ui64>(&cached)); } } } diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h index bf3a2287e77..07479eed7b7 100644 --- a/ydb/core/yq/libs/shared_resources/db_exec.h +++ b/ydb/core/yq/libs/shared_resources/db_exec.h @@ -219,6 +219,7 @@ public: } TAsyncStatus Execute(NYdb::NTable::TSession& session) override { + CurrentStepIndex = 0; if (StateInitCallback) { StateInitCallback(*this); } |