diff options
| author | robot-piglet <[email protected]> | 2024-10-08 17:56:22 +0300 | 
|---|---|---|
| committer | robot-piglet <[email protected]> | 2024-10-08 18:07:23 +0300 | 
| commit | d17ef5729f3885185bdb7aa459fc4d3bada0c94d (patch) | |
| tree | 31f8c229410b09032912843fdd86c9717b623f39 /library/cpp | |
| parent | 37e325a9a8628ece2764f0f26b09ebc09ca39814 (diff) | |
Intermediate changes
commit_hash:8b7eb71badc9f2fcd168ee34e8c379b35577eccb
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/containers/dense_hash/dense_hash.h | 1 | ||||
| -rw-r--r-- | library/cpp/yt/stockpile/stockpile.h | 21 | ||||
| -rw-r--r-- | library/cpp/yt/stockpile/stockpile_linux.cpp | 99 | ||||
| -rw-r--r-- | library/cpp/yt/stockpile/stockpile_other.cpp | 5 | 
4 files changed, 114 insertions, 12 deletions
| diff --git a/library/cpp/containers/dense_hash/dense_hash.h b/library/cpp/containers/dense_hash/dense_hash.h index 5dae8487397..739479c25a3 100644 --- a/library/cpp/containers/dense_hash/dense_hash.h +++ b/library/cpp/containers/dense_hash/dense_hash.h @@ -2,6 +2,7 @@  #include "fwd.h" +#include <util/generic/bitops.h>  #include <util/generic/utility.h>  #include <util/generic/vector.h>  #include <util/generic/mapfindptr.h> diff --git a/library/cpp/yt/stockpile/stockpile.h b/library/cpp/yt/stockpile/stockpile.h index 1df9591de40..d84763cf284 100644 --- a/library/cpp/yt/stockpile/stockpile.h +++ b/library/cpp/yt/stockpile/stockpile.h @@ -1,5 +1,9 @@  #pragma once +#include <library/cpp/yt/cpu_clock/clock.h> + +#include <library/cpp/yt/misc/enum.h> +  #include <util/system/types.h>  #include <util/generic/size_literals.h> @@ -10,6 +14,14 @@ namespace NYT {  //////////////////////////////////////////////////////////////////////////////// +DEFINE_ENUM(EStockpileStrategy, +    ((FixedBreaks)          (0)) +    ((FlooredLoad)          (1)) +    ((ProgressiveBackoff)   (2)) +); + +//////////////////////////////////////////////////////////////////////////////// +  struct TStockpileOptions  {      static constexpr i64 DefaultBufferSize = 4_GBs; @@ -18,11 +30,18 @@ struct TStockpileOptions      static constexpr int DefaultThreadCount = 4;      int ThreadCount = DefaultThreadCount; +    static constexpr EStockpileStrategy DefaultStrategy = EStockpileStrategy::FixedBreaks; +    EStockpileStrategy Strategy = DefaultStrategy; +      static constexpr TDuration DefaultPeriod = TDuration::MilliSeconds(10);      TDuration Period = DefaultPeriod;  }; -void ConfigureStockpile(const TStockpileOptions& options); +//////////////////////////////////////////////////////////////////////////////// + +void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed); + +void RunDetachedStockpileThreads(TStockpileOptions options);  //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/stockpile_linux.cpp b/library/cpp/yt/stockpile/stockpile_linux.cpp index 3ee83d93341..0cf5ea0a8bb 100644 --- a/library/cpp/yt/stockpile/stockpile_linux.cpp +++ b/library/cpp/yt/stockpile/stockpile_linux.cpp @@ -1,38 +1,117 @@  #include "stockpile.h" +#include "library/cpp/yt/logging/logger.h" +  #include <thread>  #include <mutex>  #include <sys/mman.h>  #include <util/system/thread.h> +#include <string.h>  namespace NYT {  //////////////////////////////////////////////////////////////////////////////// +static const auto Logger = NLogging::TLogger("Stockpile"); + +constexpr int MADV_STOCKPILE = 0x59410004; + +//////////////////////////////////////////////////////////////////////////////// +  namespace { -void RunStockpile(const TStockpileOptions& options) +void RunWithFixedBreaks(i64 bufferSize, TDuration period)  { -    TThread::SetCurrentThreadName("Stockpile"); +    auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE); +    YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode)); +    Sleep(period); +} -    constexpr int MADV_STOCKPILE = 0x59410004; +void RunWithCappedLoad(i64 bufferSize, TDuration period) +{ +    auto started = GetApproximateCpuInstant(); +    auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE); +    YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode)); +    auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started); -    while (true) { -        ::madvise(nullptr, options.BufferSize, MADV_STOCKPILE); -        Sleep(options.Period); +    if (duration < period) { +        Sleep(period - duration); +    } +} + +std::pair<i64, TDuration> RunWithBackoffs( +    i64 adjustedBufferSize, +    TDuration adjustedPeriod, +    const TStockpileOptions& options, +    i64 pageSize) +{ +    int returnCode = ::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE); +    YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode)); + +    switch(returnCode) { +        case 0: +            Sleep(options.Period); +            return {options.BufferSize, options.Period}; + +        case ENOMEM: +            if (adjustedBufferSize / 2 >= pageSize) { +                // Immediately make an attempt to reclaim half as much. +                adjustedBufferSize = adjustedBufferSize / 2; +            } else { +                // Unless there is not even a single reclaimable page. +                Sleep(options.Period); +            } +            return {adjustedBufferSize, options.Period}; + +        case EAGAIN: +        case EINTR: +            Sleep(adjustedPeriod); +            return {options.BufferSize, adjustedPeriod + options.Period}; + +        default: +            Sleep(options.Period); +            return {options.BufferSize, options.Period};      }  }  } // namespace -void ConfigureStockpile(const TStockpileOptions& options) +void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed) +{ +    TThread::SetCurrentThreadName("Stockpile"); + +    const i64 pageSize = sysconf(_SC_PAGESIZE); +    auto bufferSize = options.BufferSize; +    auto period = options.Period; + +    while (!shouldProceed || shouldProceed->load()) { +        switch (options.Strategy) { +            case EStockpileStrategy::FixedBreaks: +                RunWithFixedBreaks(options.BufferSize, options.Period); +                break; + +            case EStockpileStrategy::FlooredLoad: +                RunWithCappedLoad(options.BufferSize, options.Period); +                break; + +            case EStockpileStrategy::ProgressiveBackoff: +                std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize); +                break; + +            default: +                YT_ABORT(); +        } +    } +} + +void RunDetachedStockpileThreads(TStockpileOptions options)  {      static std::once_flag OnceFlag; -    std::call_once(OnceFlag, [options] { -        for (int i = 0; i < options.ThreadCount; i++) { -            std::thread(RunStockpile, options).detach(); +    std::call_once(OnceFlag, [options = std::move(options)] { +        for (int i = 0; i < options.ThreadCount; ++i) { +            std::thread(RunStockpileThread, options, nullptr).detach();          }      });  } diff --git a/library/cpp/yt/stockpile/stockpile_other.cpp b/library/cpp/yt/stockpile/stockpile_other.cpp index 3495d9c1cb6..ce52a876aca 100644 --- a/library/cpp/yt/stockpile/stockpile_other.cpp +++ b/library/cpp/yt/stockpile/stockpile_other.cpp @@ -4,7 +4,10 @@ namespace NYT {  //////////////////////////////////////////////////////////////////////////////// -void ConfigureStockpile(const TStockpileOptions& /*options*/) +void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/) +{ } + +void RunDetachedStockpileThreads(TStockpileOptions /*options*/)  { }  //////////////////////////////////////////////////////////////////////////////// | 
