diff options
author | babenko <babenko@yandex-team.com> | 2025-01-24 14:18:41 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-01-24 14:41:16 +0300 |
commit | 44d7e0082415e0cfcc10fc97f63fffc70ff90653 (patch) | |
tree | 0fca5d5108508163d9d971b09f549ba5e20a5b2d | |
parent | a9b47db2069a01e1f93339ecc3a6c2eb0ec86630 (diff) | |
download | ydb-44d7e0082415e0cfcc10fc97f63fffc70ff90653.tar.gz |
Introduce checksum snapshot dump mode
* Changelog entry
Type: feature
Component: master dynamic-tables
For master and tablet node `dump-snapshot` CLI command now accepts `snapshot-dump-mode` which could be either `content` (as usual) and `checksum` (a new option). In the latter case the binary dumps checksums for various (hierarchically nested) _scopes_ of snapshot. This mode could be used to aid diagnosing snapshot mismatch issues.
commit_hash:6e64e081aa30e6512d55d710a61573f68047c5c6
-rw-r--r-- | yt/yt/core/misc/public.h | 9 | ||||
-rw-r--r-- | yt/yt/core/misc/serialize-inl.h | 3 | ||||
-rw-r--r-- | yt/yt/core/misc/serialize.cpp | 157 | ||||
-rw-r--r-- | yt/yt/core/misc/serialize.h | 57 | ||||
-rw-r--r-- | yt/yt/core/misc/serialize_dump.h | 77 | ||||
-rw-r--r-- | yt/yt/core/phoenix/unittests/phoenix_ut.cpp | 4 |
6 files changed, 249 insertions, 58 deletions
diff --git a/yt/yt/core/misc/public.h b/yt/yt/core/misc/public.h index 5fb6839f12..e67d095ae5 100644 --- a/yt/yt/core/misc/public.h +++ b/yt/yt/core/misc/public.h @@ -81,6 +81,12 @@ struct TValueBoundSerializer; template <class T, class C, class = void> struct TSerializerTraits; +DEFINE_ENUM(ESerializationDumpMode, + (None) + (Content) + (Checksum) +); + template <class TKey, class TComparer> class TSkipList; @@ -116,9 +122,6 @@ constexpr TChecksum NullChecksum = 0; template <class T, size_t N> class TCompactVector; -class TRef; -class TMutableRef; - template <class TProto> class TRefCountedProto; diff --git a/yt/yt/core/misc/serialize-inl.h b/yt/yt/core/misc/serialize-inl.h index 1b588bbff3..38fea1742e 100644 --- a/yt/yt/core/misc/serialize-inl.h +++ b/yt/yt/core/misc/serialize-inl.h @@ -6,6 +6,7 @@ #include "collection_helpers.h" #include "maybe_inf.h" +#include "mpl.h" #include <yt/yt/core/phoenix/concepts.h> @@ -17,6 +18,8 @@ #include <library/cpp/yt/containers/enum_indexed_array.h> +#include <library/cpp/yt/assert/assert.h> + #include <optional> #include <variant> diff --git a/yt/yt/core/misc/serialize.cpp b/yt/yt/core/misc/serialize.cpp index b5d8720a35..6bc396ddfc 100644 --- a/yt/yt/core/misc/serialize.cpp +++ b/yt/yt/core/misc/serialize.cpp @@ -112,58 +112,169 @@ void TStreamSaveContext::Finish() //////////////////////////////////////////////////////////////////////////////// -TLoadContextStream::TLoadContextStream(IInputStream* input) - : Input_(input) +TLoadContextStream::TLoadContextStream( + TStreamLoadContext* context, + IInputStream* input) + : Context_(context) + , Input_(input) { } -TLoadContextStream::TLoadContextStream(IZeroCopyInput* input) - : ZeroCopyInput_(input) +TLoadContextStream::TLoadContextStream( + TStreamLoadContext* context, + IZeroCopyInput* input) + : Context_(context) + , Input_(input) { } -void TLoadContextStream::ClearBuffer() +void TLoadContextStream::SkipToCheckpoint() { - if (BufferRemaining_ > 0) { - BufferPtr_ = nullptr; - BufferRemaining_ = 0; + if (BufferRemaining_ == 0) { + return; + } + + if (!ScopeStack_.empty()) { + THROW_ERROR_EXCEPTION("Cannot skip to checkpoint when scope stack is not empty"); + } + + BufferPtr_ = nullptr; + BufferRemaining_ = 0; +} + +void TLoadContextStream::UpdateTopmostScopeChecksum(void* buf, size_t len) +{ + auto& scope = ScopeStack_.back(); + scope.CurrentChecksum = GetChecksum(TRef(buf, len), scope.CurrentChecksum); +} + +void TLoadContextStream::UpdateScopesChecksum() +{ + if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) { + auto& topmostScope = ScopeStack_.back(); + UpdateTopmostScopeChecksum( + topmostScope.CurrentChecksumPtr, + BufferPtr_ - topmostScope.CurrentChecksumPtr); + UpdateScopesCurrentChecksumPtr(); + } +} + +void TLoadContextStream::UpdateScopesCurrentChecksumPtr() +{ + if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) { + for (auto& scope : ScopeStack_) { + scope.CurrentChecksumPtr = BufferPtr_; + } } } -size_t TLoadContextStream::LoadSlow(void* buf, size_t len) +size_t TLoadContextStream::LoadSlow(void* buf_, size_t len) { + size_t bytesRead = 0; if (ZeroCopyInput_) { - auto bufPtr = static_cast<char*>(buf); - auto toRead = len; - while (toRead > 0) { + auto buf = static_cast<char*>(buf_); + auto bytesToRead = len; + while (bytesToRead > 0) { if (BufferRemaining_ == 0) { + UpdateScopesChecksum(); + BufferRemaining_ = ZeroCopyInput_->Next(&BufferPtr_); if (BufferRemaining_ == 0) { break; } + + UpdateScopesCurrentChecksumPtr(); } - YT_ASSERT(BufferRemaining_ > 0); - auto toCopy = std::min(toRead, BufferRemaining_); - ::memcpy(bufPtr, BufferPtr_, toCopy); - BufferPtr_ += toCopy; - BufferRemaining_ -= toCopy; - bufPtr += toCopy; - toRead -= toCopy; + + auto bytesToCopy = std::min(bytesToRead, BufferRemaining_); + ::memcpy(buf, BufferPtr_, bytesToCopy); + + BufferPtr_ += bytesToCopy; + BufferRemaining_ -= bytesToCopy; + buf += bytesToCopy; + bytesToRead -= bytesToCopy; + bytesRead += bytesToCopy; } - return len - toRead; + return len - bytesToRead; } else { - return Input_->Load(buf, len); + bytesRead = Input_->Load(buf_, len); + if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) { + UpdateTopmostScopeChecksum(buf_, bytesRead); + } + } + return bytesRead; +} + +void TLoadContextStream::BeginScope(TStringBuf name) +{ + UpdateScopesChecksum(); + + ScopeStack_.push_back({ + .ScopeNameLength = name.size(), + }); + + CurrentScopePath_ += "/"; + CurrentScopePath_ += name; + + if (Context_->Dumper().IsChecksumDumpActive()) { + ScopeStack_.back().CurrentChecksumPtr = BufferPtr_; } } +void TLoadContextStream::EndScope() +{ + UpdateScopesChecksum(); + + YT_VERIFY(!ScopeStack_.empty()); + auto& topmostScope = ScopeStack_.back(); + + if (Context_->Dumper().IsChecksumDumpActive()) { + for (int index = 0; index < std::ssize(ScopeStack_) - 1; ++index) { + auto& scope = ScopeStack_[index]; + scope.CurrentChecksum = GetChecksum(TRef::FromPod(topmostScope.CurrentChecksum), scope.CurrentChecksum); + } + + Context_->Dumper().WriteChecksum(CurrentScopePath_, topmostScope.CurrentChecksum); + } + + CurrentScopePath_.resize(CurrentScopePath_.size() - topmostScope.ScopeNameLength - 1); + ScopeStack_.pop_back(); +} + //////////////////////////////////////////////////////////////////////////////// TStreamLoadContext::TStreamLoadContext(IInputStream* input) - : Input_(input) + : Input_(this, input) { } TStreamLoadContext::TStreamLoadContext(IZeroCopyInput* input) - : Input_(input) + : Input_(this, input) { } +void TStreamLoadContext::BeginScope(TStringBuf name) +{ + Input_.BeginScope(name); +} + +void TStreamLoadContext::EndScope() +{ + Input_.EndScope(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TStreamLoadContextScopeGuard::TStreamLoadContextScopeGuard( + TStreamLoadContext& context, + TStringBuf name) + : Context_(context) +{ + YT_VERIFY(!name.empty()); + context.BeginScope(name); +} + +TStreamLoadContextScopeGuard::~TStreamLoadContextScopeGuard() +{ + Context_.EndScope(); +} + //////////////////////////////////////////////////////////////////////////////// TEntityStreamSaveContext::TEntityStreamSaveContext( diff --git a/yt/yt/core/misc/serialize.h b/yt/yt/core/misc/serialize.h index a59522bc75..d51a5012cf 100644 --- a/yt/yt/core/misc/serialize.h +++ b/yt/yt/core/misc/serialize.h @@ -1,13 +1,8 @@ #pragma once #include "public.h" -#include "error.h" -#include "mpl.h" #include "property.h" #include "serialize_dump.h" -#include "maybe_inf.h" - -#include <library/cpp/yt/assert/assert.h> #include <library/cpp/yt/memory/ref.h> @@ -19,6 +14,8 @@ #include <util/system/align.h> +#include <util/generic/size_literals.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -148,20 +145,44 @@ protected: class TLoadContextStream { public: - explicit TLoadContextStream(IInputStream* input); - explicit TLoadContextStream(IZeroCopyInput* input); + TLoadContextStream( + TStreamLoadContext* context, + IInputStream* input); + TLoadContextStream( + TStreamLoadContext* context, + IZeroCopyInput* input); size_t Load(void* buf, size_t len); - void ClearBuffer(); + void SkipToCheckpoint(); private: + friend class TStreamLoadContext; + + TStreamLoadContext* const Context_; IInputStream* const Input_ = nullptr; IZeroCopyInput* const ZeroCopyInput_ = nullptr; char* BufferPtr_ = nullptr; size_t BufferRemaining_ = 0; + struct TScope + { + size_t ScopeNameLength; + char* CurrentChecksumPtr; + TChecksum CurrentChecksum = {}; + }; + + std::vector<TScope> ScopeStack_; + std::string CurrentScopePath_; + size_t LoadSlow(void* buf, size_t len); + + void UpdateTopmostScopeChecksum(void* buf, size_t len); + void UpdateScopesChecksum(); + void UpdateScopesCurrentChecksumPtr(); + + void BeginScope(TStringBuf name); + void EndScope(); }; //////////////////////////////////////////////////////////////////////////////// @@ -180,12 +201,32 @@ public: TLoadContextStream* GetInput(); + void BeginScope(TStringBuf name); + void EndScope(); + protected: TLoadContextStream Input_; }; //////////////////////////////////////////////////////////////////////////////// +class TStreamLoadContextScopeGuard +{ +public: + TStreamLoadContextScopeGuard( + TStreamLoadContext& context, + TStringBuf name); + TStreamLoadContextScopeGuard(const TStreamLoadContextScopeGuard&) = delete; + ~TStreamLoadContextScopeGuard(); + + TStreamLoadContextScopeGuard& operator=(const TStreamLoadContextScopeGuard&) = delete; + +private: + TStreamLoadContext& Context_; +}; + +//////////////////////////////////////////////////////////////////////////////// + template <class TSaveContext, class TLoadContext, class TSnapshotVersion> class TCustomPersistenceContext { diff --git a/yt/yt/core/misc/serialize_dump.h b/yt/yt/core/misc/serialize_dump.h index 6a01444638..31a53e73e8 100644 --- a/yt/yt/core/misc/serialize_dump.h +++ b/yt/yt/core/misc/serialize_dump.h @@ -1,7 +1,11 @@ #pragma once +#include <yt/yt/core/misc/checksum.h> + #include <library/cpp/yt/string/format.h> +#include <library/cpp/yt/misc/enum.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -9,14 +13,14 @@ namespace NYT { class TSerializationDumper { public: - Y_FORCE_INLINE bool IsEnabled() const + Y_FORCE_INLINE ESerializationDumpMode GetMode() const { - return Enabled_; + return Mode_; } - Y_FORCE_INLINE void SetEnabled(bool value) + Y_FORCE_INLINE void SetMode(ESerializationDumpMode value) { - Enabled_ = value; + Mode_ = value; } Y_FORCE_INLINE void Indent() @@ -46,9 +50,14 @@ public: } - Y_FORCE_INLINE bool IsActive() const + Y_FORCE_INLINE bool IsContentDumpActive() const { - return IsEnabled() && !IsSuspended(); + return GetMode() == ESerializationDumpMode::Content && !IsSuspended(); + } + + Y_FORCE_INLINE bool IsChecksumDumpActive() const + { + return GetMode() == ESerializationDumpMode::Checksum; } @@ -58,32 +67,48 @@ public: } template <class... TArgs> - void Write(const char* format, const TArgs&... args) + void WriteContent(const char* format, const TArgs&... args) { - if (!IsActive()) { - return; - } - - TStringBuilder builder; - builder.AppendChar(' ', IndentCount_ * 2); + BeginWrite(); + ScratchBuilder_.AppendChar(' ', IndentCount_ * 2); if (FieldName_) { - builder.AppendString(FieldName_); - builder.AppendString(": "); + ScratchBuilder_.AppendString(FieldName_); + ScratchBuilder_.AppendString(": "); FieldName_ = {}; } - builder.AppendFormat(format, args...); - builder.AppendChar('\n'); - auto buffer = builder.GetBuffer(); - fwrite(buffer.begin(), buffer.length(), 1, stderr); + ScratchBuilder_.AppendFormat(format, args...); + ScratchBuilder_.AppendChar('\n'); + EndWrite(); + } + + void WriteChecksum(TStringBuf path, TChecksum checksum) + { + BeginWrite(); + ScratchBuilder_.AppendFormat("%v => %x\n", path, checksum); + EndWrite(); } private: - bool Enabled_ = false; + ESerializationDumpMode Mode_ = ESerializationDumpMode::None; int IndentCount_ = 0; int SuspendCount_ = 0; TStringBuf FieldName_; + TStringBuilder ScratchBuilder_; + + void BeginWrite() + { + ScratchBuilder_.Reset(); + } + + void EndWrite() + { + auto buffer = ScratchBuilder_.GetBuffer(); + fwrite(buffer.begin(), buffer.length(), 1, stderr); + } }; +//////////////////////////////////////////////////////////////////////////////// + class TSerializeDumpIndentGuard : private TNonCopyable { @@ -117,6 +142,8 @@ private: TSerializationDumper* Dumper_; }; +//////////////////////////////////////////////////////////////////////////////// + class TSerializeDumpSuspendGuard : private TNonCopyable { @@ -150,10 +177,12 @@ private: TSerializationDumper* Dumper_; }; +//////////////////////////////////////////////////////////////////////////////// + #define SERIALIZATION_DUMP_WRITE(context, ...) \ - if (Y_LIKELY(!(context).Dumper().IsActive())) { \ + if (Y_LIKELY(!(context).Dumper().IsContentDumpActive())) { \ } else \ - (context).Dumper().Write(__VA_ARGS__) + (context).Dumper().WriteContent(__VA_ARGS__) #define SERIALIZATION_DUMP_INDENT(context) \ if (auto SERIALIZATION_DUMP_INDENT__Guard = NYT::TSerializeDumpIndentGuard(&(context).Dumper())) { \ @@ -165,6 +194,8 @@ private: Y_UNREACHABLE(); \ } else +//////////////////////////////////////////////////////////////////////////////// + inline TString DumpRangeToHex(TRef data) { TStringBuilder builder; @@ -178,6 +209,8 @@ inline TString DumpRangeToHex(TRef data) return builder.Flush(); } +//////////////////////////////////////////////////////////////////////////////// + template <class T> struct TSerializationDumpPodWriter { diff --git a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp index 97792ea4b7..5c86cc98d5 100644 --- a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp +++ b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp @@ -53,7 +53,7 @@ T Deserialize(const TString& buffer, int version = 0) TStringInput input(buffer); TLoadContext context(&input); context.SetVersion(version); - context.Dumper().SetEnabled(true); + context.Dumper().SetMode(ESerializationDumpMode::Content); Load(context, value); return value; } @@ -64,7 +64,7 @@ void InplaceDeserialize(const TIntrusivePtr<T>& value, const TString& buffer, in TStringInput input(buffer); TLoadContext context(&input); context.SetVersion(version); - context.Dumper().SetEnabled(true); + context.Dumper().SetMode(ESerializationDumpMode::Content); NPhoenix::NDetail::TSerializer::InplaceLoad(context, value); } |