diff options
| author | babenko <[email protected]> | 2025-11-05 16:26:08 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2025-11-05 17:09:03 +0300 |
| commit | 659334cd4b78901187d4cef85b8a67ec63f84e4e (patch) | |
| tree | 962ddac21d1c8d9185434c2380df918bf6873c9b | |
| parent | 21e77ed3bcc80f6fa0dc578cbd22e5f52c234299 (diff) | |
YT-18571: Preliminary cosmetics and refactoring of log compression
commit_hash:57f52a44eb68d2c600e48de1c1234d6b708e4472
| -rw-r--r-- | yt/yt/core/logging/appendable_compressed_file.cpp | 200 | ||||
| -rw-r--r-- | yt/yt/core/logging/appendable_compressed_file.h | 26 | ||||
| -rw-r--r-- | yt/yt/core/logging/compression.cpp | 150 | ||||
| -rw-r--r-- | yt/yt/core/logging/compression.h | 84 | ||||
| -rw-r--r-- | yt/yt/core/logging/file_log_writer.cpp | 12 | ||||
| -rw-r--r-- | yt/yt/core/logging/log_codec.h | 32 | ||||
| -rw-r--r-- | yt/yt/core/logging/public.h | 3 | ||||
| -rw-r--r-- | yt/yt/core/logging/unittests/logging_ut.cpp | 29 | ||||
| -rw-r--r-- | yt/yt/core/logging/zstd_log_codec.cpp (renamed from yt/yt/core/logging/zstd_compression.cpp) | 34 | ||||
| -rw-r--r-- | yt/yt/core/logging/zstd_log_codec.h (renamed from yt/yt/core/logging/zstd_compression.h) | 2 | ||||
| -rw-r--r-- | yt/yt/core/ya.make | 4 |
11 files changed, 302 insertions, 274 deletions
diff --git a/yt/yt/core/logging/appendable_compressed_file.cpp b/yt/yt/core/logging/appendable_compressed_file.cpp new file mode 100644 index 00000000000..485932f640e --- /dev/null +++ b/yt/yt/core/logging/appendable_compressed_file.cpp @@ -0,0 +1,200 @@ +#include "appendable_compressed_file.h" + +#include "log_codec.h" +#include "stream_output.h" + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/scheduler_api.h> + +namespace NYT::NLogging { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class TAppendableCompressedFile + : public IStreamLogOutput +{ +public: + TAppendableCompressedFile( + TFile file, + ILogCodecPtr codec, + IInvokerPtr compressInvoker, + const TAppendableCompressedFileOptions& options) + : Codec_(std::move(codec)) + , CompressInvoker_(std::move(compressInvoker)) + , Options_(options) + , SerializedInvoker_(CreateSerializedInvoker( + CompressInvoker_, + NProfiling::TTagSet({ + {"invoker", "appendable_compressed_file"}, + {"file_name", file.GetName()}, + }))) + , MaxBlockSize_(Codec_->GetMaxBlockSize()) + , File_(std::move(file)) + { + i64 fileSize = File_.GetLength(); + OutputPosition_ = Codec_->Repair(&File_); + if (OutputPosition_ != fileSize && Options_.WriteTruncateMessage) { + auto message = Format("*** Truncated %v trailing bytes due to zstd repair\n", fileSize - OutputPosition_); + Input_.Append(message.data(), message.size()); + Flush(); + } + } + +private: + const ILogCodecPtr Codec_; + const IInvokerPtr CompressInvoker_; + const TAppendableCompressedFileOptions Options_; + const IInvokerPtr SerializedInvoker_; + const i64 MaxBlockSize_; + + TFile File_; + + // These fields are read and updated in the thread that owns this TAppendableCompressedFile. + TBuffer Input_; + i64 EnqueuedBuffersCount_ = 0; + i64 OutputPosition_ = 0; + + // These fields are read and updated in SerializedInvoker_. + THashMap<i64, TBuffer> CompressedBuffers_; + i64 WrittenBuffersCount_ = 0; + + // These fields are read and updated under FlushSpinLock_. + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, FlushSpinLock_); + i64 BuffersToFlushCount_ = 0; + i64 CompressedBuffersCount_ = 0; + TPromise<void> ReadyToFlushEvent_; + + void DoWrite(const void* buf, size_t len) final + { + const char* in = reinterpret_cast<const char*>(buf); + while (len > 0) { + size_t toWrite = len; + if (static_cast<i64>(Input_.Size()) >= MaxBlockSize_) { + toWrite = 0; + } else if (static_cast<i64>(Input_.Size() + len) >= MaxBlockSize_) { + toWrite = MaxBlockSize_ - Input_.Size(); + } + + Input_.Append(in, toWrite); + in += toWrite; + len -= toWrite; + + while (static_cast<i64>(Input_.Size()) >= MaxBlockSize_) { + EnqueueOneFrame(); + } + } + } + + void DoFlush() final + { + while (!Input_.Empty()) { + EnqueueOneFrame(); + } + FlushOutput(); + } + + void DoFinish() final + { + Flush(); + } + + void EnqueueBuffer(TBuffer buffer) + { + i64 bufferId = EnqueuedBuffersCount_; + ++EnqueuedBuffersCount_; + + BIND([this_ = MakeStrong(this), this, buffer = std::move(buffer)] { + TBuffer output; + Codec_->Compress(buffer, &output); + return output; + }) + .AsyncVia(CompressInvoker_) + .Run() + .Subscribe(BIND([this_ = MakeStrong(this), this, bufferId] (TErrorOr<TBuffer> result) { + YT_VERIFY(result.IsOK()); + + CompressedBuffers_[bufferId] = std::move(result.Value()); + + auto guard = Guard(FlushSpinLock_); + ++CompressedBuffersCount_; + if (CompressedBuffersCount_ == BuffersToFlushCount_) { + ReadyToFlushEvent_.Set(); + } + }) + .Via(SerializedInvoker_)); + } + + void EnqueueOneFrame() + { + if (Input_.Empty()) { + return; + } + + size_t toWrite = std::min(Input_.Size(), static_cast<size_t>(MaxBlockSize_)); + EnqueueBuffer(TBuffer(Input_.Data(), toWrite)); + Input_.ChopHead(toWrite); + } + + void FlushOutput() + { + TFuture<void> readyToFlushFuture; + + { + auto guard = Guard(FlushSpinLock_); + BuffersToFlushCount_ = EnqueuedBuffersCount_; + if (BuffersToFlushCount_ == CompressedBuffersCount_) { + readyToFlushFuture = VoidFuture; + } else { + ReadyToFlushEvent_ = NewPromise<void>(); + readyToFlushFuture = ReadyToFlushEvent_.ToFuture(); + } + } + + auto asyncOutputBuffer = readyToFlushFuture + .Apply(BIND([this_ = MakeStrong(this), this, outputPosition = OutputPosition_] { + TBuffer output; + + while (!CompressedBuffers_.empty()) { + auto it = CompressedBuffers_.find(WrittenBuffersCount_); + YT_VERIFY(it != CompressedBuffers_.end()); + + output.Append(it->second.Data(), it->second.Size()); + Codec_->AddSyncTag(outputPosition + output.Size(), &output); + + CompressedBuffers_.erase(it); + ++WrittenBuffersCount_; + } + + return output; + }) + .AsyncVia(SerializedInvoker_)); + + // We use .Get() here instead of WaitFor() here to ensure that this method doesn't do context switches. + // Otherwise, flush events in TLogManager may intersect, because another event could start while we are + // waiting in WaitFor(). + auto outputBuffer = asyncOutputBuffer.Get().ValueOrThrow(); + File_.Pwrite(outputBuffer.Data(), outputBuffer.Size(), OutputPosition_); + OutputPosition_ += outputBuffer.Size(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IStreamLogOutputPtr CreateAppendableCompressedFile( + TFile file, + ILogCodecPtr codec, + IInvokerPtr compressInvoker, + const TAppendableCompressedFileOptions& options) +{ + return New<TAppendableCompressedFile>( + std::move(file), + std::move(codec), + std::move(compressInvoker), + options); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NLogging diff --git a/yt/yt/core/logging/appendable_compressed_file.h b/yt/yt/core/logging/appendable_compressed_file.h new file mode 100644 index 00000000000..89705cb8b7f --- /dev/null +++ b/yt/yt/core/logging/appendable_compressed_file.h @@ -0,0 +1,26 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/actions/public.h> + +#include <util/stream/file.h> + +namespace NYT::NLogging { + +//////////////////////////////////////////////////////////////////////////////// + +struct TAppendableCompressedFileOptions +{ + bool WriteTruncateMessage = false; +}; + +IStreamLogOutputPtr CreateAppendableCompressedFile( + TFile file, + ILogCodecPtr codec, + IInvokerPtr compressInvoker, + const TAppendableCompressedFileOptions& options); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NLogging diff --git a/yt/yt/core/logging/compression.cpp b/yt/yt/core/logging/compression.cpp deleted file mode 100644 index 68d2623a2ca..00000000000 --- a/yt/yt/core/logging/compression.cpp +++ /dev/null @@ -1,150 +0,0 @@ -#include "compression.h" - -#include <yt/yt/core/concurrency/action_queue.h> -#include <yt/yt/core/concurrency/scheduler_api.h> - -namespace NYT::NLogging { - -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -TAppendableCompressedFile::TAppendableCompressedFile( - TFile file, - ILogCompressionCodecPtr codec, - IInvokerPtr compressInvoker, - bool writeTruncateMessage) - : Codec_(std::move(codec)) - , CompressInvoker_(std::move(compressInvoker)) - , SerializedInvoker_(CreateSerializedInvoker(CompressInvoker_, NProfiling::TTagSet({{"invoker", "appendable_compressed_file"}, {"file_name", file.GetName()}}))) - , MaxBlockSize_(static_cast<size_t>(Codec_->GetMaxBlockSize())) - , File_(std::move(file)) -{ - i64 fileSize = File_.GetLength(); - Codec_->Repair(&File_, OutputPosition_); - if (OutputPosition_ != fileSize && writeTruncateMessage) { - TStringBuilder message; - message.AppendFormat("Truncated %v bytes due to zstd repair.\n", fileSize - OutputPosition_); - TString messageStr = message.Flush(); - - Input_.Append(messageStr.data(), messageStr.size()); - Flush(); - } -} - -void TAppendableCompressedFile::DoWrite(const void* buf, size_t len) -{ - const char* in = reinterpret_cast<const char*>(buf); - while (len > 0) { - size_t toWrite = len; - if (Input_.Size() >= MaxBlockSize_) { - toWrite = 0; - } else if (Input_.Size() + len >= MaxBlockSize_) { - toWrite = MaxBlockSize_ - Input_.Size(); - } - - Input_.Append(in, toWrite); - in += toWrite; - len -= toWrite; - - while (Input_.Size() >= MaxBlockSize_) { - EnqueueOneFrame(); - } - } -} - -void TAppendableCompressedFile::EnqueueOneFrame() -{ - if (Input_.Empty()) { - return; - } - - size_t toWrite = std::min(Input_.Size(), MaxBlockSize_); - EnqueueBuffer(TBuffer(Input_.Data(), toWrite)); - Input_.ChopHead(toWrite); -} - -void TAppendableCompressedFile::EnqueueBuffer(TBuffer buffer) -{ - i64 bufferId = EnqueuedBuffersCount_; - ++EnqueuedBuffersCount_; - - BIND([this_ = MakeStrong(this), this, buffer = std::move(buffer)] { - TBuffer output; - Codec_->Compress(buffer, output); - return output; - }) - .AsyncVia(CompressInvoker_) - .Run() - .Subscribe(BIND([this_ = MakeStrong(this), this, bufferId] (TErrorOr<TBuffer> result) { - YT_VERIFY(result.IsOK()); - - CompressedBuffers_[bufferId] = std::move(result.Value()); - - auto guard = Guard(FlushSpinLock_); - ++CompressedBuffersCount_; - if (CompressedBuffersCount_ == BuffersToFlushCount_) { - ReadyToFlushEvent_.Set(); - } - }) - .Via(SerializedInvoker_)); -} - -void TAppendableCompressedFile::DoFlush() -{ - while (!Input_.Empty()) { - EnqueueOneFrame(); - } - FlushOutput(); -} - -void TAppendableCompressedFile::FlushOutput() -{ - TFuture<void> readyToFlushFuture; - - { - auto guard = Guard(FlushSpinLock_); - BuffersToFlushCount_ = EnqueuedBuffersCount_; - if (BuffersToFlushCount_ == CompressedBuffersCount_) { - readyToFlushFuture = VoidFuture; - } else { - ReadyToFlushEvent_ = NewPromise<void>(); - readyToFlushFuture = ReadyToFlushEvent_.ToFuture(); - } - } - - auto asyncOutputBuffer = readyToFlushFuture - .Apply(BIND([this_ = MakeStrong(this), this, outputPosition = OutputPosition_] { - TBuffer output; - - while (!CompressedBuffers_.empty()) { - auto it = CompressedBuffers_.find(WrittenBuffersCount_); - YT_VERIFY(it != CompressedBuffers_.end()); - - output.Append(it->second.Data(), it->second.Size()); - Codec_->AddSyncTag(outputPosition + output.Size(), output); - - CompressedBuffers_.erase(it); - ++WrittenBuffersCount_; - } - - return output; - }) - .AsyncVia(SerializedInvoker_)); - - // We use .Get() here instead of WaitFor() here to ensure that this method doesn't do context switches. - // Otherwise, flush events in TLogManager may intersect, because another event could start while we are - // waiting in WaitFor(). - auto outputBuffer = asyncOutputBuffer.Get().ValueOrThrow(); - File_.Pwrite(outputBuffer.Data(), outputBuffer.Size(), OutputPosition_); - OutputPosition_ += outputBuffer.Size(); -} - -void TAppendableCompressedFile::DoFinish() -{ - Flush(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NLogging diff --git a/yt/yt/core/logging/compression.h b/yt/yt/core/logging/compression.h deleted file mode 100644 index 857add43e45..00000000000 --- a/yt/yt/core/logging/compression.h +++ /dev/null @@ -1,84 +0,0 @@ -#pragma once - -#include "public.h" - -#include "stream_output.h" - -#include <yt/yt/core/actions/future.h> - -#include <library/cpp/yt/threading/spin_lock.h> - -#include <util/generic/buffer.h> - -#include <util/stream/file.h> - -namespace NYT::NLogging { - -//////////////////////////////////////////////////////////////////////////////// - -//! Base interface for compression codec in logs. It is different from ICodec in yt/yt/core/compression, -//! as we must have the possibility to repair the corrupted log file if the process died unexpectedly. -struct ILogCompressionCodec - : public TRefCounted -{ - // NB. All the methods in this interface must be thread-safe. - - virtual i64 GetMaxBlockSize() const = 0; - virtual void Compress(const TBuffer& input, TBuffer& output) = 0; - virtual void AddSyncTag(i64 offset, TBuffer& output) = 0; - virtual void Repair(TFile* file, i64& outputPosition) = 0; -}; - -DEFINE_REFCOUNTED_TYPE(ILogCompressionCodec) - -//////////////////////////////////////////////////////////////////////////////// - -class TAppendableCompressedFile - : public IStreamLogOutput -{ -public: - TAppendableCompressedFile( - TFile file, - ILogCompressionCodecPtr codec, - IInvokerPtr compressInvoker, - bool writeTruncateMessage); - -private: - const ILogCompressionCodecPtr Codec_; - const IInvokerPtr CompressInvoker_; - const IInvokerPtr SerializedInvoker_; - const size_t MaxBlockSize_; - - TFile File_; - - // These fields are read and updated in the thread that owns this TAppendableCompressedFile. - TBuffer Input_; - i64 EnqueuedBuffersCount_ = 0; - i64 OutputPosition_ = 0; - - // These fields are read and updated in SerializedInvoker_. - THashMap<i64, TBuffer> CompressedBuffers_; - i64 WrittenBuffersCount_ = 0; - - // These fields are read and updated under FlushSpinLock_. - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, FlushSpinLock_); - i64 BuffersToFlushCount_ = 0; - i64 CompressedBuffersCount_ = 0; - TPromise<void> ReadyToFlushEvent_; - - void DoWrite(const void* buf, size_t len) override; - void DoFlush() override; - void DoFinish() override; - - void EnqueueBuffer(TBuffer buffer); - void EnqueueOneFrame(); - - void FlushOutput(); -}; - -DECLARE_REFCOUNTED_TYPE(TAppendableCompressedFile) -DEFINE_REFCOUNTED_TYPE(TAppendableCompressedFile) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NLogging diff --git a/yt/yt/core/logging/file_log_writer.cpp b/yt/yt/core/logging/file_log_writer.cpp index a00168f5274..a597de7cecc 100644 --- a/yt/yt/core/logging/file_log_writer.cpp +++ b/yt/yt/core/logging/file_log_writer.cpp @@ -1,5 +1,6 @@ #include "file_log_writer.h" +#include "appendable_compressed_file.h" #include "log_writer_detail.h" #include "config.h" #include "private.h" @@ -7,9 +8,8 @@ #include "random_access_gzip.h" #include "stream_output.h" #include "system_log_event_provider.h" -#include "compression.h" #include "log_writer_factory.h" -#include "zstd_compression.h" +#include "zstd_log_codec.h" #include "formatter.h" #include <yt/yt/core/misc/fs.h> @@ -186,11 +186,13 @@ private: if (Config_->EnableCompression) { switch (Config_->CompressionMethod) { case ECompressionMethod::Zstd: - OutputStream_ = New<TAppendableCompressedFile>( + OutputStream_ = CreateAppendableCompressedFile( *File_, - CreateZstdCompressionCodec(Config_->CompressionLevel), + CreateZstdLogCodec(Config_->CompressionLevel), Host_->GetCompressionInvoker(), - /*writeTruncateMessage*/ true); + { + .WriteTruncateMessage = true, + }); break; case ECompressionMethod::Gzip: diff --git a/yt/yt/core/logging/log_codec.h b/yt/yt/core/logging/log_codec.h new file mode 100644 index 00000000000..7cc1177bcee --- /dev/null +++ b/yt/yt/core/logging/log_codec.h @@ -0,0 +1,32 @@ +#pragma once + +#include "public.h" + +#include <util/generic/buffer.h> + +#include <util/stream/file.h> + +namespace NYT::NLogging { + +//////////////////////////////////////////////////////////////////////////////// + +//! Base interface for compression codec in logs. It is different from ICodec in yt/yt/core/compression, +//! as we must have the possibility to repair the corrupted log file if the process died unexpectedly. +/*! + * \note + * Thread affinity: any + */ +struct ILogCodec + : public TRefCounted +{ + virtual i64 GetMaxBlockSize() const = 0; + virtual void Compress(const TBuffer& input, TBuffer* output) = 0; + virtual void AddSyncTag(i64 offset, TBuffer* output) = 0; + virtual i64 Repair(TFile* file) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(ILogCodec) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NLogging diff --git a/yt/yt/core/logging/public.h b/yt/yt/core/logging/public.h index b24e07fba1a..1fddef2715b 100644 --- a/yt/yt/core/logging/public.h +++ b/yt/yt/core/logging/public.h @@ -42,11 +42,12 @@ DECLARE_REFCOUNTED_STRUCT(TStderrLogWriterConfig) struct ILogFormatter; struct ISystemLogEventProvider; struct ILogWriterHost; + DECLARE_REFCOUNTED_STRUCT(ILogWriterFactory) DECLARE_REFCOUNTED_STRUCT(ILogWriter) DECLARE_REFCOUNTED_STRUCT(IFileLogWriter) DECLARE_REFCOUNTED_STRUCT(IStreamLogOutput) -DECLARE_REFCOUNTED_STRUCT(ILogCompressionCodec) +DECLARE_REFCOUNTED_STRUCT(ILogCodec) YT_DECLARE_RECONFIGURABLE_SINGLETON(TLogManagerConfig, TLogManagerDynamicConfig); diff --git a/yt/yt/core/logging/unittests/logging_ut.cpp b/yt/yt/core/logging/unittests/logging_ut.cpp index 25609f0c155..f12fd897a43 100644 --- a/yt/yt/core/logging/unittests/logging_ut.cpp +++ b/yt/yt/core/logging/unittests/logging_ut.cpp @@ -5,6 +5,7 @@ #include <yt/yt/core/concurrency/async_semaphore.h> +#include <yt/yt/core/logging/appendable_compressed_file.h> #include <yt/yt/core/logging/log.h> #include <yt/yt/core/logging/log_manager.h> #include <yt/yt/core/logging/log_writer.h> @@ -14,11 +15,11 @@ #include <yt/yt/core/logging/stream_log_writer.h> #include <yt/yt/core/logging/structured_log.h> #include <yt/yt/core/logging/random_access_gzip.h> -#include <yt/yt/core/logging/compression.h> -#include <yt/yt/core/logging/zstd_compression.h> +#include <yt/yt/core/logging/log.h> #include <yt/yt/core/logging/config.h> #include <yt/yt/core/logging/formatter.h> #include <yt/yt/core/logging/system_log_event_provider.h> +#include <yt/yt/core/logging/zstd_log_codec.h> #include <yt/yt/core/json/json_parser.h> @@ -1105,20 +1106,20 @@ protected: return {GenerateLogFileName() + ".zst"}; } - TAppendableCompressedFilePtr CreateAppendableZstdFile(TFile rawFile, bool writeTruncateMessage) + IStreamLogOutputPtr CreateAppendableZstdFile(TFile rawFile, const TAppendableCompressedFileOptions& options) { - return New<TAppendableCompressedFile>( + return CreateAppendableCompressedFile( std::move(rawFile), - CreateZstdCompressionCodec(), + CreateZstdLogCodec(), GetCurrentInvoker(), - writeTruncateMessage); + options); } - void WriteTestFile(const TString& filename, i64 addBytes, bool writeTruncateMessage) + void WriteTestFile(const TString& filename, i64 addBytes, const TAppendableCompressedFileOptions& options) { { TFile rawFile(filename, OpenAlways|RdWr|CloseOnExec); - auto file = CreateAppendableZstdFile(rawFile, writeTruncateMessage); + auto file = CreateAppendableZstdFile(rawFile, options); *file << "foo\n"; file->Flush(); *file << "bar\n"; @@ -1128,7 +1129,7 @@ protected: } { TFile rawFile(filename, OpenAlways|RdWr|CloseOnExec); - auto file = CreateAppendableZstdFile(rawFile, writeTruncateMessage); + auto file = CreateAppendableZstdFile(rawFile, options); *file << "zog\n"; file->Flush(); } @@ -1147,7 +1148,7 @@ protected: TEST_F(TAppendableZstdFileTest, Write) { auto logFile = GetLogFile(); - WriteTestFile(logFile.Name(), 0, false); + WriteTestFile(logFile.Name(), 0, {.WriteTruncateMessage = false}); TUnbufferedFileInput file(logFile.Name()); TZstdDecompress decompress(&file); @@ -1161,7 +1162,7 @@ TEST_F(TAppendableZstdFileTest, WriteMultipleFramesPerFlush) { TFile rawFile(logFile.Name(), OpenAlways|RdWr|CloseOnExec); - auto file = CreateAppendableZstdFile(rawFile, true); + auto file = CreateAppendableZstdFile(rawFile, {.WriteTruncateMessage = true}); file->Write(data.data(), data.size()); file->Finish(); } @@ -1177,7 +1178,7 @@ TEST_F(TAppendableZstdFileTest, WriteMultipleFramesPerFlush) TEST_F(TAppendableZstdFileTest, RepairSmall) { auto logFile = GetLogFile(); - WriteTestFile(logFile.Name(), -1, false); + WriteTestFile(logFile.Name(), -1, {.WriteTruncateMessage = false}); TUnbufferedFileInput file(logFile.Name()); TZstdDecompress decompress(&file); @@ -1187,13 +1188,13 @@ TEST_F(TAppendableZstdFileTest, RepairSmall) TEST_F(TAppendableZstdFileTest, RepairLarge) { auto logFile = GetLogFile(); - WriteTestFile(logFile.Name(), 10_MB, true); + WriteTestFile(logFile.Name(), 10_MB, {.WriteTruncateMessage = true}); TUnbufferedFileInput file(logFile.Name()); TZstdDecompress decompress(&file); TStringBuilder expected; - expected.AppendFormat("foo\nbar\nTruncated %v bytes due to zstd repair.\nzog\n", 10_MB); + expected.AppendFormat("foo\nbar\n*** Truncated %v trailing bytes due to zstd repair\nzog\n", 10_MB); EXPECT_EQ(expected.Flush(), decompress.ReadAll()); } diff --git a/yt/yt/core/logging/zstd_compression.cpp b/yt/yt/core/logging/zstd_log_codec.cpp index 6fe72108521..c82f3179295 100644 --- a/yt/yt/core/logging/zstd_compression.cpp +++ b/yt/yt/core/logging/zstd_log_codec.cpp @@ -1,6 +1,6 @@ -#include "zstd_compression.h" +#include "zstd_log_codec.h" -#include "compression.h" +#include "log_codec.h" #include <yt/yt/core/misc/finally.h> @@ -15,19 +15,19 @@ using namespace NCompression::NDetail; //////////////////////////////////////////////////////////////////////////////// class TZstdLogCompressionCodec - : public ILogCompressionCodec + : public ILogCodec { public: explicit TZstdLogCompressionCodec(int compressionLevel) : CompressionLevel_(compressionLevel) { } - i64 GetMaxBlockSize() const override + i64 GetMaxBlockSize() const final { return MaxZstdFrameUncompressedLength; } - void Compress(const TBuffer& input, TBuffer& output) override + void Compress(const TBuffer& input, TBuffer* output) final { auto context = ZSTD_createCCtx(); auto contextGuard = Finally([&] { @@ -36,10 +36,10 @@ public: auto frameLength = ZSTD_COMPRESSBOUND(std::min<i64>(MaxZstdFrameUncompressedLength, input.Size())); - output.Reserve(output.Size() + frameLength + ZstdSyncTagSize); + output->Reserve(output->Size() + frameLength + ZstdSyncTagSize); size_t size = ZSTD_compressCCtx( context, - output.Data() + output.Size(), + output->Data() + output->Size(), frameLength, input.Data(), input.Size(), @@ -49,16 +49,16 @@ public: THROW_ERROR_EXCEPTION("ZSTD_compressCCtx() failed") << TErrorAttribute("zstd_error", ZSTD_getErrorName(size)); } - output.Advance(size); + output->Advance(size); } - void AddSyncTag(i64 offset, TBuffer& output) override + void AddSyncTag(i64 offset, TBuffer* output) final { - output.Append(ZstdSyncTagPrefix.data(), ZstdSyncTagPrefix.size()); - output.Append(reinterpret_cast<const char*>(&offset), sizeof(offset)); + output->Append(ZstdSyncTagPrefix.data(), ZstdSyncTagPrefix.size()); + output->Append(reinterpret_cast<const char*>(&offset), sizeof(offset)); } - void Repair(TFile* file, i64& outputPosition) override + i64 Repair(TFile* file) final { constexpr auto ScanOverlap = ZstdSyncTagSize - 1; constexpr auto MaxZstdFrameLength = ZSTD_COMPRESSBOUND(MaxZstdFrameUncompressedLength); @@ -71,7 +71,7 @@ public: TBuffer buffer; - outputPosition = 0; + auto outputPosition = 0; while (bufSize >= ZstdSyncTagSize) { buffer.Resize(0); buffer.Reserve(bufSize); @@ -89,16 +89,16 @@ public: pos = newPos; } file->Resize(outputPosition); + return outputPosition; } private: - int CompressionLevel_; + const int CompressionLevel_; }; -DECLARE_REFCOUNTED_TYPE(TZstdLogCompressionCodec) -DEFINE_REFCOUNTED_TYPE(TZstdLogCompressionCodec) +//////////////////////////////////////////////////////////////////////////////// -ILogCompressionCodecPtr CreateZstdCompressionCodec(int compressionLevel) +ILogCodecPtr CreateZstdLogCodec(int compressionLevel) { return New<TZstdLogCompressionCodec>(compressionLevel); } diff --git a/yt/yt/core/logging/zstd_compression.h b/yt/yt/core/logging/zstd_log_codec.h index c53e4f032c9..f8934559e49 100644 --- a/yt/yt/core/logging/zstd_compression.h +++ b/yt/yt/core/logging/zstd_log_codec.h @@ -11,7 +11,7 @@ namespace NYT::NLogging { constexpr i64 MaxZstdFrameUncompressedLength = 5_MBs; constexpr const int DefaultZstdCompressionLevel = 3; -ILogCompressionCodecPtr CreateZstdCompressionCodec(int compressionLevel = DefaultZstdCompressionLevel); +ILogCodecPtr CreateZstdLogCodec(int compressionLevel = DefaultZstdCompressionLevel); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 924fabb734e..e1dab60770c 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -99,7 +99,7 @@ SRCS( crypto/crypto.cpp crypto/tls.cpp - logging/compression.cpp + logging/appendable_compressed_file.cpp logging/config.cpp GLOBAL logging/configure_log_manager.cpp logging/formatter.cpp @@ -114,7 +114,7 @@ SRCS( logging/stream_log_writer.cpp logging/system_log_event_provider.cpp logging/random_access_gzip.cpp - logging/zstd_compression.cpp + logging/zstd_log_codec.cpp misc/arithmetic_formula.cpp misc/backtrace.cpp |
