diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-06-04 04:40:38 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-06-04 04:40:38 +0000 |
commit | aeb6655daefe030fc9f43700a224b5adbf76191f (patch) | |
tree | bbbc8ef25becba5815d554ee841384319b0eabdd /library/cpp | |
parent | 6f60aaa00471a5b0694cb2017773a5d5afc94efa (diff) | |
parent | ec15d9dd90557956367f8675828c42b4abd8465e (diff) | |
download | ydb-aeb6655daefe030fc9f43700a224b5adbf76191f.tar.gz |
Merge pull request #19265 from ydb-platform/merge-libs-250604-0050
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/lwtrace/mon/mon_lwtrace.cpp | 8 | ||||
-rw-r--r-- | library/cpp/monlib/metrics/metric_registry.cpp | 21 | ||||
-rw-r--r-- | library/cpp/monlib/metrics/metric_registry.h | 1 | ||||
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 97 |
4 files changed, 84 insertions, 43 deletions
diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.cpp b/library/cpp/lwtrace/mon/mon_lwtrace.cpp index 09d56560c4b..2593845e487 100644 --- a/library/cpp/lwtrace/mon/mon_lwtrace.cpp +++ b/library/cpp/lwtrace/mon/mon_lwtrace.cpp @@ -301,7 +301,7 @@ public: } } catch (...) { ythrow yexception() - << CurrentExceptionMessage() + << EncodeHtmlPcdata(CurrentExceptionMessage()) << " while parsing track log query: " << Text; } @@ -1853,7 +1853,7 @@ public: try { Os << src->GetStartTime().ToStringUpToSeconds(); } catch (...) { - Os << "error: " << CurrentExceptionMessage(); + Os << "error: " << EncodeHtmlPcdata(CurrentExceptionMessage()); } Os << "</td>" << "<td><div class=\"dropdown\">" @@ -3827,11 +3827,11 @@ public: if (request.GetParams().Get("error") == "text") { // Text error reply is helpful for ajax requests out << NMonitoring::HTTPOKTEXT; - out << CurrentExceptionMessage(); + out << EncodeHtmlPcdata(CurrentExceptionMessage()); } else { WWW_HTML(out) { out << "<h2>Error</h2><pre>" - << CurrentExceptionMessage() + << EncodeHtmlPcdata(CurrentExceptionMessage()) << Endl; } } diff --git a/library/cpp/monlib/metrics/metric_registry.cpp b/library/cpp/monlib/metrics/metric_registry.cpp index dbbea603c16..cc2e27d1185 100644 --- a/library/cpp/monlib/metrics/metric_registry.cpp +++ b/library/cpp/monlib/metrics/metric_registry.cpp @@ -296,4 +296,25 @@ namespace NMonitoring { ); } } + + void TMetricRegistry::Took(TInstant time, IMetricConsumer* consumer) const { + TReadGuard g{*Lock_}; + + for (const auto& it: Metrics_) { + ILabels* labels = it.first.Get(); + IMetric* metric = it.second.Metric.Get(); + TMetricOpts opts = it.second.Opts; + ConsumeMetric( + time, + consumer, + metric, + [&]() { + ConsumeLabels(consumer, CommonLabels_); + ConsumeLabels(consumer, *labels); + }, + opts + ); + metric->Reset(); + } + } } diff --git a/library/cpp/monlib/metrics/metric_registry.h b/library/cpp/monlib/metrics/metric_registry.h index 7669a8c0886..944bee1c6ac 100644 --- a/library/cpp/monlib/metrics/metric_registry.h +++ b/library/cpp/monlib/metrics/metric_registry.h @@ -212,6 +212,7 @@ namespace NMonitoring { void Accept(TInstant time, IMetricConsumer* consumer) const override; void Append(TInstant time, IMetricConsumer* consumer) const override; + void Took(TInstant time, IMetricConsumer* consumer) const; const TLabels& CommonLabels() const noexcept override { return CommonLabels_; diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index 17ebef91a86..7cb4be4b4ef 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -1,11 +1,12 @@ #pragma once +#include <library/cpp/deprecated/atomic/atomic.h> // AtomicGet + #include <util/datetime/base.h> #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> #include <util/generic/typetraits.h> #include <util/generic/ylimits.h> -#include <library/cpp/deprecated/atomic/atomic.h> #include <util/system/guard.h> #include <util/system/spinlock.h> #include <util/system/yassert.h> @@ -13,15 +14,15 @@ #include <atomic> namespace NThreading { -//////////////////////////////////////////////////////////////////////////////// -// Platform helpers + //////////////////////////////////////////////////////////////////////////////// + // Platform helpers #if !defined(PLATFORM_CACHE_LINE) -#define PLATFORM_CACHE_LINE 64 + #define PLATFORM_CACHE_LINE 64 #endif #if !defined(PLATFORM_PAGE_SIZE) -#define PLATFORM_PAGE_SIZE 4 * 1024 + #define PLATFORM_PAGE_SIZE 4 * 1024 #endif template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> @@ -33,7 +34,7 @@ namespace NThreading { Y_UNUSED(Pad); } - template<typename... Args> + template <typename... Args> TPadded(Args&&... args) : T(std::forward<Args>(args)...) { @@ -85,7 +86,7 @@ namespace NThreading { TPodTypeHelper<T>, TNonPodTypeHelper<T>>; - } + } // namespace NImpl //////////////////////////////////////////////////////////////////////////////// // One producer/one consumer chunked queue. @@ -292,15 +293,19 @@ namespace NThreading { bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - TEntry* entry = queue.PrepareWrite(); - Y_ASSERT(entry); - TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); - entry->Tag = tag; - queue.CompleteWrite(); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; + } + TEntry* entry = queue.PrepareWrite(); + Y_ASSERT(entry); + TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); + entry->Tag = tag; + queue.CompleteWrite(); + return true; } return false; } @@ -423,11 +428,15 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - queue.Enqueue(std::forward<TT>(value)); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; } + queue.Enqueue(std::forward<TT>(value)); + return true; } return false; } @@ -461,12 +470,15 @@ namespace NThreading { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[readPos++ % Concurrency]; - if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { - bool dequeued = queue.Dequeue(value); - queue.ReadLock.Release(); - if (dequeued) { - return true; - } + if (queue.ReadLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.ReadLock}; + if (!guard) { + continue; + } + if (queue.Dequeue(value)) { + return true; } } return false; @@ -475,12 +487,15 @@ namespace NThreading { bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { - bool empty = queue.IsEmpty(); - queue.ReadLock.Release(); - if (!empty) { - return false; - } + if (queue.ReadLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.ReadLock}; + if (!guard) { + continue; + } + if (!queue.IsEmpty()) { + return false; } } return true; @@ -492,11 +507,15 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - queue.Enqueue(std::forward<TT>(value)); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; + } + queue.Enqueue(std::forward<TT>(value)); + return true; } return false; } @@ -514,18 +533,18 @@ namespace NThreading { using TItem = TAutoPtr<T>; ~TAutoQueueBase() { - TAutoPtr<T> value; + TItem value; while (Dequeue(value)) { // do nothing } } - void Enqueue(TAutoPtr<T> value) { + void Enqueue(TItem value) { Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); } - bool Dequeue(TAutoPtr<T>& value) { + bool Dequeue(TItem& value) { T* ptr = nullptr; if (Impl.Dequeue(ptr)) { value.Reset(ptr); @@ -553,4 +572,4 @@ namespace NThreading { template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; -} +} // namespace NThreading |