summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <[email protected]>2025-11-05 16:26:08 +0300
committerbabenko <[email protected]>2025-11-05 17:09:03 +0300
commit659334cd4b78901187d4cef85b8a67ec63f84e4e (patch)
tree962ddac21d1c8d9185434c2380df918bf6873c9b
parent21e77ed3bcc80f6fa0dc578cbd22e5f52c234299 (diff)
YT-18571: Preliminary cosmetics and refactoring of log compression
commit_hash:57f52a44eb68d2c600e48de1c1234d6b708e4472
-rw-r--r--yt/yt/core/logging/appendable_compressed_file.cpp200
-rw-r--r--yt/yt/core/logging/appendable_compressed_file.h26
-rw-r--r--yt/yt/core/logging/compression.cpp150
-rw-r--r--yt/yt/core/logging/compression.h84
-rw-r--r--yt/yt/core/logging/file_log_writer.cpp12
-rw-r--r--yt/yt/core/logging/log_codec.h32
-rw-r--r--yt/yt/core/logging/public.h3
-rw-r--r--yt/yt/core/logging/unittests/logging_ut.cpp29
-rw-r--r--yt/yt/core/logging/zstd_log_codec.cpp (renamed from yt/yt/core/logging/zstd_compression.cpp)34
-rw-r--r--yt/yt/core/logging/zstd_log_codec.h (renamed from yt/yt/core/logging/zstd_compression.h)2
-rw-r--r--yt/yt/core/ya.make4
11 files changed, 302 insertions, 274 deletions
diff --git a/yt/yt/core/logging/appendable_compressed_file.cpp b/yt/yt/core/logging/appendable_compressed_file.cpp
new file mode 100644
index 00000000000..485932f640e
--- /dev/null
+++ b/yt/yt/core/logging/appendable_compressed_file.cpp
@@ -0,0 +1,200 @@
+#include "appendable_compressed_file.h"
+
+#include "log_codec.h"
+#include "stream_output.h"
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/scheduler_api.h>
+
+namespace NYT::NLogging {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAppendableCompressedFile
+ : public IStreamLogOutput
+{
+public:
+ TAppendableCompressedFile(
+ TFile file,
+ ILogCodecPtr codec,
+ IInvokerPtr compressInvoker,
+ const TAppendableCompressedFileOptions& options)
+ : Codec_(std::move(codec))
+ , CompressInvoker_(std::move(compressInvoker))
+ , Options_(options)
+ , SerializedInvoker_(CreateSerializedInvoker(
+ CompressInvoker_,
+ NProfiling::TTagSet({
+ {"invoker", "appendable_compressed_file"},
+ {"file_name", file.GetName()},
+ })))
+ , MaxBlockSize_(Codec_->GetMaxBlockSize())
+ , File_(std::move(file))
+ {
+ i64 fileSize = File_.GetLength();
+ OutputPosition_ = Codec_->Repair(&File_);
+ if (OutputPosition_ != fileSize && Options_.WriteTruncateMessage) {
+ auto message = Format("*** Truncated %v trailing bytes due to zstd repair\n", fileSize - OutputPosition_);
+ Input_.Append(message.data(), message.size());
+ Flush();
+ }
+ }
+
+private:
+ const ILogCodecPtr Codec_;
+ const IInvokerPtr CompressInvoker_;
+ const TAppendableCompressedFileOptions Options_;
+ const IInvokerPtr SerializedInvoker_;
+ const i64 MaxBlockSize_;
+
+ TFile File_;
+
+ // These fields are read and updated in the thread that owns this TAppendableCompressedFile.
+ TBuffer Input_;
+ i64 EnqueuedBuffersCount_ = 0;
+ i64 OutputPosition_ = 0;
+
+ // These fields are read and updated in SerializedInvoker_.
+ THashMap<i64, TBuffer> CompressedBuffers_;
+ i64 WrittenBuffersCount_ = 0;
+
+ // These fields are read and updated under FlushSpinLock_.
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, FlushSpinLock_);
+ i64 BuffersToFlushCount_ = 0;
+ i64 CompressedBuffersCount_ = 0;
+ TPromise<void> ReadyToFlushEvent_;
+
+ void DoWrite(const void* buf, size_t len) final
+ {
+ const char* in = reinterpret_cast<const char*>(buf);
+ while (len > 0) {
+ size_t toWrite = len;
+ if (static_cast<i64>(Input_.Size()) >= MaxBlockSize_) {
+ toWrite = 0;
+ } else if (static_cast<i64>(Input_.Size() + len) >= MaxBlockSize_) {
+ toWrite = MaxBlockSize_ - Input_.Size();
+ }
+
+ Input_.Append(in, toWrite);
+ in += toWrite;
+ len -= toWrite;
+
+ while (static_cast<i64>(Input_.Size()) >= MaxBlockSize_) {
+ EnqueueOneFrame();
+ }
+ }
+ }
+
+ void DoFlush() final
+ {
+ while (!Input_.Empty()) {
+ EnqueueOneFrame();
+ }
+ FlushOutput();
+ }
+
+ void DoFinish() final
+ {
+ Flush();
+ }
+
+ void EnqueueBuffer(TBuffer buffer)
+ {
+ i64 bufferId = EnqueuedBuffersCount_;
+ ++EnqueuedBuffersCount_;
+
+ BIND([this_ = MakeStrong(this), this, buffer = std::move(buffer)] {
+ TBuffer output;
+ Codec_->Compress(buffer, &output);
+ return output;
+ })
+ .AsyncVia(CompressInvoker_)
+ .Run()
+ .Subscribe(BIND([this_ = MakeStrong(this), this, bufferId] (TErrorOr<TBuffer> result) {
+ YT_VERIFY(result.IsOK());
+
+ CompressedBuffers_[bufferId] = std::move(result.Value());
+
+ auto guard = Guard(FlushSpinLock_);
+ ++CompressedBuffersCount_;
+ if (CompressedBuffersCount_ == BuffersToFlushCount_) {
+ ReadyToFlushEvent_.Set();
+ }
+ })
+ .Via(SerializedInvoker_));
+ }
+
+ void EnqueueOneFrame()
+ {
+ if (Input_.Empty()) {
+ return;
+ }
+
+ size_t toWrite = std::min(Input_.Size(), static_cast<size_t>(MaxBlockSize_));
+ EnqueueBuffer(TBuffer(Input_.Data(), toWrite));
+ Input_.ChopHead(toWrite);
+ }
+
+ void FlushOutput()
+ {
+ TFuture<void> readyToFlushFuture;
+
+ {
+ auto guard = Guard(FlushSpinLock_);
+ BuffersToFlushCount_ = EnqueuedBuffersCount_;
+ if (BuffersToFlushCount_ == CompressedBuffersCount_) {
+ readyToFlushFuture = VoidFuture;
+ } else {
+ ReadyToFlushEvent_ = NewPromise<void>();
+ readyToFlushFuture = ReadyToFlushEvent_.ToFuture();
+ }
+ }
+
+ auto asyncOutputBuffer = readyToFlushFuture
+ .Apply(BIND([this_ = MakeStrong(this), this, outputPosition = OutputPosition_] {
+ TBuffer output;
+
+ while (!CompressedBuffers_.empty()) {
+ auto it = CompressedBuffers_.find(WrittenBuffersCount_);
+ YT_VERIFY(it != CompressedBuffers_.end());
+
+ output.Append(it->second.Data(), it->second.Size());
+ Codec_->AddSyncTag(outputPosition + output.Size(), &output);
+
+ CompressedBuffers_.erase(it);
+ ++WrittenBuffersCount_;
+ }
+
+ return output;
+ })
+ .AsyncVia(SerializedInvoker_));
+
+ // We use .Get() here instead of WaitFor() here to ensure that this method doesn't do context switches.
+ // Otherwise, flush events in TLogManager may intersect, because another event could start while we are
+ // waiting in WaitFor().
+ auto outputBuffer = asyncOutputBuffer.Get().ValueOrThrow();
+ File_.Pwrite(outputBuffer.Data(), outputBuffer.Size(), OutputPosition_);
+ OutputPosition_ += outputBuffer.Size();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IStreamLogOutputPtr CreateAppendableCompressedFile(
+ TFile file,
+ ILogCodecPtr codec,
+ IInvokerPtr compressInvoker,
+ const TAppendableCompressedFileOptions& options)
+{
+ return New<TAppendableCompressedFile>(
+ std::move(file),
+ std::move(codec),
+ std::move(compressInvoker),
+ options);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NLogging
diff --git a/yt/yt/core/logging/appendable_compressed_file.h b/yt/yt/core/logging/appendable_compressed_file.h
new file mode 100644
index 00000000000..89705cb8b7f
--- /dev/null
+++ b/yt/yt/core/logging/appendable_compressed_file.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/actions/public.h>
+
+#include <util/stream/file.h>
+
+namespace NYT::NLogging {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TAppendableCompressedFileOptions
+{
+ bool WriteTruncateMessage = false;
+};
+
+IStreamLogOutputPtr CreateAppendableCompressedFile(
+ TFile file,
+ ILogCodecPtr codec,
+ IInvokerPtr compressInvoker,
+ const TAppendableCompressedFileOptions& options);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NLogging
diff --git a/yt/yt/core/logging/compression.cpp b/yt/yt/core/logging/compression.cpp
deleted file mode 100644
index 68d2623a2ca..00000000000
--- a/yt/yt/core/logging/compression.cpp
+++ /dev/null
@@ -1,150 +0,0 @@
-#include "compression.h"
-
-#include <yt/yt/core/concurrency/action_queue.h>
-#include <yt/yt/core/concurrency/scheduler_api.h>
-
-namespace NYT::NLogging {
-
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TAppendableCompressedFile::TAppendableCompressedFile(
- TFile file,
- ILogCompressionCodecPtr codec,
- IInvokerPtr compressInvoker,
- bool writeTruncateMessage)
- : Codec_(std::move(codec))
- , CompressInvoker_(std::move(compressInvoker))
- , SerializedInvoker_(CreateSerializedInvoker(CompressInvoker_, NProfiling::TTagSet({{"invoker", "appendable_compressed_file"}, {"file_name", file.GetName()}})))
- , MaxBlockSize_(static_cast<size_t>(Codec_->GetMaxBlockSize()))
- , File_(std::move(file))
-{
- i64 fileSize = File_.GetLength();
- Codec_->Repair(&File_, OutputPosition_);
- if (OutputPosition_ != fileSize && writeTruncateMessage) {
- TStringBuilder message;
- message.AppendFormat("Truncated %v bytes due to zstd repair.\n", fileSize - OutputPosition_);
- TString messageStr = message.Flush();
-
- Input_.Append(messageStr.data(), messageStr.size());
- Flush();
- }
-}
-
-void TAppendableCompressedFile::DoWrite(const void* buf, size_t len)
-{
- const char* in = reinterpret_cast<const char*>(buf);
- while (len > 0) {
- size_t toWrite = len;
- if (Input_.Size() >= MaxBlockSize_) {
- toWrite = 0;
- } else if (Input_.Size() + len >= MaxBlockSize_) {
- toWrite = MaxBlockSize_ - Input_.Size();
- }
-
- Input_.Append(in, toWrite);
- in += toWrite;
- len -= toWrite;
-
- while (Input_.Size() >= MaxBlockSize_) {
- EnqueueOneFrame();
- }
- }
-}
-
-void TAppendableCompressedFile::EnqueueOneFrame()
-{
- if (Input_.Empty()) {
- return;
- }
-
- size_t toWrite = std::min(Input_.Size(), MaxBlockSize_);
- EnqueueBuffer(TBuffer(Input_.Data(), toWrite));
- Input_.ChopHead(toWrite);
-}
-
-void TAppendableCompressedFile::EnqueueBuffer(TBuffer buffer)
-{
- i64 bufferId = EnqueuedBuffersCount_;
- ++EnqueuedBuffersCount_;
-
- BIND([this_ = MakeStrong(this), this, buffer = std::move(buffer)] {
- TBuffer output;
- Codec_->Compress(buffer, output);
- return output;
- })
- .AsyncVia(CompressInvoker_)
- .Run()
- .Subscribe(BIND([this_ = MakeStrong(this), this, bufferId] (TErrorOr<TBuffer> result) {
- YT_VERIFY(result.IsOK());
-
- CompressedBuffers_[bufferId] = std::move(result.Value());
-
- auto guard = Guard(FlushSpinLock_);
- ++CompressedBuffersCount_;
- if (CompressedBuffersCount_ == BuffersToFlushCount_) {
- ReadyToFlushEvent_.Set();
- }
- })
- .Via(SerializedInvoker_));
-}
-
-void TAppendableCompressedFile::DoFlush()
-{
- while (!Input_.Empty()) {
- EnqueueOneFrame();
- }
- FlushOutput();
-}
-
-void TAppendableCompressedFile::FlushOutput()
-{
- TFuture<void> readyToFlushFuture;
-
- {
- auto guard = Guard(FlushSpinLock_);
- BuffersToFlushCount_ = EnqueuedBuffersCount_;
- if (BuffersToFlushCount_ == CompressedBuffersCount_) {
- readyToFlushFuture = VoidFuture;
- } else {
- ReadyToFlushEvent_ = NewPromise<void>();
- readyToFlushFuture = ReadyToFlushEvent_.ToFuture();
- }
- }
-
- auto asyncOutputBuffer = readyToFlushFuture
- .Apply(BIND([this_ = MakeStrong(this), this, outputPosition = OutputPosition_] {
- TBuffer output;
-
- while (!CompressedBuffers_.empty()) {
- auto it = CompressedBuffers_.find(WrittenBuffersCount_);
- YT_VERIFY(it != CompressedBuffers_.end());
-
- output.Append(it->second.Data(), it->second.Size());
- Codec_->AddSyncTag(outputPosition + output.Size(), output);
-
- CompressedBuffers_.erase(it);
- ++WrittenBuffersCount_;
- }
-
- return output;
- })
- .AsyncVia(SerializedInvoker_));
-
- // We use .Get() here instead of WaitFor() here to ensure that this method doesn't do context switches.
- // Otherwise, flush events in TLogManager may intersect, because another event could start while we are
- // waiting in WaitFor().
- auto outputBuffer = asyncOutputBuffer.Get().ValueOrThrow();
- File_.Pwrite(outputBuffer.Data(), outputBuffer.Size(), OutputPosition_);
- OutputPosition_ += outputBuffer.Size();
-}
-
-void TAppendableCompressedFile::DoFinish()
-{
- Flush();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NLogging
diff --git a/yt/yt/core/logging/compression.h b/yt/yt/core/logging/compression.h
deleted file mode 100644
index 857add43e45..00000000000
--- a/yt/yt/core/logging/compression.h
+++ /dev/null
@@ -1,84 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include "stream_output.h"
-
-#include <yt/yt/core/actions/future.h>
-
-#include <library/cpp/yt/threading/spin_lock.h>
-
-#include <util/generic/buffer.h>
-
-#include <util/stream/file.h>
-
-namespace NYT::NLogging {
-
-////////////////////////////////////////////////////////////////////////////////
-
-//! Base interface for compression codec in logs. It is different from ICodec in yt/yt/core/compression,
-//! as we must have the possibility to repair the corrupted log file if the process died unexpectedly.
-struct ILogCompressionCodec
- : public TRefCounted
-{
- // NB. All the methods in this interface must be thread-safe.
-
- virtual i64 GetMaxBlockSize() const = 0;
- virtual void Compress(const TBuffer& input, TBuffer& output) = 0;
- virtual void AddSyncTag(i64 offset, TBuffer& output) = 0;
- virtual void Repair(TFile* file, i64& outputPosition) = 0;
-};
-
-DEFINE_REFCOUNTED_TYPE(ILogCompressionCodec)
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TAppendableCompressedFile
- : public IStreamLogOutput
-{
-public:
- TAppendableCompressedFile(
- TFile file,
- ILogCompressionCodecPtr codec,
- IInvokerPtr compressInvoker,
- bool writeTruncateMessage);
-
-private:
- const ILogCompressionCodecPtr Codec_;
- const IInvokerPtr CompressInvoker_;
- const IInvokerPtr SerializedInvoker_;
- const size_t MaxBlockSize_;
-
- TFile File_;
-
- // These fields are read and updated in the thread that owns this TAppendableCompressedFile.
- TBuffer Input_;
- i64 EnqueuedBuffersCount_ = 0;
- i64 OutputPosition_ = 0;
-
- // These fields are read and updated in SerializedInvoker_.
- THashMap<i64, TBuffer> CompressedBuffers_;
- i64 WrittenBuffersCount_ = 0;
-
- // These fields are read and updated under FlushSpinLock_.
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, FlushSpinLock_);
- i64 BuffersToFlushCount_ = 0;
- i64 CompressedBuffersCount_ = 0;
- TPromise<void> ReadyToFlushEvent_;
-
- void DoWrite(const void* buf, size_t len) override;
- void DoFlush() override;
- void DoFinish() override;
-
- void EnqueueBuffer(TBuffer buffer);
- void EnqueueOneFrame();
-
- void FlushOutput();
-};
-
-DECLARE_REFCOUNTED_TYPE(TAppendableCompressedFile)
-DEFINE_REFCOUNTED_TYPE(TAppendableCompressedFile)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NLogging
diff --git a/yt/yt/core/logging/file_log_writer.cpp b/yt/yt/core/logging/file_log_writer.cpp
index a00168f5274..a597de7cecc 100644
--- a/yt/yt/core/logging/file_log_writer.cpp
+++ b/yt/yt/core/logging/file_log_writer.cpp
@@ -1,5 +1,6 @@
#include "file_log_writer.h"
+#include "appendable_compressed_file.h"
#include "log_writer_detail.h"
#include "config.h"
#include "private.h"
@@ -7,9 +8,8 @@
#include "random_access_gzip.h"
#include "stream_output.h"
#include "system_log_event_provider.h"
-#include "compression.h"
#include "log_writer_factory.h"
-#include "zstd_compression.h"
+#include "zstd_log_codec.h"
#include "formatter.h"
#include <yt/yt/core/misc/fs.h>
@@ -186,11 +186,13 @@ private:
if (Config_->EnableCompression) {
switch (Config_->CompressionMethod) {
case ECompressionMethod::Zstd:
- OutputStream_ = New<TAppendableCompressedFile>(
+ OutputStream_ = CreateAppendableCompressedFile(
*File_,
- CreateZstdCompressionCodec(Config_->CompressionLevel),
+ CreateZstdLogCodec(Config_->CompressionLevel),
Host_->GetCompressionInvoker(),
- /*writeTruncateMessage*/ true);
+ {
+ .WriteTruncateMessage = true,
+ });
break;
case ECompressionMethod::Gzip:
diff --git a/yt/yt/core/logging/log_codec.h b/yt/yt/core/logging/log_codec.h
new file mode 100644
index 00000000000..7cc1177bcee
--- /dev/null
+++ b/yt/yt/core/logging/log_codec.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include "public.h"
+
+#include <util/generic/buffer.h>
+
+#include <util/stream/file.h>
+
+namespace NYT::NLogging {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Base interface for compression codec in logs. It is different from ICodec in yt/yt/core/compression,
+//! as we must have the possibility to repair the corrupted log file if the process died unexpectedly.
+/*!
+ * \note
+ * Thread affinity: any
+ */
+struct ILogCodec
+ : public TRefCounted
+{
+ virtual i64 GetMaxBlockSize() const = 0;
+ virtual void Compress(const TBuffer& input, TBuffer* output) = 0;
+ virtual void AddSyncTag(i64 offset, TBuffer* output) = 0;
+ virtual i64 Repair(TFile* file) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(ILogCodec)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NLogging
diff --git a/yt/yt/core/logging/public.h b/yt/yt/core/logging/public.h
index b24e07fba1a..1fddef2715b 100644
--- a/yt/yt/core/logging/public.h
+++ b/yt/yt/core/logging/public.h
@@ -42,11 +42,12 @@ DECLARE_REFCOUNTED_STRUCT(TStderrLogWriterConfig)
struct ILogFormatter;
struct ISystemLogEventProvider;
struct ILogWriterHost;
+
DECLARE_REFCOUNTED_STRUCT(ILogWriterFactory)
DECLARE_REFCOUNTED_STRUCT(ILogWriter)
DECLARE_REFCOUNTED_STRUCT(IFileLogWriter)
DECLARE_REFCOUNTED_STRUCT(IStreamLogOutput)
-DECLARE_REFCOUNTED_STRUCT(ILogCompressionCodec)
+DECLARE_REFCOUNTED_STRUCT(ILogCodec)
YT_DECLARE_RECONFIGURABLE_SINGLETON(TLogManagerConfig, TLogManagerDynamicConfig);
diff --git a/yt/yt/core/logging/unittests/logging_ut.cpp b/yt/yt/core/logging/unittests/logging_ut.cpp
index 25609f0c155..f12fd897a43 100644
--- a/yt/yt/core/logging/unittests/logging_ut.cpp
+++ b/yt/yt/core/logging/unittests/logging_ut.cpp
@@ -5,6 +5,7 @@
#include <yt/yt/core/concurrency/async_semaphore.h>
+#include <yt/yt/core/logging/appendable_compressed_file.h>
#include <yt/yt/core/logging/log.h>
#include <yt/yt/core/logging/log_manager.h>
#include <yt/yt/core/logging/log_writer.h>
@@ -14,11 +15,11 @@
#include <yt/yt/core/logging/stream_log_writer.h>
#include <yt/yt/core/logging/structured_log.h>
#include <yt/yt/core/logging/random_access_gzip.h>
-#include <yt/yt/core/logging/compression.h>
-#include <yt/yt/core/logging/zstd_compression.h>
+#include <yt/yt/core/logging/log.h>
#include <yt/yt/core/logging/config.h>
#include <yt/yt/core/logging/formatter.h>
#include <yt/yt/core/logging/system_log_event_provider.h>
+#include <yt/yt/core/logging/zstd_log_codec.h>
#include <yt/yt/core/json/json_parser.h>
@@ -1105,20 +1106,20 @@ protected:
return {GenerateLogFileName() + ".zst"};
}
- TAppendableCompressedFilePtr CreateAppendableZstdFile(TFile rawFile, bool writeTruncateMessage)
+ IStreamLogOutputPtr CreateAppendableZstdFile(TFile rawFile, const TAppendableCompressedFileOptions& options)
{
- return New<TAppendableCompressedFile>(
+ return CreateAppendableCompressedFile(
std::move(rawFile),
- CreateZstdCompressionCodec(),
+ CreateZstdLogCodec(),
GetCurrentInvoker(),
- writeTruncateMessage);
+ options);
}
- void WriteTestFile(const TString& filename, i64 addBytes, bool writeTruncateMessage)
+ void WriteTestFile(const TString& filename, i64 addBytes, const TAppendableCompressedFileOptions& options)
{
{
TFile rawFile(filename, OpenAlways|RdWr|CloseOnExec);
- auto file = CreateAppendableZstdFile(rawFile, writeTruncateMessage);
+ auto file = CreateAppendableZstdFile(rawFile, options);
*file << "foo\n";
file->Flush();
*file << "bar\n";
@@ -1128,7 +1129,7 @@ protected:
}
{
TFile rawFile(filename, OpenAlways|RdWr|CloseOnExec);
- auto file = CreateAppendableZstdFile(rawFile, writeTruncateMessage);
+ auto file = CreateAppendableZstdFile(rawFile, options);
*file << "zog\n";
file->Flush();
}
@@ -1147,7 +1148,7 @@ protected:
TEST_F(TAppendableZstdFileTest, Write)
{
auto logFile = GetLogFile();
- WriteTestFile(logFile.Name(), 0, false);
+ WriteTestFile(logFile.Name(), 0, {.WriteTruncateMessage = false});
TUnbufferedFileInput file(logFile.Name());
TZstdDecompress decompress(&file);
@@ -1161,7 +1162,7 @@ TEST_F(TAppendableZstdFileTest, WriteMultipleFramesPerFlush)
{
TFile rawFile(logFile.Name(), OpenAlways|RdWr|CloseOnExec);
- auto file = CreateAppendableZstdFile(rawFile, true);
+ auto file = CreateAppendableZstdFile(rawFile, {.WriteTruncateMessage = true});
file->Write(data.data(), data.size());
file->Finish();
}
@@ -1177,7 +1178,7 @@ TEST_F(TAppendableZstdFileTest, WriteMultipleFramesPerFlush)
TEST_F(TAppendableZstdFileTest, RepairSmall)
{
auto logFile = GetLogFile();
- WriteTestFile(logFile.Name(), -1, false);
+ WriteTestFile(logFile.Name(), -1, {.WriteTruncateMessage = false});
TUnbufferedFileInput file(logFile.Name());
TZstdDecompress decompress(&file);
@@ -1187,13 +1188,13 @@ TEST_F(TAppendableZstdFileTest, RepairSmall)
TEST_F(TAppendableZstdFileTest, RepairLarge)
{
auto logFile = GetLogFile();
- WriteTestFile(logFile.Name(), 10_MB, true);
+ WriteTestFile(logFile.Name(), 10_MB, {.WriteTruncateMessage = true});
TUnbufferedFileInput file(logFile.Name());
TZstdDecompress decompress(&file);
TStringBuilder expected;
- expected.AppendFormat("foo\nbar\nTruncated %v bytes due to zstd repair.\nzog\n", 10_MB);
+ expected.AppendFormat("foo\nbar\n*** Truncated %v trailing bytes due to zstd repair\nzog\n", 10_MB);
EXPECT_EQ(expected.Flush(), decompress.ReadAll());
}
diff --git a/yt/yt/core/logging/zstd_compression.cpp b/yt/yt/core/logging/zstd_log_codec.cpp
index 6fe72108521..c82f3179295 100644
--- a/yt/yt/core/logging/zstd_compression.cpp
+++ b/yt/yt/core/logging/zstd_log_codec.cpp
@@ -1,6 +1,6 @@
-#include "zstd_compression.h"
+#include "zstd_log_codec.h"
-#include "compression.h"
+#include "log_codec.h"
#include <yt/yt/core/misc/finally.h>
@@ -15,19 +15,19 @@ using namespace NCompression::NDetail;
////////////////////////////////////////////////////////////////////////////////
class TZstdLogCompressionCodec
- : public ILogCompressionCodec
+ : public ILogCodec
{
public:
explicit TZstdLogCompressionCodec(int compressionLevel)
: CompressionLevel_(compressionLevel)
{ }
- i64 GetMaxBlockSize() const override
+ i64 GetMaxBlockSize() const final
{
return MaxZstdFrameUncompressedLength;
}
- void Compress(const TBuffer& input, TBuffer& output) override
+ void Compress(const TBuffer& input, TBuffer* output) final
{
auto context = ZSTD_createCCtx();
auto contextGuard = Finally([&] {
@@ -36,10 +36,10 @@ public:
auto frameLength = ZSTD_COMPRESSBOUND(std::min<i64>(MaxZstdFrameUncompressedLength, input.Size()));
- output.Reserve(output.Size() + frameLength + ZstdSyncTagSize);
+ output->Reserve(output->Size() + frameLength + ZstdSyncTagSize);
size_t size = ZSTD_compressCCtx(
context,
- output.Data() + output.Size(),
+ output->Data() + output->Size(),
frameLength,
input.Data(),
input.Size(),
@@ -49,16 +49,16 @@ public:
THROW_ERROR_EXCEPTION("ZSTD_compressCCtx() failed")
<< TErrorAttribute("zstd_error", ZSTD_getErrorName(size));
}
- output.Advance(size);
+ output->Advance(size);
}
- void AddSyncTag(i64 offset, TBuffer& output) override
+ void AddSyncTag(i64 offset, TBuffer* output) final
{
- output.Append(ZstdSyncTagPrefix.data(), ZstdSyncTagPrefix.size());
- output.Append(reinterpret_cast<const char*>(&offset), sizeof(offset));
+ output->Append(ZstdSyncTagPrefix.data(), ZstdSyncTagPrefix.size());
+ output->Append(reinterpret_cast<const char*>(&offset), sizeof(offset));
}
- void Repair(TFile* file, i64& outputPosition) override
+ i64 Repair(TFile* file) final
{
constexpr auto ScanOverlap = ZstdSyncTagSize - 1;
constexpr auto MaxZstdFrameLength = ZSTD_COMPRESSBOUND(MaxZstdFrameUncompressedLength);
@@ -71,7 +71,7 @@ public:
TBuffer buffer;
- outputPosition = 0;
+ auto outputPosition = 0;
while (bufSize >= ZstdSyncTagSize) {
buffer.Resize(0);
buffer.Reserve(bufSize);
@@ -89,16 +89,16 @@ public:
pos = newPos;
}
file->Resize(outputPosition);
+ return outputPosition;
}
private:
- int CompressionLevel_;
+ const int CompressionLevel_;
};
-DECLARE_REFCOUNTED_TYPE(TZstdLogCompressionCodec)
-DEFINE_REFCOUNTED_TYPE(TZstdLogCompressionCodec)
+////////////////////////////////////////////////////////////////////////////////
-ILogCompressionCodecPtr CreateZstdCompressionCodec(int compressionLevel)
+ILogCodecPtr CreateZstdLogCodec(int compressionLevel)
{
return New<TZstdLogCompressionCodec>(compressionLevel);
}
diff --git a/yt/yt/core/logging/zstd_compression.h b/yt/yt/core/logging/zstd_log_codec.h
index c53e4f032c9..f8934559e49 100644
--- a/yt/yt/core/logging/zstd_compression.h
+++ b/yt/yt/core/logging/zstd_log_codec.h
@@ -11,7 +11,7 @@ namespace NYT::NLogging {
constexpr i64 MaxZstdFrameUncompressedLength = 5_MBs;
constexpr const int DefaultZstdCompressionLevel = 3;
-ILogCompressionCodecPtr CreateZstdCompressionCodec(int compressionLevel = DefaultZstdCompressionLevel);
+ILogCodecPtr CreateZstdLogCodec(int compressionLevel = DefaultZstdCompressionLevel);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 924fabb734e..e1dab60770c 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -99,7 +99,7 @@ SRCS(
crypto/crypto.cpp
crypto/tls.cpp
- logging/compression.cpp
+ logging/appendable_compressed_file.cpp
logging/config.cpp
GLOBAL logging/configure_log_manager.cpp
logging/formatter.cpp
@@ -114,7 +114,7 @@ SRCS(
logging/stream_log_writer.cpp
logging/system_log_event_provider.cpp
logging/random_access_gzip.cpp
- logging/zstd_compression.cpp
+ logging/zstd_log_codec.cpp
misc/arithmetic_formula.cpp
misc/backtrace.cpp