aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-08-05 09:32:37 +0300
committerhor911 <hor911@ydb.tech>2022-08-05 09:32:37 +0300
commit7e04ccaa05215d096368b8807ea6ab05fe1da1da (patch)
tree5fb97146156e65cdbba6af611f8aaca82afd38a2
parenta8d863a231b00ef5c1cc13f73c2e53f286fa878c (diff)
downloadydb-7e04ccaa05215d096368b8807ea6ab05fe1da1da.tar.gz
Fix DB Write Race/TLI
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.cpp30
-rw-r--r--ydb/core/yq/libs/shared_resources/db_exec.h1
2 files changed, 23 insertions, 8 deletions
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
index bb703cf2571..a440314840d 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -23,6 +23,17 @@
#define LOG_T(stream) \
LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_AS_E(actorSystem, stream) \
+ LOG_ERROR_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_AS_W(actorSystem, stream) \
+ LOG_WARN_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_AS_I(actorSystem, stream) \
+ LOG_INFO_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_AS_D(actorSystem, stream) \
+ LOG_DEBUG_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+#define LOG_AS_T(actorSystem, stream) \
+ LOG_TRACE_S(actorSystem, NKikimrServices::FQ_QUOTA_SERVICE, stream)
+
namespace NYq {
NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId) {
@@ -340,7 +351,7 @@ private:
if (cached.Usage.Limit.Value != ev->Get()->Limit) {
cached.Usage.Limit.Value = ev->Get()->Limit;
cached.Usage.Limit.UpdatedAt = Now();
- LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed");
+ LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Change Accepted");
SyncQuota(subjectType, subjectId, metricName, cached);
}
}
@@ -508,6 +519,7 @@ private:
cached.ChangedAfterSync = false;
cached.SyncInProgress = true;
+
executer.Read(
[](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) {
builder.AddText(
@@ -569,7 +581,7 @@ private:
).Process(SelfId(),
[this](TSyncQuotaExecuter& executer) {
if (executer.State.Refreshed) {
- UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage);
+ this->UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage);
} else {
this->NotifyClusterNodes(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage);
}
@@ -580,10 +592,11 @@ private:
auto& cache = it->second;
auto itQ = cache.UsageMap.find(executer.State.MetricName);
if (itQ != cache.UsageMap.end()) {
- itQ->second.SyncInProgress = false;
- if (itQ->second.ChangedAfterSync) {
- LOG_T(itQ->second.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC");
- SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, itQ->second);
+ auto& cached = itQ->second;
+ cached.SyncInProgress = false;
+ if (cached.ChangedAfterSync) {
+ LOG_T(cached.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC");
+ this->SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, cached);
}
}
}
@@ -600,8 +613,9 @@ private:
auto& cache = it->second;
auto itQ = cache.UsageMap.find(metricName);
if (itQ != cache.UsageMap.end()) {
- itQ->second.Usage.Merge(usage);
- LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " MERGED");
+ auto& cached = itQ->second;
+ cached.Usage.Merge(usage);
+ LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " MERGED " << reinterpret_cast<ui64>(&cached));
}
}
}
diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h
index bf3a2287e77..07479eed7b7 100644
--- a/ydb/core/yq/libs/shared_resources/db_exec.h
+++ b/ydb/core/yq/libs/shared_resources/db_exec.h
@@ -219,6 +219,7 @@ public:
}
TAsyncStatus Execute(NYdb::NTable::TSession& session) override {
+ CurrentStepIndex = 0;
if (StateInitCallback) {
StateInitCallback(*this);
}