diff options
author | lukyan <lukyan@yandex-team.com> | 2024-06-20 00:45:46 +0300 |
---|---|---|
committer | lukyan <lukyan@yandex-team.com> | 2024-06-20 00:58:09 +0300 |
commit | 390ef299e5e7611462cd17cdef8e6dab65d87de6 (patch) | |
tree | 3dcae9c3229e8b9ff03ecc1e56a090e9cd912337 | |
parent | 10ee00c96ea853958790895d65a40b0286f8ea4a (diff) | |
download | ydb-390ef299e5e7611462cd17cdef8e6dab65d87de6.tar.gz |
Keep multiple last modification locations
Move TStaticRingQueue in separate file
44482a34376e587eef40bfc5a7aade3df1a7361d
-rw-r--r-- | yt/yt/core/concurrency/propagating_storage.cpp | 44 | ||||
-rw-r--r-- | yt/yt/core/concurrency/propagating_storage.h | 4 | ||||
-rw-r--r-- | yt/yt/core/misc/static_ring_queue-inl.h | 63 | ||||
-rw-r--r-- | yt/yt/core/misc/static_ring_queue.h | 28 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context-inl.h | 13 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.h | 2 | ||||
-rw-r--r-- | yt/yt/core/yson/detail.h | 52 |
7 files changed, 137 insertions, 69 deletions
diff --git a/yt/yt/core/concurrency/propagating_storage.cpp b/yt/yt/core/concurrency/propagating_storage.cpp index 10a1e01582..c7390fa0f3 100644 --- a/yt/yt/core/concurrency/propagating_storage.cpp +++ b/yt/yt/core/concurrency/propagating_storage.cpp @@ -4,6 +4,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> +#include <yt/yt/core/misc/static_ring_queue.h> + namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// @@ -49,8 +51,26 @@ public: DEFINE_SIGNAL_SIMPLE(void(), OnBeforeUninstall); DEFINE_SIGNAL_SIMPLE(void(), OnAfterInstall); + void RecordLocation(TSourceLocation loc) + { + Locations_.Append(&loc, &loc + 1); + } + + void PrintModificationLocationsToStderr() + { + size_t size = Locations_.Size(); + TSourceLocation lastLocations[MaxSize]; + Locations_.CopyTailTo(size, &lastLocations[0]); + for (size_t i = 0; i < size; ++i) { + Cerr << NYT::ToString(lastLocations[i]) << Endl; + } + } + private: TStorage Data_; + + static constexpr int MaxSize = 8; + TStaticRingQueue<TSourceLocation, MaxSize> Locations_; }; //////////////////////////////////////////////////////////////////////////////// @@ -176,11 +196,21 @@ void TPropagatingStorage::EnsureUnique() Impl_ = Impl_->Clone(); } +void TPropagatingStorage::RecordLocation(TSourceLocation loc) +{ + Impl_->RecordLocation(loc); +} + +void TPropagatingStorage::PrintModificationLocationsToStderr() +{ + Impl_->PrintModificationLocationsToStderr(); +} + struct TPropagatingStorageInfo { TPropagatingStorage Storage; TSourceLocation Location; - TSourceLocation ModifyLocation; + TSourceLocation PrevLocation; }; YT_DEFINE_GLOBAL(TFlsSlot<TPropagatingStorageInfo>, PropagatingStorageSlot, {}); @@ -255,20 +285,18 @@ void InstallGlobalPropagatingStorageSwitchHandler(TPropagatingStorageGlobalSwitc TSourceLocation SwitchPropagatingStorageLocation(TSourceLocation loc) { + PropagatingStorageSlot()->PrevLocation = PropagatingStorageSlot()->Location; return std::exchange(PropagatingStorageSlot()->Location, loc); } -TSourceLocation SwitchPropagatingStorageModifyLocation(TSourceLocation loc) -{ - return std::exchange(PropagatingStorageSlot()->ModifyLocation, loc); -} - void PrintLocationToStderr() { Cerr << Format( - "PropagatingStorageLoccation: %v, ModificationLocation: %v", + "PropagatingStorageLocation: %v, PrevLocation: %v, ModificationLocations:", PropagatingStorageSlot()->Location, - PropagatingStorageSlot()->ModifyLocation) << Endl; + PropagatingStorageSlot()->PrevLocation) << Endl; + + PropagatingStorageSlot()->Storage.PrintModificationLocationsToStderr(); } TPropagatingStorageGuard::TPropagatingStorageGuard(TPropagatingStorage storage, TSourceLocation loc) diff --git a/yt/yt/core/concurrency/propagating_storage.h b/yt/yt/core/concurrency/propagating_storage.h index 4a4f9f3aaf..aa41381571 100644 --- a/yt/yt/core/concurrency/propagating_storage.h +++ b/yt/yt/core/concurrency/propagating_storage.h @@ -67,6 +67,9 @@ public: template <class T> std::optional<T> Remove(); + void RecordLocation(TSourceLocation loc); + void PrintModificationLocationsToStderr(); + DECLARE_SIGNAL(void(), OnAfterInstall); DECLARE_SIGNAL(void(), OnBeforeUninstall); @@ -149,7 +152,6 @@ private: //////////////////////////////////////////////////////////////////////////////// TSourceLocation SwitchPropagatingStorageLocation(TSourceLocation loc); -TSourceLocation SwitchPropagatingStorageModifyLocation(TSourceLocation loc); void PrintLocationToStderr(); diff --git a/yt/yt/core/misc/static_ring_queue-inl.h b/yt/yt/core/misc/static_ring_queue-inl.h new file mode 100644 index 0000000000..6b1b2753d1 --- /dev/null +++ b/yt/yt/core/misc/static_ring_queue-inl.h @@ -0,0 +1,63 @@ +#ifndef STATIC_RING_QUEUE_INL_H_ +#error "Direct inclusion of this file is not allowed, include static_ring_queue.h" +// For the sake of sane code completion. +#include "static_ring_queue.h" +#endif +#undef STATIC_RING_QUEUE_INL_H_ + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T, size_t Capacity> +void TStaticRingQueue<T, Capacity>::Append(const T* begin, const T* end) +{ + // Determine input tail. + if (std::distance(begin, end) > static_cast<i64>(Capacity)) { + begin = end - Capacity; + } + + size_t appendSize = std::distance(begin, end); + + if (Size_ > Capacity - appendSize) { + Size_ = Capacity; + } else { + Size_ += appendSize; + } + + EndOffset_ += appendSize; + if (EndOffset_ >= Capacity) { + EndOffset_ -= Capacity; + YT_VERIFY(EndOffset_ < Capacity); + } + + size_t tailSize = std::min<size_t>(EndOffset_, appendSize); + + std::copy(end - tailSize, end, Buffer_ + EndOffset_ - tailSize); + end -= tailSize; + std::copy(begin, end, Buffer_ + Capacity - (end - begin)); +} + +template <class T, size_t Capacity> +void TStaticRingQueue<T, Capacity>::CopyTailTo(size_t copySize, T* begin) const +{ + YT_VERIFY(copySize <= Size_); + + if (copySize > EndOffset_) { + size_t tailSize = copySize - EndOffset_; + std::copy(Buffer_ + Capacity - tailSize, Buffer_ + Capacity, begin); + std::copy(Buffer_, Buffer_ + EndOffset_, begin + tailSize); + } else { + std::copy(Buffer_ + EndOffset_ - copySize, Buffer_ + EndOffset_, begin); + } +} + +template <class T, size_t Capacity> +size_t TStaticRingQueue<T, Capacity>::Size() const +{ + return Size_; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/core/misc/static_ring_queue.h b/yt/yt/core/misc/static_ring_queue.h new file mode 100644 index 0000000000..21d161f358 --- /dev/null +++ b/yt/yt/core/misc/static_ring_queue.h @@ -0,0 +1,28 @@ +#pragma once +#include "common.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T, size_t Capacity> +class TStaticRingQueue +{ +public: + void Append(const T* begin, const T* end); + void CopyTailTo(size_t copySize, T* begin) const; + size_t Size() const; + +private: + T Buffer_[Capacity]; + size_t EndOffset_ = 0; + size_t Size_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define STATIC_RING_QUEUE_INL_H_ +#include "static_ring_queue-inl.h" +#undef STATIC_RING_QUEUE_INL_H_ diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index 94bf3e0bdc..de1d767a83 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -194,14 +194,13 @@ Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TTraceContex { if (Active_) { OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext)); - OldLocation_ = NConcurrency::SwitchPropagatingStorageModifyLocation(location); + NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location); } } Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TCurrentTraceContextGuard&& other) : Active_(other.Active_) , OldTraceContext_(std::move(other.OldTraceContext_)) - , OldLocation_(other.OldLocation_) { other.Active_ = false; } @@ -220,7 +219,7 @@ Y_FORCE_INLINE void TCurrentTraceContextGuard::Release() { if (Active_) { NDetail::SwapTraceContext(std::move(OldTraceContext_)); - NConcurrency::SwitchPropagatingStorageModifyLocation(OldLocation_); + NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE); Active_ = false; } } @@ -235,13 +234,13 @@ Y_FORCE_INLINE const TTraceContextPtr& TCurrentTraceContextGuard::GetOldTraceCon Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TSourceLocation location) : Active_(true) , OldTraceContext_(NDetail::SwapTraceContext(nullptr)) - , OldLocation_(NConcurrency::SwitchPropagatingStorageModifyLocation(location)) -{ } +{ + NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location); +} Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TNullTraceContextGuard&& other) : Active_(other.Active_) , OldTraceContext_(std::move(other.OldTraceContext_)) - , OldLocation_(other.OldLocation_) { other.Active_ = false; } @@ -260,7 +259,7 @@ Y_FORCE_INLINE void TNullTraceContextGuard::Release() { if (Active_) { NDetail::SwapTraceContext(std::move(OldTraceContext_)); - NConcurrency::SwitchPropagatingStorageModifyLocation(OldLocation_); + NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE); Active_ = false; } } diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index fba8141081..8df5446155 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -316,7 +316,6 @@ public: private: bool Active_; TTraceContextPtr OldTraceContext_; - TSourceLocation OldLocation_; }; //////////////////////////////////////////////////////////////////////////////// @@ -337,7 +336,6 @@ public: private: bool Active_; TTraceContextPtr OldTraceContext_; - TSourceLocation OldLocation_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/yson/detail.h b/yt/yt/core/yson/detail.h index 352c45a958..8cbb172626 100644 --- a/yt/yt/core/yson/detail.h +++ b/yt/yt/core/yson/detail.h @@ -8,6 +8,7 @@ #include <yt/yt/core/misc/error.h> #include <yt/yt/core/misc/parser_helpers.h> #include <yt/yt/core/misc/property.h> +#include <yt/yt/core/misc/static_ring_queue.h> #include <library/cpp/yt/coding/varint.h> #include <library/cpp/yt/coding/zig_zag.h> @@ -104,57 +105,6 @@ public: } }; -template <class T, size_t Capacity> -class TStaticRingQueue -{ -private: - static_assert(std::is_standard_layout_v<T> && std::is_trivial_v<T>, "T must be POD."); - - T Buffer_[Capacity]; - size_t EndOffset_ = 0; - size_t Size_ = 0; - -public: - void Append(const T* begin, const T* end) - { - if (std::distance(begin, end) > static_cast<i64>(Capacity)) { - begin = end - Capacity; - } - - size_t appendSize = std::distance(begin, end); - Size_ = std::min(Capacity, Size_ + appendSize); - - EndOffset_ += appendSize; - if (EndOffset_ >= Capacity) { - EndOffset_ -= Capacity; - YT_VERIFY(EndOffset_ < Capacity); - } - - size_t tailSize = std::min<size_t>(EndOffset_, appendSize); - std::copy(end - tailSize, end, Buffer_ + EndOffset_ - tailSize); - end -= tailSize; - std::copy(begin, end, Buffer_ + Capacity - (end - begin)); - } - - void CopyTailTo(size_t copySize, T* begin) const - { - YT_VERIFY(copySize <= Size_); - - if (copySize > EndOffset_) { - size_t tailSize = copySize - EndOffset_; - std::copy(Buffer_ + Capacity - tailSize, Buffer_ + Capacity, begin); - std::copy(Buffer_, Buffer_ + EndOffset_, begin + tailSize); - } else { - std::copy(Buffer_ + EndOffset_ - copySize, Buffer_ + EndOffset_, begin); - } - } - - size_t Size() const - { - return Size_; - } -}; - static constexpr size_t MaxMarginSize = 10; template <class TBlockStream, size_t MaxContextSize> |