aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-10-16 00:39:33 +0300
committerbabenko <babenko@yandex-team.com>2024-10-16 00:54:03 +0300
commitd9797921c7989ee196f79f8c738da5af4894007e (patch)
tree134dede966e270d29d49977e6c3c24011f464701
parent1b5197fff8ac2009345ea6e150ae6ebe39d5aa5e (diff)
downloadydb-d9797921c7989ee196f79f8c738da5af4894007e.tar.gz
Refactor TLogManager state and fix some data races
commit_hash:0218f8df1fd2bd35447425ca7b390ac1d221de89
-rw-r--r--yt/yt/core/logging/log_manager.cpp151
1 files changed, 102 insertions, 49 deletions
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 8d7baba787..fa96cbc28a 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -448,18 +448,20 @@ public:
{
ShutdownRequested_.store(true);
+ auto config = Config_.Acquire();
+
if (LoggingThread_->GetThreadId() == GetCurrentThreadId()) {
FlushWriters();
} else {
// Wait for all previously enqueued messages to be flushed
// but no more than ShutdownGraceTimeout to prevent hanging.
- Synchronize(TInstant::Now() + Config_->ShutdownGraceTimeout);
+ Synchronize(TInstant::Now() + config->ShutdownGraceTimeout);
}
// For now this is the only way to wait for log writers that perform asynchronous flushes.
// TODO(achulkov2): Refactor log manager to support asynchronous operations.
- if (Config_->ShutdownBusyTimeout) {
- Sleep(Config_->ShutdownBusyTimeout);
+ if (config->ShutdownBusyTimeout != TDuration::Zero()) {
+ Sleep(config->ShutdownBusyTimeout);
}
EventQueue_->Shutdown();
@@ -486,13 +488,14 @@ public:
}
auto guard = Guard(SpinLock_);
+ auto config = Config_.Acquire();
auto it = NameToCategory_.find(categoryName);
if (it == NameToCategory_.end()) {
auto category = std::make_unique<TLoggingCategory>();
category->Name = categoryName;
category->ActualVersion = &Version_;
it = NameToCategory_.emplace(categoryName, std::move(category)).first;
- DoUpdateCategory(it->second.get());
+ DoUpdateCategory(config, it->second.get());
}
return it->second.get();
}
@@ -500,14 +503,17 @@ public:
void UpdateCategory(TLoggingCategory* category)
{
auto guard = Guard(SpinLock_);
- DoUpdateCategory(category);
+ auto config = Config_.Acquire();
+ DoUpdateCategory(config, category);
}
void UpdateAnchor(TLoggingAnchor* anchor)
{
auto guard = Guard(SpinLock_);
+ auto config = Config_.Acquire();
+
bool enabled = true;
- for (const auto& prefix : Config_->SuppressedMessages) {
+ for (const auto& prefix : config->SuppressedMessages) {
if (anchor->AnchorMessage.StartsWith(prefix)) {
enabled = false;
break;
@@ -645,11 +651,9 @@ public:
void SuppressRequest(TRequestId requestId)
{
- if (!RequestSuppressionEnabled_) {
- return;
+ if (RequestSuppressionEnabled_.load(std::memory_order_relaxed)) {
+ SuppressedRequestIdQueue_.Enqueue(requestId);
}
-
- SuppressedRequestIdQueue_.Enqueue(requestId);
}
void Synchronize(TInstant deadline = TInstant::Max())
@@ -704,6 +708,8 @@ private:
void EnsureStarted()
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
std::call_once(Started_, [&] {
if (LoggingThread_->IsStopping()) {
return;
@@ -721,7 +727,9 @@ private:
});
}
- const std::vector<ILogWriterPtr>& GetWriters(const TLogEvent& event)
+ const std::vector<ILogWriterPtr>& GetWriters(
+ const TLogManagerConfigPtr& config,
+ const TLogEvent& event)
{
VERIFY_THREAD_AFFINITY(LoggingThread);
@@ -736,7 +744,7 @@ private:
}
THashSet<TString> writerNames;
- for (const auto& rule : Config_->Rules) {
+ for (const auto& rule : config->Rules) {
if (rule->IsApplicable(event.Category->Name, event.Level, event.Family)) {
writerNames.insert(rule->Writers.begin(), rule->Writers.end());
}
@@ -783,10 +791,7 @@ private:
return;
}
- AbortOnAlert_.store(event.Config->AbortOnAlert);
-
EnsureStarted();
-
FlushWriters();
try {
@@ -797,8 +802,10 @@ private:
}
}
- std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig)
+ static std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
switch (writerConfig->Format) {
case ELogFormat::PlainText:
return std::make_unique<TPlainTextLogFormatter>(
@@ -821,7 +828,10 @@ private:
void DoUpdateConfig(const TLogManagerConfigPtr& config, bool fromEnv)
{
- if (AreNodesEqual(ConvertToNode(Config_), ConvertToNode(config))) {
+ // This could be called both from ctor and from LoggingThread.
+
+ auto oldConfig = Config_.Acquire();
+ if (AreNodesEqual(ConvertToNode(oldConfig), ConvertToNode(config))) {
return;
}
@@ -876,31 +886,36 @@ private:
category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed);
}
- Config_ = config;
ConfiguredFromEnv_.store(fromEnv);
- HighBacklogWatermark_.store(Config_->HighBacklogWatermark);
- LowBacklogWatermark_.store(Config_->LowBacklogWatermark);
- RequestSuppressionEnabled_.store(Config_->RequestSuppressionTimeout != TDuration::Zero());
+ HighBacklogWatermark_.store(config->HighBacklogWatermark);
+ LowBacklogWatermark_.store(config->LowBacklogWatermark);
+ RequestSuppressionEnabled_.store(config->RequestSuppressionTimeout != TDuration::Zero());
+ AbortOnAlert_.store(config->AbortOnAlert);
- CompressionThreadPool_->Configure(Config_->CompressionThreadCount);
+ CompressionThreadPool_->Configure(config->CompressionThreadCount);
if (RequestSuppressionEnabled_) {
- SuppressedRequestIdSet_.SetTtl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2);
+ SuppressedRequestIdSet_.SetTtl((config->RequestSuppressionTimeout + DequeuePeriod) * 2);
} else {
SuppressedRequestIdSet_.Clear();
SuppressedRequestIdQueue_.DequeueAll();
}
- FlushExecutor_->SetPeriod(Config_->FlushPeriod);
- WatchExecutor_->SetPeriod(Config_->WatchPeriod);
- CheckSpaceExecutor_->SetPeriod(Config_->CheckSpacePeriod);
- FileRotationExecutor_->SetPeriod(Config_->RotationCheckPeriod);
+ FlushExecutor_->SetPeriod(config->FlushPeriod);
+ WatchExecutor_->SetPeriod(config->WatchPeriod);
+ CheckSpaceExecutor_->SetPeriod(config->CheckSpacePeriod);
+ FileRotationExecutor_->SetPeriod(config->RotationCheckPeriod);
+ Config_.Store(std::move(config));
Version_++;
}
- void WriteEvent(const TLogEvent& event)
+ void WriteEvent(
+ const TLogManagerConfigPtr& config,
+ const TLogEvent& event)
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
if (ReopenRequested_.exchange(false)) {
ReloadWriters();
}
@@ -912,21 +927,26 @@ private:
event.Anchor->ByteCounter.Current += std::ssize(event.MessageRef);
}
- for (const auto& writer : GetWriters(event)) {
+ for (const auto& writer : GetWriters(config, event)) {
writer->Write(event);
}
}
void FlushWriters()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
for (const auto& [name, writer] : NameToWriter_) {
writer->Flush();
}
+
FlushedEvents_ = WrittenEvents_.load();
}
void RotateFiles()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
for (const auto& [name, writer] : NameToWriter_) {
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
fileWriter->MaybeRotate();
@@ -936,6 +956,8 @@ private:
void ReloadWriters()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
Version_++;
for (const auto& [name, writer] : NameToWriter_) {
writer->Reload();
@@ -944,15 +966,20 @@ private:
void CheckSpace()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
+ auto config = Config_.Acquire();
for (const auto& [name, writer] : NameToWriter_) {
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
- fileWriter->CheckSpace(Config_->MinDiskSpace);
+ fileWriter->CheckSpace(config->MinDiskSpace);
}
}
}
void RegisterNotificatonWatch(TNotificationWatch* watch)
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
if (watch->IsValid()) {
// Watch can fail to initialize if the writer is disabled
// e.g. due to the lack of space.
@@ -1004,6 +1031,8 @@ private:
void PushEvent(TLoggerQueueItem&& event)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto& perThreadQueue = PerThreadQueue();
if (!perThreadQueue) {
perThreadQueue = new TThreadLocalQueue();
@@ -1020,9 +1049,10 @@ private:
const TCounter& GetWrittenEventsCounter(const TLogEvent& event)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto key = std::pair(event.Category->Name, event.Level);
auto it = WrittenEventsCounters_.find(key);
-
if (it == WrittenEventsCounters_.end()) {
// TODO(prime@): optimize sensor count
auto counter = Profiler
@@ -1038,6 +1068,8 @@ private:
void CollectSensors(ISensorWriter* writer) override
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto writtenEvents = WrittenEvents_.load();
auto enqueuedEvents = EnqueuedEvents_.load();
auto suppressedEvents = SuppressedEvents_.load();
@@ -1053,6 +1085,8 @@ private:
void OnDiskProfiling()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
try {
auto minLogStorageAvailableSpace = std::numeric_limits<i64>::max();
auto minLogStorageFreeSpace = std::numeric_limits<i64>::max();
@@ -1085,6 +1119,8 @@ private:
std::vector<TLoggingAnchorStat> CaptureAnchorStats()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
auto now = TInstant::Now();
auto deltaSeconds = (now - LastAnchorStatsCaptureTime_).SecondsFloat();
LastAnchorStatsCaptureTime_ = now;
@@ -1112,14 +1148,17 @@ private:
void OnAnchorProfiling()
{
- if (Config_->EnableAnchorProfiling && !AnchorBufferedProducer_) {
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
+ auto config = Config_.Acquire();
+ if (config->EnableAnchorProfiling && !AnchorBufferedProducer_) {
AnchorBufferedProducer_ = New<TBufferedProducer>();
Profiler
.WithSparse()
.WithDefaultDisabled()
.WithProducerRemoveSupport()
.AddProducer("/anchors", AnchorBufferedProducer_);
- } else if (!Config_->EnableAnchorProfiling && AnchorBufferedProducer_) {
+ } else if (!config->EnableAnchorProfiling && AnchorBufferedProducer_) {
AnchorBufferedProducer_.Reset();
}
@@ -1131,7 +1170,7 @@ private:
TSensorBuffer sensorBuffer;
for (const auto& stat : stats) {
- if (stat.MessageRate < Config_->MinLoggedMessageRateToProfile) {
+ if (stat.MessageRate < config->MinLoggedMessageRateToProfile) {
continue;
}
TWithTagGuard tagGuard(&sensorBuffer, "message", stat.Anchor->AnchorMessage);
@@ -1268,20 +1307,24 @@ private:
WrittenEvents_ += eventsWritten;
- if (!Config_->FlushPeriod || ShutdownRequested_) {
+ auto config = Config_.Acquire();
+ if (!config->FlushPeriod || ShutdownRequested_) {
FlushWriters();
}
}
int ProcessTimeOrderedBuffer()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
int eventsWritten = 0;
int eventsSuppressed = 0;
SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll());
auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed);
- auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout);
+ auto config = Config_.Acquire();
+ auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(config->RequestSuppressionTimeout);
while (!TimeOrderedBuffer_.empty()) {
const auto& event = TimeOrderedBuffer_.front();
@@ -1300,7 +1343,7 @@ private:
if (requestSuppressionEnabled && event.RequestId && SuppressedRequestIdSet_.Contains(event.RequestId)) {
++eventsSuppressed;
} else {
- WriteEvent(event);
+ WriteEvent(config, event);
}
});
@@ -1312,10 +1355,14 @@ private:
return eventsWritten;
}
- void DoUpdateCategory(TLoggingCategory* category)
+ void DoUpdateCategory(
+ const TLogManagerConfigPtr& config,
+ TLoggingCategory* category)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto minPlainTextLevel = ELogLevel::Maximum;
- for (const auto& rule : Config_->Rules) {
+ for (const auto& rule : config->Rules) {
if (rule->IsApplicable(category->Name, ELogFamily::PlainText)) {
minPlainTextLevel = std::min(minPlainTextLevel, rule->MinLevel);
}
@@ -1323,11 +1370,13 @@ private:
category->MinPlainTextLevel.store(minPlainTextLevel, std::memory_order::relaxed);
category->CurrentVersion.store(GetVersion(), std::memory_order::relaxed);
- category->StructuredValidationSamplingRate.store(Config_->StructuredValidationSamplingRate, std::memory_order::relaxed);
+ category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed);
}
void DoRegisterAnchor(TLoggingAnchor* anchor)
{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
// NB: Duplicates are not desirable but possible.
AnchorMap_.emplace(anchor->AnchorMessage, anchor);
anchor->NextAnchor = FirstAnchor_;
@@ -1357,21 +1406,23 @@ private:
DECLARE_THREAD_AFFINITY_SLOT(LoggingThread);
- // Configuration.
+ TAtomicIntrusivePtr<TLogManagerConfig> Config_;
+
+ // Protects the section of members below.
NThreading::TForkAwareSpinLock SpinLock_;
- // Version forces this very module's Logger object to update to our own
- // default configuration (default level etc.).
- std::atomic<int> Version_ = 0;
- std::atomic<bool> AbortOnAlert_ = false;
- TLogManagerConfigPtr Config_;
- std::atomic<bool> ConfiguredFromEnv_ = false;
THashMap<TString, std::unique_ptr<TLoggingCategory>> NameToCategory_;
THashMap<TString, ILogWriterFactoryPtr> TypeNameToWriterFactory_;
- const TLoggingCategory* SystemCategory_;
- // These are just copies from Config_.
+
+ // Incrementing version forces loggers to update their own default configuration (default level etc.).
+ std::atomic<int> Version_ = 0;
+
+ std::atomic<bool> ConfiguredFromEnv_ = false;
+
+ // These are just cached (for performance reason) copies from Config_.
// The values are being read from arbitrary threads but stale values are fine.
std::atomic<ui64> HighBacklogWatermark_ = Max<ui64>();
std::atomic<ui64> LowBacklogWatermark_ = Max<ui64>();
+ std::atomic<bool> AbortOnAlert_ = false;
std::atomic<bool> InitializationStarted_ = false;
std::atomic<NThreading::TThreadId> InitializerThreadId_ = NThreading::InvalidThreadId;
@@ -1409,7 +1460,9 @@ private:
THashMap<TString, ILogWriterPtr> NameToWriter_;
THashMap<TLogWriterCacheKey, std::vector<ILogWriterPtr>> KeyToCachedWriter_;
+
const std::vector<ILogWriterPtr> SystemWriters_;
+ const TLoggingCategory* SystemCategory_;
std::atomic<bool> ReopenRequested_ = false;
std::atomic<bool> ShutdownRequested_ = false;