diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-05-06 14:09:49 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-05-06 14:09:49 +0300 |
commit | c6c03ee079d245c66db80036c1aefe49668c1fd0 (patch) | |
tree | d2ad03ea3735803e2a697dd5bde7bc3a2f8b44d8 | |
parent | f37f9be0a24ff9246db4360b6614da440b779386 (diff) | |
download | ydb-c6c03ee079d245c66db80036c1aefe49668c1fd0.tar.gz |
intermediate changes
ref:73802948623046c57dcf98a142cb281f33eea5c7
-rw-r--r-- | build/ya.conf.json | 4 | ||||
-rw-r--r-- | library/cpp/actors/http/CMakeLists.txt | 2 | ||||
-rw-r--r-- | library/cpp/actors/http/http_compress.cpp | 2 | ||||
-rw-r--r-- | library/cpp/logger/all.h | 1 | ||||
-rw-r--r-- | library/cpp/logger/reopen.h | 40 | ||||
-rw-r--r-- | library/cpp/logger/reopen_ut.cpp | 119 |
6 files changed, 165 insertions, 3 deletions
diff --git a/build/ya.conf.json b/build/ya.conf.json index 01d6b076a6..750ba4cd34 100644 --- a/build/ya.conf.json +++ b/build/ya.conf.json @@ -7659,11 +7659,11 @@ "sandbox_id": [ 1299398111 ], - "match": "yp_util" + "match": "yp-util" }, "executable": { "yp-util": [ - "yp_util" + "yp-util" ] } }, diff --git a/library/cpp/actors/http/CMakeLists.txt b/library/cpp/actors/http/CMakeLists.txt index 166af4cec5..277ffc4560 100644 --- a/library/cpp/actors/http/CMakeLists.txt +++ b/library/cpp/actors/http/CMakeLists.txt @@ -7,12 +7,14 @@ find_package(OpenSSL REQUIRED) +find_package(ZLIB REQUIRED) add_library(cpp-actors-http) target_link_libraries(cpp-actors-http PUBLIC contrib-libs-cxxsupp yutil OpenSSL::OpenSSL + ZLIB::ZLIB cpp-actors-core cpp-actors-interconnect library-cpp-dns diff --git a/library/cpp/actors/http/http_compress.cpp b/library/cpp/actors/http/http_compress.cpp index 33ff3e1674..0c5a2cd73c 100644 --- a/library/cpp/actors/http/http_compress.cpp +++ b/library/cpp/actors/http/http_compress.cpp @@ -1,6 +1,6 @@ #include "http.h" -#include <contrib/libs/zlib/zlib.h> +#include <zlib.h> namespace NHttp { diff --git a/library/cpp/logger/all.h b/library/cpp/logger/all.h index ee1666844e..2bbf45822a 100644 --- a/library/cpp/logger/all.h +++ b/library/cpp/logger/all.h @@ -6,4 +6,5 @@ #include "stream.h" #include "thread.h" #include "system.h" +#include "reopen.h" #include "sync_page_cache_file.h" diff --git a/library/cpp/logger/reopen.h b/library/cpp/logger/reopen.h new file mode 100644 index 0000000000..47f44be11b --- /dev/null +++ b/library/cpp/logger/reopen.h @@ -0,0 +1,40 @@ +#pragma once + +#include "log.h" +#include "backend.h" + +#include <util/generic/fwd.h> +#include <util/generic/ptr.h> +#include <util/generic/size_literals.h> + +#include <atomic> + +class TReopenLogBackend: public TLogBackend { +public: + explicit TReopenLogBackend(THolder<TLogBackend>&& backend, ui64 bytesWrittenLimit = 1_GB) + : Backend_(std::move(backend)), BytesWrittenLimit_(bytesWrittenLimit), BytesWritten_(0) { + Y_ENSURE(BytesWrittenLimit_ > 0); + } + + void WriteData(const TLogRecord& rec) override { + const ui64 prevWritten = BytesWritten_.fetch_add(rec.Len); + if (prevWritten < BytesWrittenLimit_ && prevWritten + rec.Len >= BytesWrittenLimit_) { + try { + ReopenLog(); + } catch (...) { + } + } + Backend_->WriteData(rec); + } + + void ReopenLog() override { + BytesWritten_.store(0); + Backend_->ReopenLog(); + } + +private: + const THolder<TLogBackend> Backend_; + + const ui64 BytesWrittenLimit_; + std::atomic<ui64> BytesWritten_; +}; diff --git a/library/cpp/logger/reopen_ut.cpp b/library/cpp/logger/reopen_ut.cpp new file mode 100644 index 0000000000..dab133718f --- /dev/null +++ b/library/cpp/logger/reopen_ut.cpp @@ -0,0 +1,119 @@ +#include "backend.h" +#include "reopen.h" + +#include <exception> +#include <util/generic/string.h> +#include <util/generic/array_ref.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <future> +#include <atomic> +#include <barrier> + +namespace { + +struct TMockLogBackend : public TLogBackend { + void WriteData(const TLogRecord& rec) override { + BytesWritten.fetch_add(rec.Len); + } + + void ReopenLog() override { + NumReopens.fetch_add(1); + } + + std::atomic<ui64> BytesWritten{0}; + std::atomic<ui64> NumReopens{0}; +}; + +void WriteData(TReopenLogBackend& log, const TString& data) { + log.WriteData(TLogRecord(ELogPriority::TLOG_INFO, data.data(), data.size())); +} + +} + +Y_UNIT_TEST_SUITE(ReopenLogSuite) { + Y_UNIT_TEST(TestSimple) { + constexpr ui64 limit = 5; + const auto testData = {"test", "dshkafhuadshfiasuh", "log", "data"}; + constexpr ui64 expectedReopens = 2; // considering the limit, the first reopen after the second string and one more at the end + + auto mockHolder = MakeHolder<TMockLogBackend>(); + auto& mock = *mockHolder; + TReopenLogBackend log(std::move(mockHolder), limit); + + ui64 expectedWritten = 0; + for (const TString str : testData) { + WriteData(log, str); + expectedWritten += str.size(); + } + + UNIT_ASSERT(mock.BytesWritten.load() == expectedWritten); + UNIT_ASSERT(mock.NumReopens.load() == expectedReopens); + } + + Y_UNIT_TEST(TestSingleThreaded) { + constexpr ui64 limit = 1_KB; + constexpr ui64 numLogs = 123; + constexpr ui64 logSize = 1_KB / 4; + + static_assert((limit / logSize) * logSize == limit); // should be divisible for this test + constexpr ui64 expectedWritten = numLogs * logSize; + constexpr ui64 expectedReopens = expectedWritten / limit; + + auto mockHolder = MakeHolder<TMockLogBackend>(); + auto& mock = *mockHolder; + TReopenLogBackend log(std::move(mockHolder), limit); + + for (ui64 i = 0; i < numLogs; ++i) { + WriteData(log, TString(logSize, 'a')); + } + + UNIT_ASSERT(mock.BytesWritten.load() == expectedWritten); + UNIT_ASSERT(mock.NumReopens.load() == expectedReopens); + } + + Y_UNIT_TEST(TestMultiThreaded) { + constexpr ui64 limit = 1_KB; + constexpr ui64 numLogsPerThread = 123; + constexpr ui64 numThreads = 12; + constexpr ui64 logSize = 1_KB / 4; + + static_assert((limit / logSize) * logSize == limit); // should be divisible for this test + constexpr ui64 expectedWritten = numLogsPerThread * numThreads * logSize; + + // can't guarantee consistent number of reopens every N bytes in multithreaded setting + constexpr ui64 minExpectedReopens = limit / logSize; + constexpr ui64 maxExpectedReopens = expectedWritten / limit; + + auto mockHolder = MakeHolder<TMockLogBackend>(); + auto& mock = *mockHolder; + TReopenLogBackend log(std::move(mockHolder), limit); + + std::barrier barrier(numThreads); + const auto job = [&]() { + barrier.arrive_and_wait(); + + for (ui64 i = 0; i < numLogsPerThread; ++i) { + WriteData(log, TString(logSize, 'a')); + } + }; + + std::vector<std::future<void>> jobs; + for (ui64 i = 0; i < numThreads; ++i) { + jobs.emplace_back(std::async(std::launch::async, job)); + } + for (auto& res : jobs) { + res.wait(); + } + + UNIT_ASSERT(mock.BytesWritten.load() == expectedWritten); + UNIT_ASSERT(mock.NumReopens.load() >= minExpectedReopens); + UNIT_ASSERT(mock.NumReopens.load() <= maxExpectedReopens); + } + + Y_UNIT_TEST(TestZeroThrows) { + UNIT_ASSERT_EXCEPTION(TReopenLogBackend(MakeHolder<TMockLogBackend>(), 0), std::exception); + } +} |