aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-06-04 04:40:38 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-06-04 04:40:38 +0000
commitaeb6655daefe030fc9f43700a224b5adbf76191f (patch)
treebbbc8ef25becba5815d554ee841384319b0eabdd /library/cpp
parent6f60aaa00471a5b0694cb2017773a5d5afc94efa (diff)
parentec15d9dd90557956367f8675828c42b4abd8465e (diff)
downloadydb-aeb6655daefe030fc9f43700a224b5adbf76191f.tar.gz
Merge pull request #19265 from ydb-platform/merge-libs-250604-0050
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/lwtrace/mon/mon_lwtrace.cpp8
-rw-r--r--library/cpp/monlib/metrics/metric_registry.cpp21
-rw-r--r--library/cpp/monlib/metrics/metric_registry.h1
-rw-r--r--library/cpp/threading/chunk_queue/queue.h97
4 files changed, 84 insertions, 43 deletions
diff --git a/library/cpp/lwtrace/mon/mon_lwtrace.cpp b/library/cpp/lwtrace/mon/mon_lwtrace.cpp
index 09d56560c4b..2593845e487 100644
--- a/library/cpp/lwtrace/mon/mon_lwtrace.cpp
+++ b/library/cpp/lwtrace/mon/mon_lwtrace.cpp
@@ -301,7 +301,7 @@ public:
}
} catch (...) {
ythrow yexception()
- << CurrentExceptionMessage()
+ << EncodeHtmlPcdata(CurrentExceptionMessage())
<< " while parsing track log query: "
<< Text;
}
@@ -1853,7 +1853,7 @@ public:
try {
Os << src->GetStartTime().ToStringUpToSeconds();
} catch (...) {
- Os << "error: " << CurrentExceptionMessage();
+ Os << "error: " << EncodeHtmlPcdata(CurrentExceptionMessage());
}
Os << "</td>"
<< "<td><div class=\"dropdown\">"
@@ -3827,11 +3827,11 @@ public:
if (request.GetParams().Get("error") == "text") {
// Text error reply is helpful for ajax requests
out << NMonitoring::HTTPOKTEXT;
- out << CurrentExceptionMessage();
+ out << EncodeHtmlPcdata(CurrentExceptionMessage());
} else {
WWW_HTML(out) {
out << "<h2>Error</h2><pre>"
- << CurrentExceptionMessage()
+ << EncodeHtmlPcdata(CurrentExceptionMessage())
<< Endl;
}
}
diff --git a/library/cpp/monlib/metrics/metric_registry.cpp b/library/cpp/monlib/metrics/metric_registry.cpp
index dbbea603c16..cc2e27d1185 100644
--- a/library/cpp/monlib/metrics/metric_registry.cpp
+++ b/library/cpp/monlib/metrics/metric_registry.cpp
@@ -296,4 +296,25 @@ namespace NMonitoring {
);
}
}
+
+ void TMetricRegistry::Took(TInstant time, IMetricConsumer* consumer) const {
+ TReadGuard g{*Lock_};
+
+ for (const auto& it: Metrics_) {
+ ILabels* labels = it.first.Get();
+ IMetric* metric = it.second.Metric.Get();
+ TMetricOpts opts = it.second.Opts;
+ ConsumeMetric(
+ time,
+ consumer,
+ metric,
+ [&]() {
+ ConsumeLabels(consumer, CommonLabels_);
+ ConsumeLabels(consumer, *labels);
+ },
+ opts
+ );
+ metric->Reset();
+ }
+ }
}
diff --git a/library/cpp/monlib/metrics/metric_registry.h b/library/cpp/monlib/metrics/metric_registry.h
index 7669a8c0886..944bee1c6ac 100644
--- a/library/cpp/monlib/metrics/metric_registry.h
+++ b/library/cpp/monlib/metrics/metric_registry.h
@@ -212,6 +212,7 @@ namespace NMonitoring {
void Accept(TInstant time, IMetricConsumer* consumer) const override;
void Append(TInstant time, IMetricConsumer* consumer) const override;
+ void Took(TInstant time, IMetricConsumer* consumer) const;
const TLabels& CommonLabels() const noexcept override {
return CommonLabels_;
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index 17ebef91a86..7cb4be4b4ef 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -1,11 +1,12 @@
#pragma once
+#include <library/cpp/deprecated/atomic/atomic.h> // AtomicGet
+
#include <util/datetime/base.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
#include <util/generic/ylimits.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>
@@ -13,15 +14,15 @@
#include <atomic>
namespace NThreading {
-////////////////////////////////////////////////////////////////////////////////
-// Platform helpers
+ ////////////////////////////////////////////////////////////////////////////////
+ // Platform helpers
#if !defined(PLATFORM_CACHE_LINE)
-#define PLATFORM_CACHE_LINE 64
+ #define PLATFORM_CACHE_LINE 64
#endif
#if !defined(PLATFORM_PAGE_SIZE)
-#define PLATFORM_PAGE_SIZE 4 * 1024
+ #define PLATFORM_PAGE_SIZE 4 * 1024
#endif
template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
@@ -33,7 +34,7 @@ namespace NThreading {
Y_UNUSED(Pad);
}
- template<typename... Args>
+ template <typename... Args>
TPadded(Args&&... args)
: T(std::forward<Args>(args)...)
{
@@ -85,7 +86,7 @@ namespace NThreading {
TPodTypeHelper<T>,
TNonPodTypeHelper<T>>;
- }
+ } // namespace NImpl
////////////////////////////////////////////////////////////////////////////////
// One producer/one consumer chunked queue.
@@ -292,15 +293,19 @@ namespace NThreading {
bool TryEnqueue(TT&& value, ui64 tag) {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- TEntry* entry = queue.PrepareWrite();
- Y_ASSERT(entry);
- TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
- entry->Tag = tag;
- queue.CompleteWrite();
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
}
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
+ }
+ TEntry* entry = queue.PrepareWrite();
+ Y_ASSERT(entry);
+ TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+ entry->Tag = tag;
+ queue.CompleteWrite();
+ return true;
}
return false;
}
@@ -423,11 +428,15 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- queue.Enqueue(std::forward<TT>(value));
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
}
+ queue.Enqueue(std::forward<TT>(value));
+ return true;
}
return false;
}
@@ -461,12 +470,15 @@ namespace NThreading {
size_t readPos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[readPos++ % Concurrency];
- if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
- bool dequeued = queue.Dequeue(value);
- queue.ReadLock.Release();
- if (dequeued) {
- return true;
- }
+ if (queue.ReadLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.ReadLock};
+ if (!guard) {
+ continue;
+ }
+ if (queue.Dequeue(value)) {
+ return true;
}
}
return false;
@@ -475,12 +487,15 @@ namespace NThreading {
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
- bool empty = queue.IsEmpty();
- queue.ReadLock.Release();
- if (!empty) {
- return false;
- }
+ if (queue.ReadLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.ReadLock};
+ if (!guard) {
+ continue;
+ }
+ if (!queue.IsEmpty()) {
+ return false;
}
}
return true;
@@ -492,11 +507,15 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- queue.Enqueue(std::forward<TT>(value));
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
}
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
+ }
+ queue.Enqueue(std::forward<TT>(value));
+ return true;
}
return false;
}
@@ -514,18 +533,18 @@ namespace NThreading {
using TItem = TAutoPtr<T>;
~TAutoQueueBase() {
- TAutoPtr<T> value;
+ TItem value;
while (Dequeue(value)) {
// do nothing
}
}
- void Enqueue(TAutoPtr<T> value) {
+ void Enqueue(TItem value) {
Impl.Enqueue(value.Get());
Y_UNUSED(value.Release());
}
- bool Dequeue(TAutoPtr<T>& value) {
+ bool Dequeue(TItem& value) {
T* ptr = nullptr;
if (Impl.Dequeue(ptr)) {
value.Reset(ptr);
@@ -553,4 +572,4 @@ namespace NThreading {
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
-}
+} // namespace NThreading