aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-01-24 14:18:41 +0300
committerbabenko <babenko@yandex-team.com>2025-01-24 14:41:16 +0300
commit44d7e0082415e0cfcc10fc97f63fffc70ff90653 (patch)
tree0fca5d5108508163d9d971b09f549ba5e20a5b2d
parenta9b47db2069a01e1f93339ecc3a6c2eb0ec86630 (diff)
downloadydb-44d7e0082415e0cfcc10fc97f63fffc70ff90653.tar.gz
Introduce checksum snapshot dump mode
* Changelog entry Type: feature Component: master dynamic-tables For master and tablet nodes, the `dump-snapshot` CLI command now accepts `snapshot-dump-mode`, which can be either `content` (as usual) or `checksum` (a new option). In the latter case the binary dumps checksums for various (hierarchically nested) _scopes_ of the snapshot. This mode can be used to aid in diagnosing snapshot mismatch issues. commit_hash:6e64e081aa30e6512d55d710a61573f68047c5c6
-rw-r--r--yt/yt/core/misc/public.h9
-rw-r--r--yt/yt/core/misc/serialize-inl.h3
-rw-r--r--yt/yt/core/misc/serialize.cpp157
-rw-r--r--yt/yt/core/misc/serialize.h57
-rw-r--r--yt/yt/core/misc/serialize_dump.h77
-rw-r--r--yt/yt/core/phoenix/unittests/phoenix_ut.cpp4
6 files changed, 249 insertions, 58 deletions
diff --git a/yt/yt/core/misc/public.h b/yt/yt/core/misc/public.h
index 5fb6839f12..e67d095ae5 100644
--- a/yt/yt/core/misc/public.h
+++ b/yt/yt/core/misc/public.h
@@ -81,6 +81,12 @@ struct TValueBoundSerializer;
template <class T, class C, class = void>
struct TSerializerTraits;
+// Mode of TSerializationDumper: None is its default; Content and Checksum are
+// the values tested by IsContentDumpActive() and IsChecksumDumpActive().
+DEFINE_ENUM(ESerializationDumpMode,
+ (None)
+ (Content)
+ (Checksum)
+);
+
template <class TKey, class TComparer>
class TSkipList;
@@ -116,9 +122,6 @@ constexpr TChecksum NullChecksum = 0;
template <class T, size_t N>
class TCompactVector;
-class TRef;
-class TMutableRef;
-
template <class TProto>
class TRefCountedProto;
diff --git a/yt/yt/core/misc/serialize-inl.h b/yt/yt/core/misc/serialize-inl.h
index 1b588bbff3..38fea1742e 100644
--- a/yt/yt/core/misc/serialize-inl.h
+++ b/yt/yt/core/misc/serialize-inl.h
@@ -6,6 +6,7 @@
#include "collection_helpers.h"
#include "maybe_inf.h"
+#include "mpl.h"
#include <yt/yt/core/phoenix/concepts.h>
@@ -17,6 +18,8 @@
#include <library/cpp/yt/containers/enum_indexed_array.h>
+#include <library/cpp/yt/assert/assert.h>
+
#include <optional>
#include <variant>
diff --git a/yt/yt/core/misc/serialize.cpp b/yt/yt/core/misc/serialize.cpp
index b5d8720a35..6bc396ddfc 100644
--- a/yt/yt/core/misc/serialize.cpp
+++ b/yt/yt/core/misc/serialize.cpp
@@ -112,58 +112,169 @@ void TStreamSaveContext::Finish()
////////////////////////////////////////////////////////////////////////////////
-TLoadContextStream::TLoadContextStream(IInputStream* input)
- : Input_(input)
+TLoadContextStream::TLoadContextStream(
+ TStreamLoadContext* context,
+ IInputStream* input)
+ : Context_(context)
+ , Input_(input)
{ }
-TLoadContextStream::TLoadContextStream(IZeroCopyInput* input)
- : ZeroCopyInput_(input)
+// Zero-copy flavor. The stream must be stored in ZeroCopyInput_ (not Input_):
+// LoadSlow selects its buffer-based path by testing ZeroCopyInput_ for non-null,
+// so storing it in Input_ would silently route all reads through IInputStream::Load.
+TLoadContextStream::TLoadContextStream(
+ TStreamLoadContext* context,
+ IZeroCopyInput* input)
+ : Context_(context)
+ , ZeroCopyInput_(input)
{ }
-void TLoadContextStream::ClearBuffer()
+void TLoadContextStream::SkipToCheckpoint()
{
- if (BufferRemaining_ > 0) {
- BufferPtr_ = nullptr;
- BufferRemaining_ = 0;
+ if (BufferRemaining_ == 0) {
+ return;
+ }
+
+ if (!ScopeStack_.empty()) {
+ THROW_ERROR_EXCEPTION("Cannot skip to checkpoint when scope stack is not empty");
+ }
+
+ BufferPtr_ = nullptr;
+ BufferRemaining_ = 0;
+}
+
+// Folds |len| bytes starting at |buf| into the checksum of the innermost scope;
+// the previous checksum is passed as GetChecksum's second argument (presumably a seed).
+// The scope stack must be non-empty: back() is taken without a check.
+void TLoadContextStream::UpdateTopmostScopeChecksum(void* buf, size_t len)
+{
+ auto& scope = ScopeStack_.back();
+ scope.CurrentChecksum = GetChecksum(TRef(buf, len), scope.CurrentChecksum);
+}
+
+// Does nothing unless checksum dump mode is active and at least one scope is open.
+// Otherwise feeds the bytes between the innermost scope's CurrentChecksumPtr and
+// BufferPtr_ into that scope's checksum, then resets the checksum start pointers.
+void TLoadContextStream::UpdateScopesChecksum()
+{
+ if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) {
+ auto& topmostScope = ScopeStack_.back();
+ UpdateTopmostScopeChecksum(
+ topmostScope.CurrentChecksumPtr,
+ BufferPtr_ - topmostScope.CurrentChecksumPtr);
+ // Mark everything up to BufferPtr_ as already accounted for.
+ UpdateScopesCurrentChecksumPtr();
+ }
+}
+
+// In checksum dump mode, points CurrentChecksumPtr of every open scope at the
+// current buffer position (BufferPtr_). Does nothing in other modes.
+void TLoadContextStream::UpdateScopesCurrentChecksumPtr()
+{
+ if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) {
+ for (auto& scope : ScopeStack_) {
+ scope.CurrentChecksumPtr = BufferPtr_;
+ }
}
}
-size_t TLoadContextStream::LoadSlow(void* buf, size_t len)
+size_t TLoadContextStream::LoadSlow(void* buf_, size_t len)
{
+ size_t bytesRead = 0;
if (ZeroCopyInput_) {
- auto bufPtr = static_cast<char*>(buf);
- auto toRead = len;
- while (toRead > 0) {
+ auto buf = static_cast<char*>(buf_);
+ auto bytesToRead = len;
+ while (bytesToRead > 0) {
if (BufferRemaining_ == 0) {
+ UpdateScopesChecksum();
+
BufferRemaining_ = ZeroCopyInput_->Next(&BufferPtr_);
if (BufferRemaining_ == 0) {
break;
}
+
+ UpdateScopesCurrentChecksumPtr();
}
- YT_ASSERT(BufferRemaining_ > 0);
- auto toCopy = std::min(toRead, BufferRemaining_);
- ::memcpy(bufPtr, BufferPtr_, toCopy);
- BufferPtr_ += toCopy;
- BufferRemaining_ -= toCopy;
- bufPtr += toCopy;
- toRead -= toCopy;
+
+ auto bytesToCopy = std::min(bytesToRead, BufferRemaining_);
+ ::memcpy(buf, BufferPtr_, bytesToCopy);
+
+ BufferPtr_ += bytesToCopy;
+ BufferRemaining_ -= bytesToCopy;
+ buf += bytesToCopy;
+ bytesToRead -= bytesToCopy;
+ bytesRead += bytesToCopy;
}
- return len - toRead;
+ return len - bytesToRead;
} else {
- return Input_->Load(buf, len);
+ bytesRead = Input_->Load(buf_, len);
+ if (Context_->Dumper().IsChecksumDumpActive() && !ScopeStack_.empty()) {
+ UpdateTopmostScopeChecksum(buf_, bytesRead);
+ }
+ }
+ return bytesRead;
+}
+
+// Opens a nested scope called |name|: pushes a new entry onto the scope stack
+// and appends "/" followed by |name| to the current scope path.
+void TLoadContextStream::BeginScope(TStringBuf name)
+{
+ // Account pending buffer bytes to the enclosing scope before the new one is pushed.
+ UpdateScopesChecksum();
+
+ ScopeStack_.push_back({
+ .ScopeNameLength = name.size(),
+ });
+
+ CurrentScopePath_ += "/";
+ CurrentScopePath_ += name;
+
+ // CurrentChecksumPtr is only assigned in checksum dump mode.
+ if (Context_->Dumper().IsChecksumDumpActive()) {
+ ScopeStack_.back().CurrentChecksumPtr = BufferPtr_;
}
}
+// Closes the innermost scope. In checksum dump mode, first folds the scope's
+// checksum into every enclosing scope and writes it to the dumper under the
+// current scope path. Verifies (YT_VERIFY) that a scope is actually open.
+void TLoadContextStream::EndScope()
+{
+ // Account pending buffer bytes to the scope being closed.
+ UpdateScopesChecksum();
+
+ YT_VERIFY(!ScopeStack_.empty());
+ auto& topmostScope = ScopeStack_.back();
+
+ if (Context_->Dumper().IsChecksumDumpActive()) {
+ // All scopes except the last (innermost) one, i.e. the enclosing scopes.
+ for (int index = 0; index < std::ssize(ScopeStack_) - 1; ++index) {
+ auto& scope = ScopeStack_[index];
+ scope.CurrentChecksum = GetChecksum(TRef::FromPod(topmostScope.CurrentChecksum), scope.CurrentChecksum);
+ }
+
+ Context_->Dumper().WriteChecksum(CurrentScopePath_, topmostScope.CurrentChecksum);
+ }
+
+ // Drop the trailing "/<name>" component; the extra 1 is the "/" separator.
+ CurrentScopePath_.resize(CurrentScopePath_.size() - topmostScope.ScopeNameLength - 1);
+ ScopeStack_.pop_back();
+}
+
////////////////////////////////////////////////////////////////////////////////
TStreamLoadContext::TStreamLoadContext(IInputStream* input)
- : Input_(input)
+ : Input_(this, input)
{ }
TStreamLoadContext::TStreamLoadContext(IZeroCopyInput* input)
- : Input_(input)
+ : Input_(this, input)
{ }
+// Opens a scope called |name| on the underlying load stream.
+void TStreamLoadContext::BeginScope(TStringBuf name)
+{
+ Input_.BeginScope(name);
+}
+
+// Closes the innermost scope on the underlying load stream.
+void TStreamLoadContext::EndScope()
+{
+ Input_.EndScope();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Opens a scope called |name| on |context|; |name| must be non-empty (YT_VERIFY).
+// A reference to |context| is kept, so it must outlive the guard.
+TStreamLoadContextScopeGuard::TStreamLoadContextScopeGuard(
+ TStreamLoadContext& context,
+ TStringBuf name)
+ : Context_(context)
+{
+ YT_VERIFY(!name.empty());
+ context.BeginScope(name);
+}
+
+// Closes the scope opened by the constructor.
+TStreamLoadContextScopeGuard::~TStreamLoadContextScopeGuard()
+{
+ Context_.EndScope();
+}
+
////////////////////////////////////////////////////////////////////////////////
TEntityStreamSaveContext::TEntityStreamSaveContext(
diff --git a/yt/yt/core/misc/serialize.h b/yt/yt/core/misc/serialize.h
index a59522bc75..d51a5012cf 100644
--- a/yt/yt/core/misc/serialize.h
+++ b/yt/yt/core/misc/serialize.h
@@ -1,13 +1,8 @@
#pragma once
#include "public.h"
-#include "error.h"
-#include "mpl.h"
#include "property.h"
#include "serialize_dump.h"
-#include "maybe_inf.h"
-
-#include <library/cpp/yt/assert/assert.h>
#include <library/cpp/yt/memory/ref.h>
@@ -19,6 +14,8 @@
#include <util/system/align.h>
+#include <util/generic/size_literals.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -148,20 +145,44 @@ protected:
class TLoadContextStream
{
public:
- explicit TLoadContextStream(IInputStream* input);
- explicit TLoadContextStream(IZeroCopyInput* input);
+ TLoadContextStream(
+ TStreamLoadContext* context,
+ IInputStream* input);
+ TLoadContextStream(
+ TStreamLoadContext* context,
+ IZeroCopyInput* input);
size_t Load(void* buf, size_t len);
- void ClearBuffer();
+ void SkipToCheckpoint();
private:
+ friend class TStreamLoadContext;
+
+ TStreamLoadContext* const Context_;
IInputStream* const Input_ = nullptr;
IZeroCopyInput* const ZeroCopyInput_ = nullptr;
char* BufferPtr_ = nullptr;
size_t BufferRemaining_ = 0;
+ // Bookkeeping for one open scope on the scope stack.
+ struct TScope
+ {
+ // Length of the scope name; used to trim CurrentScopePath_ when the scope ends.
+ size_t ScopeNameLength;
+ // Start of the buffer bytes not yet folded into CurrentChecksum.
+ // Has no initializer; only assigned when checksum dump mode is active.
+ char* CurrentChecksumPtr;
+ // Running checksum of the scope.
+ TChecksum CurrentChecksum = {};
+ };
+
+ std::vector<TScope> ScopeStack_;
+ std::string CurrentScopePath_;
+
size_t LoadSlow(void* buf, size_t len);
+
+ void UpdateTopmostScopeChecksum(void* buf, size_t len);
+ void UpdateScopesChecksum();
+ void UpdateScopesCurrentChecksumPtr();
+
+ void BeginScope(TStringBuf name);
+ void EndScope();
};
////////////////////////////////////////////////////////////////////////////////
@@ -180,12 +201,32 @@ public:
TLoadContextStream* GetInput();
+ void BeginScope(TStringBuf name);
+ void EndScope();
+
protected:
TLoadContextStream Input_;
};
////////////////////////////////////////////////////////////////////////////////
+// Scope guard for TStreamLoadContext: the constructor takes the context and a
+// scope name, the destructor takes no arguments. Copying is disabled.
+class TStreamLoadContextScopeGuard
+{
+public:
+ TStreamLoadContextScopeGuard(
+ TStreamLoadContext& context,
+ TStringBuf name);
+ TStreamLoadContextScopeGuard(const TStreamLoadContextScopeGuard&) = delete;
+ ~TStreamLoadContextScopeGuard();
+
+ TStreamLoadContextScopeGuard& operator=(const TStreamLoadContextScopeGuard&) = delete;
+
+private:
+ // Held by reference; the context must outlive the guard.
+ TStreamLoadContext& Context_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
template <class TSaveContext, class TLoadContext, class TSnapshotVersion>
class TCustomPersistenceContext
{
diff --git a/yt/yt/core/misc/serialize_dump.h b/yt/yt/core/misc/serialize_dump.h
index 6a01444638..31a53e73e8 100644
--- a/yt/yt/core/misc/serialize_dump.h
+++ b/yt/yt/core/misc/serialize_dump.h
@@ -1,7 +1,11 @@
#pragma once
+#include <yt/yt/core/misc/checksum.h>
+
#include <library/cpp/yt/string/format.h>
+#include <library/cpp/yt/misc/enum.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -9,14 +13,14 @@ namespace NYT {
class TSerializationDumper
{
public:
- Y_FORCE_INLINE bool IsEnabled() const
+ Y_FORCE_INLINE ESerializationDumpMode GetMode() const
{
- return Enabled_;
+ return Mode_;
}
- Y_FORCE_INLINE void SetEnabled(bool value)
+ Y_FORCE_INLINE void SetMode(ESerializationDumpMode value)
{
- Enabled_ = value;
+ Mode_ = value;
}
Y_FORCE_INLINE void Indent()
@@ -46,9 +50,14 @@ public:
}
- Y_FORCE_INLINE bool IsActive() const
+ Y_FORCE_INLINE bool IsContentDumpActive() const
{
- return IsEnabled() && !IsSuspended();
+ return GetMode() == ESerializationDumpMode::Content && !IsSuspended();
+ }
+
+ // True iff the mode is Checksum. Unlike IsContentDumpActive(), this does not
+ // consult IsSuspended().
+ Y_FORCE_INLINE bool IsChecksumDumpActive() const
+ {
+ return GetMode() == ESerializationDumpMode::Checksum;
+ }
@@ -58,32 +67,48 @@ public:
}
template <class... TArgs>
- void Write(const char* format, const TArgs&... args)
+ void WriteContent(const char* format, const TArgs&... args)
{
- if (!IsActive()) {
- return;
- }
-
- TStringBuilder builder;
- builder.AppendChar(' ', IndentCount_ * 2);
+ BeginWrite();
+ ScratchBuilder_.AppendChar(' ', IndentCount_ * 2);
if (FieldName_) {
- builder.AppendString(FieldName_);
- builder.AppendString(": ");
+ ScratchBuilder_.AppendString(FieldName_);
+ ScratchBuilder_.AppendString(": ");
FieldName_ = {};
}
- builder.AppendFormat(format, args...);
- builder.AppendChar('\n');
- auto buffer = builder.GetBuffer();
- fwrite(buffer.begin(), buffer.length(), 1, stderr);
+ ScratchBuilder_.AppendFormat(format, args...);
+ ScratchBuilder_.AppendChar('\n');
+ EndWrite();
+ }
+
+ // Emits one line of the form "<path> => <checksum>" (checksum formatted with %x).
+ // Does not check the dump mode itself; that is left to the caller.
+ void WriteChecksum(TStringBuf path, TChecksum checksum)
+ {
+ BeginWrite();
+ ScratchBuilder_.AppendFormat("%v => %x\n", path, checksum);
+ EndWrite();
+ }
private:
- bool Enabled_ = false;
+ ESerializationDumpMode Mode_ = ESerializationDumpMode::None;
int IndentCount_ = 0;
int SuspendCount_ = 0;
TStringBuf FieldName_;
+ TStringBuilder ScratchBuilder_;
+
+ // Resets the shared scratch builder before a new record is formatted.
+ void BeginWrite()
+ {
+ ScratchBuilder_.Reset();
+ }
+
+ // Writes the contents of the scratch builder to stderr.
+ void EndWrite()
+ {
+ auto buffer = ScratchBuilder_.GetBuffer();
+ fwrite(buffer.begin(), buffer.length(), 1, stderr);
+ }
};
+////////////////////////////////////////////////////////////////////////////////
+
class TSerializeDumpIndentGuard
: private TNonCopyable
{
@@ -117,6 +142,8 @@ private:
TSerializationDumper* Dumper_;
};
+////////////////////////////////////////////////////////////////////////////////
+
class TSerializeDumpSuspendGuard
: private TNonCopyable
{
@@ -150,10 +177,12 @@ private:
TSerializationDumper* Dumper_;
};
+////////////////////////////////////////////////////////////////////////////////
+
#define SERIALIZATION_DUMP_WRITE(context, ...) \
- if (Y_LIKELY(!(context).Dumper().IsActive())) { \
+ if (Y_LIKELY(!(context).Dumper().IsContentDumpActive())) { \
} else \
- (context).Dumper().Write(__VA_ARGS__)
+ (context).Dumper().WriteContent(__VA_ARGS__)
#define SERIALIZATION_DUMP_INDENT(context) \
if (auto SERIALIZATION_DUMP_INDENT__Guard = NYT::TSerializeDumpIndentGuard(&(context).Dumper())) { \
@@ -165,6 +194,8 @@ private:
Y_UNREACHABLE(); \
} else
+////////////////////////////////////////////////////////////////////////////////
+
inline TString DumpRangeToHex(TRef data)
{
TStringBuilder builder;
@@ -178,6 +209,8 @@ inline TString DumpRangeToHex(TRef data)
return builder.Flush();
}
+////////////////////////////////////////////////////////////////////////////////
+
template <class T>
struct TSerializationDumpPodWriter
{
diff --git a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
index 97792ea4b7..5c86cc98d5 100644
--- a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
+++ b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
@@ -53,7 +53,7 @@ T Deserialize(const TString& buffer, int version = 0)
TStringInput input(buffer);
TLoadContext context(&input);
context.SetVersion(version);
- context.Dumper().SetEnabled(true);
+ context.Dumper().SetMode(ESerializationDumpMode::Content);
Load(context, value);
return value;
}
@@ -64,7 +64,7 @@ void InplaceDeserialize(const TIntrusivePtr<T>& value, const TString& buffer, in
TStringInput input(buffer);
TLoadContext context(&input);
context.SetVersion(version);
- context.Dumper().SetEnabled(true);
+ context.Dumper().SetMode(ESerializationDumpMode::Content);
NPhoenix::NDetail::TSerializer::InplaceLoad(context, value);
}