aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/xdelta3/state
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/xdelta3/state
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/xdelta3/state')
-rw-r--r--library/cpp/xdelta3/state/create_proto.cpp134
-rw-r--r--library/cpp/xdelta3/state/create_proto.h26
-rw-r--r--library/cpp/xdelta3/state/data_ptr.cpp18
-rw-r--r--library/cpp/xdelta3/state/data_ptr.h17
-rw-r--r--library/cpp/xdelta3/state/hash.cpp10
-rw-r--r--library/cpp/xdelta3/state/hash.h5
-rw-r--r--library/cpp/xdelta3/state/merge.cpp226
-rw-r--r--library/cpp/xdelta3/state/merge.h32
-rw-r--r--library/cpp/xdelta3/state/state.cpp147
-rw-r--r--library/cpp/xdelta3/state/state.h37
10 files changed, 652 insertions, 0 deletions
diff --git a/library/cpp/xdelta3/state/create_proto.cpp b/library/cpp/xdelta3/state/create_proto.cpp
new file mode 100644
index 0000000000..ad97201e55
--- /dev/null
+++ b/library/cpp/xdelta3/state/create_proto.cpp
@@ -0,0 +1,134 @@
+#include "create_proto.h"
+
+#include <library/cpp/xdelta3/state/data_ptr.h>
+#include <library/cpp/xdelta3/state/hash.h>
+#include <library/cpp/xdelta3/xdelta_codec/codec.h>
+
+namespace NXdeltaAggregateColumn {
+ bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize);
+}
+
+namespace {
+ using namespace NXdeltaAggregateColumn;
+
+ template<typename TResult>
+ TResult EncodeProto(const TStateHeader& header, const ui8* data, size_t size)
+ {
+ using namespace NProtoBuf::io;
+
+ TResult result;
+ ui8 totalHeaderSize = SizeOfHeader(header);
+ ui8* ptr = nullptr;
+ if constexpr (std::is_same_v<TResult, TString>) {
+ result.ReserveAndResize(totalHeaderSize + size);
+ ptr = reinterpret_cast<ui8*>(&result[0]);
+ } else {
+ result.Resize(totalHeaderSize + size);
+ ptr = reinterpret_cast<ui8*>(result.Data());
+ }
+ size_t resultSize = 0;
+
+ if (EncodeHeaderTo(header, ptr, result.Size(), resultSize)) {
+ if (data && size) {
+ memcpy(ptr + totalHeaderSize, data, size);
+ }
+ return result;
+ }
+ return {};
+ }
+
+ template<typename TResult>
+ TResult EncodeErrorProto(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {})
+ {
+ TStateHeader header;
+ header.set_error_code(error);
+
+ if (changeHeader) {
+ changeHeader(header);
+ }
+
+ return EncodeProto<TResult>(header, nullptr, 0);
+ }
+
+ template<typename TResult>
+ TResult EncodeBaseProto(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {})
+ {
+ TStateHeader header;
+ header.set_type(TStateHeader::BASE);
+ header.set_data_size(baseSize);
+
+ if (changeHeader) {
+ changeHeader(header);
+ }
+
+ return EncodeProto<TResult>(header, base, baseSize);
+ }
+
+ template<typename TResult>
+ TResult EncodePatchProto(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {})
+ {
+ TStateHeader header;
+ header.set_type(TStateHeader::PATCH);
+ header.set_data_size(patchSize);
+ header.set_state_hash(CalcHash(state, stateSize));
+ header.set_state_size(stateSize);
+ header.set_base_hash(CalcHash(base, baseSize));
+
+ if (changeHeader) {
+ changeHeader(header);
+ }
+
+ return EncodeProto<TResult>(header, patch, patchSize);
+ }
+
+ template<typename TResult>
+ TResult EncodePatchProto(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {})
+ {
+ size_t patchSize = 0;
+ auto patch = TDataPtr(ComputePatch(nullptr, base, baseSize, state, stateSize, &patchSize));
+
+ return EncodePatchProto<TResult>(base, baseSize, state, stateSize, patch.get(), patchSize, changeHeader);
+ }
+}
+
+namespace NXdeltaAggregateColumn {
+ TBuffer EncodeErrorProtoAsBuffer(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader) {
+ return EncodeErrorProto<TBuffer>(error, changeHeader);
+ }
+
+ TBuffer EncodeBaseProtoAsBuffer(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader) {
+ return EncodeBaseProto<TBuffer>(base, baseSize, changeHeader);
+ }
+
+ TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader) {
+ return EncodePatchProto<TBuffer>(base, baseSize, state, stateSize, changeHeader);
+ }
+
+ TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader) {
+ return EncodePatchProto<TBuffer>(base, baseSize, state, stateSize, patch, patchSize, changeHeader);
+ }
+
+ TBuffer EncodeProtoAsBuffer(const TStateHeader& header, const ui8* data, size_t size) {
+ return EncodeProto<TBuffer>(header, data, size);
+ }
+
+ TString EncodeErrorProtoAsString(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader) {
+ return EncodeErrorProto<TString>(error, changeHeader);
+ }
+
+ TString EncodeBaseProtoAsString(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader) {
+ return EncodeBaseProto<TString>(base, baseSize, changeHeader);
+ }
+
+ TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader) {
+ return EncodePatchProto<TString>(base, baseSize, state, stateSize, changeHeader);
+ }
+
+ TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader) {
+ return EncodePatchProto<TString>(base, baseSize, state, stateSize, patch, patchSize, changeHeader);
+ }
+
+ TString EncodeProtoAsString(const TStateHeader& header, const ui8* data, size_t size) {
+ return EncodeProto<TString>(header, data, size);
+ }
+}
diff --git a/library/cpp/xdelta3/state/create_proto.h b/library/cpp/xdelta3/state/create_proto.h
new file mode 100644
index 0000000000..7ca9763d84
--- /dev/null
+++ b/library/cpp/xdelta3/state/create_proto.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include "state.h"
+
+#include <library/cpp/xdelta3/proto/state_header.pb.h>
+
+#include <functional>
+
+namespace NXdeltaAggregateColumn {
+
+ // functor used for testing - change header fields to inject errors
+ using TChangeHeader = std::function<void(TStateHeader&)>;
+
+ TBuffer EncodeErrorProtoAsBuffer(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {});
+ TBuffer EncodeBaseProtoAsBuffer(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {});
+ TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {});
+ TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {});
+ TBuffer EncodeProtoAsBuffer(const TStateHeader& header, const ui8* data, size_t size);
+
+ TString EncodeErrorProtoAsString(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {});
+ TString EncodeBaseProtoAsString(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {});
+ TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {});
+ TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {});
+ TString EncodeProtoAsString(const TStateHeader& header, const ui8* data, size_t size);
+
+}
diff --git a/library/cpp/xdelta3/state/data_ptr.cpp b/library/cpp/xdelta3/state/data_ptr.cpp
new file mode 100644
index 0000000000..a64af77124
--- /dev/null
+++ b/library/cpp/xdelta3/state/data_ptr.cpp
@@ -0,0 +1,18 @@
+#include "data_ptr.h"
+
+namespace NXdeltaAggregateColumn {
+
+ TDeleter::TDeleter(XDeltaContext* context)
+ : Context(context)
+ {
+ }
+
+ void TDeleter::operator()(ui8* ptr) const
+ {
+ if (!Context) {
+ free(ptr);
+ return;
+ }
+ Context->free(Context->opaque, ptr);
+ }
+}
diff --git a/library/cpp/xdelta3/state/data_ptr.h b/library/cpp/xdelta3/state/data_ptr.h
new file mode 100644
index 0000000000..7ba12eb4ba
--- /dev/null
+++ b/library/cpp/xdelta3/state/data_ptr.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <library/cpp/xdelta3/xdelta_codec/codec.h>
+
+#include <memory>
+
+namespace NXdeltaAggregateColumn {
+ struct TDeleter {
+ TDeleter() = default;
+ explicit TDeleter(XDeltaContext* context);
+
+ void operator()(ui8* p) const;
+
+ XDeltaContext* Context = nullptr;
+ };
+ using TDataPtr = std::unique_ptr<ui8, TDeleter>;
+}
diff --git a/library/cpp/xdelta3/state/hash.cpp b/library/cpp/xdelta3/state/hash.cpp
new file mode 100644
index 0000000000..741e60c013
--- /dev/null
+++ b/library/cpp/xdelta3/state/hash.cpp
@@ -0,0 +1,10 @@
+#include <util/digest/murmur.h>
+
+namespace NXdeltaAggregateColumn {
+
+ ui32 CalcHash(const ui8* data, size_t size)
+ {
+ return MurmurHash<ui32>(data, size);
+ }
+
+}
diff --git a/library/cpp/xdelta3/state/hash.h b/library/cpp/xdelta3/state/hash.h
new file mode 100644
index 0000000000..437e3be123
--- /dev/null
+++ b/library/cpp/xdelta3/state/hash.h
@@ -0,0 +1,5 @@
+#pragma once
+
+namespace NXdeltaAggregateColumn {
+ ui32 CalcHash(const ui8* data, size_t size);
+}
diff --git a/library/cpp/xdelta3/state/merge.cpp b/library/cpp/xdelta3/state/merge.cpp
new file mode 100644
index 0000000000..de8b12226d
--- /dev/null
+++ b/library/cpp/xdelta3/state/merge.cpp
@@ -0,0 +1,226 @@
+#include "merge.h"
+
+#include "state.h"
+
+#include "data_ptr.h"
+
+#include <library/cpp/xdelta3/xdelta_codec/codec.h>
+#include <library/cpp/xdelta3/state/hash.h>
+
+#include <util/digest/murmur.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+namespace NXdeltaAggregateColumn {
+
+ static ui8* AllocateFromContext(XDeltaContext* context, size_t size)
+ {
+ if (!context) {
+ return reinterpret_cast<ui8*>(malloc(size));
+ }
+ return reinterpret_cast<ui8*>(context->allocate(context->opaque, size));
+ }
+
+ bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize);
+
+ bool EncodeErrorHeader(XDeltaContext* context, NProtoBuf::Arena& arena, TStateHeader::EErrorCode error, TSpan* result)
+ {
+ result->Offset = result->Size = 0;
+
+ auto header = NProtoBuf::Arena::CreateMessage<NXdeltaAggregateColumn::TStateHeader>(&arena);
+ header->set_error_code(error);
+ auto headerSize = SizeOfHeader(*header);
+ auto data = TDataPtr(AllocateFromContext(context, headerSize), TDeleter(context));
+ if (EncodeHeaderTo(*header, data.get(), headerSize, result->Size)) {
+ result->Offset = 0;
+ result->Data = data.release();
+ return true;
+ }
+ return false;
+ }
+
+ bool EncodeState(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& state, TSpan* result)
+ {
+ auto headerSize = SizeOfHeader(state.Header());
+ result->Size = headerSize + state.PayloadSize();
+ auto data = TDataPtr(AllocateFromContext(context, result->Size), TDeleter(context));
+ size_t written = 0;
+ if (EncodeHeaderTo(state.Header(), data.get(), result->Size, written)) {
+ if (state.PayloadSize() && state.PayloadData()) {
+ memcpy(data.get() + headerSize, state.PayloadData(), state.PayloadSize());
+ }
+ result->Data = data.release();
+ return true;
+ }
+ return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result);
+ }
+
+ // NOTE: empty (data_size == 0) patch means nothing changed.
+ // empty valid patch and will be ignored unless will raise error
+
+ bool IsBadEmptyPatch(const TState& empty)
+ {
+ return 0u == empty.PayloadSize() && empty.Header().base_hash() != empty.Header().state_hash();
+ }
+
+ bool MergePatches(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& lhs, const TState& rhs, TSpan* result)
+ {
+ if (lhs.Header().state_hash() != rhs.Header().base_hash()) {
+ return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result);
+ }
+
+ if (IsBadEmptyPatch(lhs) || IsBadEmptyPatch(rhs)) {
+ return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result);
+ }
+
+ if (0u == lhs.PayloadSize()) {
+ return EncodeState(context, arena, rhs, result);
+ }
+
+ if (0u == rhs.PayloadSize()) {
+ return EncodeState(context, arena, lhs, result);
+ }
+
+ auto merged = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena);
+ merged->set_type(TStateHeader::PATCH);
+ merged->set_base_hash(lhs.Header().base_hash());
+ merged->set_state_hash(rhs.Header().state_hash());
+ merged->set_state_size(rhs.Header().state_size());
+
+ size_t patchSize = 0;
+ auto maxMergedHeaderSize = SizeOfHeader(*merged) + sizeof(patchSize); // estimation should be valid: sizeof(ui64 patchSize) covers possible sizeOfHeaderSize growth (+1)
+ // as well as ui32 data_size which is unset at this point
+ auto patch = TDataPtr(MergePatches(
+ context,
+ maxMergedHeaderSize,
+ lhs.PayloadData(),
+ lhs.PayloadSize(),
+ rhs.PayloadData(),
+ rhs.PayloadSize(),
+ &patchSize),
+ TDeleter(context));
+ if (!patch) {
+ return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result);
+ }
+
+ merged->set_data_size(patchSize);
+
+ auto mergedHeaderSize = SizeOfHeader(*merged);
+ Y_ENSURE(maxMergedHeaderSize >= mergedHeaderSize);
+ auto offset = maxMergedHeaderSize - mergedHeaderSize;
+
+ size_t headerSize = 0;
+ if (!EncodeHeaderTo(*merged, patch.get() + offset, mergedHeaderSize, headerSize)) {
+ return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result);
+ }
+
+ result->Size = mergedHeaderSize + patchSize;
+ result->Offset = offset;
+ result->Data = patch.release();
+ return true;
+ }
+
+ bool ApplyPatch(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& base, const TState& patch, TSpan* result)
+ {
+ auto baseHash = base.CalcHash();
+ if (baseHash != patch.Header().base_hash()) {
+ return EncodeErrorHeader(context, arena, TStateHeader::BASE_HASH_ERROR, result);
+ }
+
+ if (patch.Header().data_size() == 0) {
+ if (patch.Header().state_size() == base.Header().data_size()) {
+ if (patch.Header().state_hash() == baseHash) {
+ return EncodeState(context, arena, base, result);
+ }
+ return EncodeErrorHeader(context, arena, TStateHeader::STATE_HASH_ERROR, result);
+ }
+ return EncodeErrorHeader(context, arena, TStateHeader::STATE_SIZE_ERROR, result);
+ }
+
+ size_t stateSize = 0;
+
+ auto merged = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena);
+ merged->set_type(TStateHeader::BASE);
+
+ auto maxHeaderSize = SizeOfHeader(*merged) + sizeof(stateSize);
+
+ auto state = TDataPtr(ApplyPatch(
+ context,
+ maxHeaderSize,
+ base.PayloadData(),
+ base.PayloadSize(),
+ patch.PayloadData(),
+ patch.PayloadSize(),
+ patch.Header().state_size(),
+ &stateSize),
+ TDeleter(context));
+ if (!state) {
+ return EncodeErrorHeader(context, arena, TStateHeader::APPLY_PATCH_ERROR, result);
+ }
+
+ if (stateSize != patch.Header().state_size()) {
+ return EncodeErrorHeader(context, arena, TStateHeader::STATE_SIZE_ERROR, result);
+ }
+
+ auto stateHash = CalcHash(state.get() + maxHeaderSize, stateSize);
+ if (stateHash != patch.Header().state_hash()) {
+ return EncodeErrorHeader(context, arena, TStateHeader::STATE_HASH_ERROR, result);
+ }
+
+ merged->set_data_size(stateSize);
+
+ auto mergedHeaderSize = SizeOfHeader(*merged);
+ auto offset = maxHeaderSize - mergedHeaderSize;
+ size_t headerSize = 0;
+ if (!EncodeHeaderTo(*merged, state.get() + offset, mergedHeaderSize, headerSize)) {
+ return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result);
+ }
+
+ result->Size = mergedHeaderSize + stateSize;
+ result->Offset = offset;
+ result->Data = state.release();
+ return true;
+ }
+
+ int MergeStates(XDeltaContext* context, const ui8* lhsData, size_t lhsSize, const ui8* rhsData, size_t rhsSize, TSpan* result)
+ {
+ using namespace NXdeltaAggregateColumn;
+ using namespace NProtoBuf::io;
+
+ result->Data = nullptr;
+ result->Size = 0;
+ result->Offset = 0;
+
+ NProtoBuf::ArenaOptions options;
+ options.initial_block_size = ArenaMaxSize;
+ auto buffer = TDataPtr(AllocateFromContext(context, options.initial_block_size), TDeleter(context));
+ options.initial_block = reinterpret_cast<char*>(buffer.get());
+ NProtoBuf::Arena arena(options);
+
+ TState rhs(arena, rhsData, rhsSize);
+
+ if (rhs.Header().has_error_code()) {
+ return EncodeErrorHeader(context, arena, rhs.Error(), result);
+ }
+
+ if (rhs.Type() == TStateHeader::BASE) {
+ result->Data = rhsData;
+ result->Size = rhsSize;
+ return true;
+ }
+
+ TState lhs(arena, lhsData, lhsSize);
+ if (lhs.Header().has_error_code()) {
+ return EncodeErrorHeader(context, arena, lhs.Error(), result);
+ }
+
+ if (lhs.Type() == TStateHeader::PATCH && rhs.Type() == TStateHeader::PATCH) {
+ return MergePatches(context, arena, lhs, rhs, result);
+ } else if (lhs.Type() == TStateHeader::BASE && rhs.Type() == TStateHeader::PATCH) {
+ return ApplyPatch(context, arena, lhs, rhs, result);
+ }
+ return EncodeErrorHeader(context, arena, TStateHeader::YT_MERGE_ERROR, result);
+ }
+}
diff --git a/library/cpp/xdelta3/state/merge.h b/library/cpp/xdelta3/state/merge.h
new file mode 100644
index 0000000000..a93106c7d9
--- /dev/null
+++ b/library/cpp/xdelta3/state/merge.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <library/cpp/xdelta3/xdelta_codec/codec.h>
+
+#include <util/system/types.h>
+
+#include <string.h>
+
+#ifdef __cplusplus
+namespace NXdeltaAggregateColumn {
+extern "C" {
+#endif
+
+// total Data size = Offset + Size
+struct TSpan {
+ const ui8* Data;
+ size_t Offset;
+ size_t Size;
+};
+
+int MergeStates(
+ XDeltaContext* context,
+ const ui8* lhsSata,
+ size_t lhsSize,
+ const ui8* rhsData,
+ size_t rhsSize,
+ struct TSpan* result);
+
+#ifdef __cplusplus
+}
+}
+#endif
diff --git a/library/cpp/xdelta3/state/state.cpp b/library/cpp/xdelta3/state/state.cpp
new file mode 100644
index 0000000000..f09dde0425
--- /dev/null
+++ b/library/cpp/xdelta3/state/state.cpp
@@ -0,0 +1,147 @@
+#include "state.h"
+
+#include <library/cpp/xdelta3/state/hash.h>
+#include <library/cpp/xdelta3/xdelta_codec/codec.h>
+
+#include <util/stream/null.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+namespace NXdeltaAggregateColumn {
+ size_t SizeOfHeaderSize(size_t headerSize)
+ {
+ using namespace NProtoBuf::io;
+
+ ui32 dummy = 0;
+ auto data = reinterpret_cast<ui8*>(&dummy);
+ return CodedOutputStream::WriteVarint32ToArray(headerSize, data) - data;
+ }
+
+ size_t SizeOfHeader(const TStateHeader& header)
+ {
+ // length of header + calculated length
+ auto headerSize = header.ByteSize();
+ return SizeOfHeaderSize(headerSize) + headerSize;
+ }
+
+ TStateHeader* ParseHeader(NProtoBuf::Arena& arena, const ui8* data, size_t size)
+ {
+ using namespace NProtoBuf::io;
+
+ auto header = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena);
+ if (nullptr == data || 0 == size) {
+ header->set_error_code(TStateHeader::HEADER_PARSE_ERROR);
+ return header;
+ }
+
+ ui32 headerSize = 0;
+ CodedInputStream in(data, size);
+ if (in.ReadVarint32(&headerSize)) {
+ auto sizeofHeaderSize = in.CurrentPosition();
+ if (size - sizeofHeaderSize < headerSize) {
+ header->set_error_code(TStateHeader::HEADER_PARSE_ERROR);
+ return header;
+ }
+
+ if (!header->ParseFromArray(data + sizeofHeaderSize, headerSize)) {
+ header->Clear();
+ header->set_error_code(TStateHeader::HEADER_PARSE_ERROR);
+ }
+ } else {
+ header->set_error_code(TStateHeader::HEADER_PARSE_ERROR);
+ }
+
+ return header;
+ }
+
+ bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize)
+ {
+ using namespace NProtoBuf::io;
+
+ resultSize = 0;
+ auto headerSize = header.ByteSize();
+ auto sizeOfHeaderSize = SizeOfHeaderSize(headerSize);
+ if (header.SerializeToArray(data + sizeOfHeaderSize, size - sizeOfHeaderSize)) {
+ sizeOfHeaderSize = CodedOutputStream::WriteVarint32ToArray(headerSize, data) - data;
+ resultSize = sizeOfHeaderSize + headerSize;
+ return true;
+ }
+ return false;
+ }
+
+ TStateHeader::EErrorCode CheckProto(const TStateHeader* header, size_t dataSize)
+ {
+ auto hasRequiredFields = false;
+ if (header->type() == TStateHeader::BASE) {
+ hasRequiredFields = header->has_data_size();
+ } else if (header->type() == TStateHeader::PATCH) {
+ hasRequiredFields = header->has_base_hash() &&
+ header->has_state_hash() &&
+ header->has_state_size() &&
+ header->has_data_size();
+ } else {
+ hasRequiredFields = header->has_error_code();
+ }
+ if (!hasRequiredFields) {
+ return TStateHeader::MISSING_REQUIRED_FIELD_ERROR;
+ }
+
+ auto payloadSizeOk = header->data_size() <= dataSize - SizeOfHeader(*header);
+ if (!payloadSizeOk) {
+ return TStateHeader::WRONG_DATA_SIZE;
+ }
+ return TStateHeader::NO_ERROR;
+ }
+
+ TState::TState(NProtoBuf::Arena& arena, const ui8* data, size_t size)
+ {
+ if (nullptr == data || 0 == size) {
+ return;
+ }
+
+ HeaderPtr = ParseHeader(arena, data, size);
+ if (HeaderPtr->has_error_code() && HeaderPtr->error_code() == TStateHeader::HEADER_PARSE_ERROR) {
+ return;
+ }
+
+ auto errorCode = CheckProto(HeaderPtr, size);
+ if (errorCode != TStateHeader::NO_ERROR) {
+ HeaderPtr->Clear();
+ HeaderPtr->set_error_code(errorCode);
+ return;
+ }
+
+ if (HeaderPtr->type() != TStateHeader::NONE_TYPE) {
+ if (HeaderPtr->data_size()) {
+ Data = data + SizeOfHeader(*HeaderPtr);
+ }
+ }
+ }
+
+ ui32 TState::CalcHash() const
+ {
+ if (nullptr != PayloadData() && 0 != PayloadSize()) {
+ return NXdeltaAggregateColumn::CalcHash(PayloadData(), PayloadSize());
+ }
+ return 0;
+ }
+
+ size_t TState::PayloadSize() const
+ {
+ return HeaderPtr->data_size();
+ }
+
+ TStateHeader::EErrorCode TState::Error() const
+ {
+ return HeaderPtr->has_error_code()
+ ? HeaderPtr->error_code()
+ : TStateHeader::NO_ERROR;
+ }
+
+ TStateHeader::EType TState::Type() const
+ {
+ return HeaderPtr->type();
+ }
+}
diff --git a/library/cpp/xdelta3/state/state.h b/library/cpp/xdelta3/state/state.h
new file mode 100644
index 0000000000..297824952a
--- /dev/null
+++ b/library/cpp/xdelta3/state/state.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <library/cpp/xdelta3/proto/state_header.pb.h>
+
+#include <google/protobuf/arena.h>
+
+namespace NXdeltaAggregateColumn {
+
+ constexpr auto ArenaMaxSize = 65525;
+
+ class TState {
+ public:
+ TState(NProtoBuf::Arena& arena, const ui8* data, size_t size);
+
+ const NXdeltaAggregateColumn::TStateHeader& Header() const
+ {
+ return *HeaderPtr;
+ }
+
+ const ui8* PayloadData() const
+ {
+ return Data;
+ }
+
+ size_t PayloadSize() const;
+ TStateHeader::EErrorCode Error() const;
+ TStateHeader::EType Type() const;
+
+ ui32 CalcHash() const;
+
+ private:
+ const ui8* Data = nullptr;
+ TStateHeader* HeaderPtr = nullptr;
+ };
+
+ size_t SizeOfHeader(const TStateHeader& header);
+}