diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/xdelta3 | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/xdelta3')
-rw-r--r-- | library/cpp/xdelta3/proto/state_header.proto | 37 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/create_proto.cpp | 134 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/create_proto.h | 26 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/data_ptr.cpp | 18 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/data_ptr.h | 17 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/hash.cpp | 10 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/hash.h | 5 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/merge.cpp | 226 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/merge.h | 32 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/state.cpp | 147 | ||||
-rw-r--r-- | library/cpp/xdelta3/state/state.h | 37 | ||||
-rw-r--r-- | library/cpp/xdelta3/xdelta_codec/codec.c | 199 | ||||
-rw-r--r-- | library/cpp/xdelta3/xdelta_codec/codec.h | 49 | ||||
-rw-r--r-- | library/cpp/xdelta3/xdelta_codec/merge_patches.c | 494 |
14 files changed, 1431 insertions, 0 deletions
diff --git a/library/cpp/xdelta3/proto/state_header.proto b/library/cpp/xdelta3/proto/state_header.proto new file mode 100644 index 00000000000..21454b8263b --- /dev/null +++ b/library/cpp/xdelta3/proto/state_header.proto @@ -0,0 +1,37 @@ +syntax = "proto2"; + +package NXdeltaAggregateColumn; + +option cc_enable_arenas = true; + +// update ArenaMaxSize with reasonable constant after Header modification +// note! not using strings here to avoid heap allocations + +message TStateHeader { + enum EType { + NONE_TYPE = 0; + BASE = 1; + PATCH = 2; + }; + + enum EErrorCode { + NO_ERROR = 0; + HEADER_PARSE_ERROR = 1; + BASE_HASH_ERROR = 2; + STATE_HASH_ERROR = 3; + MERGE_PATCHES_ERROR = 4; + APPLY_PATCH_ERROR = 5; + YT_MERGE_ERROR = 6; + MISSING_REQUIRED_FIELD_ERROR = 7; + WRONG_DATA_SIZE = 8; + STATE_SIZE_ERROR = 9; + PROTOBUF_ERROR = 10; + }; + + optional EType type = 1; // base or patch + optional uint32 base_hash = 2; // applicable for patch - hash of base to apply on + optional uint32 state_hash = 3; // applicable for patch - hash of target state + optional uint32 state_size = 4; // applicable for patch - target state size - remove it? + optional uint32 data_size = 5; // base or patch payload size + optional EErrorCode error_code = 6; +}; diff --git a/library/cpp/xdelta3/state/create_proto.cpp b/library/cpp/xdelta3/state/create_proto.cpp new file mode 100644 index 00000000000..ad97201e559 --- /dev/null +++ b/library/cpp/xdelta3/state/create_proto.cpp @@ -0,0 +1,134 @@ +#include "create_proto.h" + +#include <library/cpp/xdelta3/state/data_ptr.h> +#include <library/cpp/xdelta3/state/hash.h> +#include <library/cpp/xdelta3/xdelta_codec/codec.h> + +namespace NXdeltaAggregateColumn { + bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize); +} + +namespace { + using namespace NXdeltaAggregateColumn; + + template<typename TResult> + TResult EncodeProto(const TStateHeader& header, const ui8* data, size_t size) + { + using namespace NProtoBuf::io; + + TResult result; + ui8 totalHeaderSize = SizeOfHeader(header); + ui8* ptr = nullptr; + if constexpr (std::is_same_v<TResult, TString>) { + result.ReserveAndResize(totalHeaderSize + size); + ptr = reinterpret_cast<ui8*>(&result[0]); + } else { + result.Resize(totalHeaderSize + size); + ptr = reinterpret_cast<ui8*>(result.Data()); + } + size_t resultSize = 0; + + if (EncodeHeaderTo(header, ptr, result.Size(), resultSize)) { + if (data && size) { + memcpy(ptr + totalHeaderSize, data, size); + } + return result; + } + return {}; + } + + template<typename TResult> + TResult EncodeErrorProto(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {}) + { + TStateHeader header; + header.set_error_code(error); + + if (changeHeader) { + changeHeader(header); + } + + return EncodeProto<TResult>(header, nullptr, 0); + } + + template<typename TResult> + TResult EncodeBaseProto(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {}) + { + TStateHeader header; + header.set_type(TStateHeader::BASE); + header.set_data_size(baseSize); + + if (changeHeader) { + changeHeader(header); + } + + return EncodeProto<TResult>(header, base, baseSize); + } + + template<typename TResult> + TResult EncodePatchProto(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {}) + { + TStateHeader header; + header.set_type(TStateHeader::PATCH); + header.set_data_size(patchSize); + header.set_state_hash(CalcHash(state, stateSize)); + header.set_state_size(stateSize); + header.set_base_hash(CalcHash(base, baseSize)); + + if (changeHeader) { + changeHeader(header); + } + + return EncodeProto<TResult>(header, patch, patchSize); + } + + template<typename TResult> + TResult EncodePatchProto(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {}) + { + size_t patchSize = 0; + auto patch = TDataPtr(ComputePatch(nullptr, base, baseSize, state, stateSize, &patchSize)); + + return EncodePatchProto<TResult>(base, baseSize, state, stateSize, patch.get(), patchSize, changeHeader); + } +} + +namespace NXdeltaAggregateColumn { + TBuffer EncodeErrorProtoAsBuffer(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader) { + return EncodeErrorProto<TBuffer>(error, changeHeader); + } + + TBuffer EncodeBaseProtoAsBuffer(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader) { + return EncodeBaseProto<TBuffer>(base, baseSize, changeHeader); + } + + TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader) { + return EncodePatchProto<TBuffer>(base, baseSize, state, stateSize, changeHeader); + } + + TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader) { + return EncodePatchProto<TBuffer>(base, baseSize, state, stateSize, patch, patchSize, changeHeader); + } + + TBuffer EncodeProtoAsBuffer(const TStateHeader& header, const ui8* data, size_t size) { + return EncodeProto<TBuffer>(header, data, size); + } + + TString EncodeErrorProtoAsString(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader) { + return EncodeErrorProto<TString>(error, changeHeader); + } + + TString EncodeBaseProtoAsString(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader) { + return EncodeBaseProto<TString>(base, baseSize, changeHeader); + } + + TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader) { + return EncodePatchProto<TString>(base, baseSize, state, stateSize, changeHeader); + } + + TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader) { + return EncodePatchProto<TString>(base, baseSize, state, stateSize, patch, patchSize, changeHeader); + } + + TString EncodeProtoAsString(const TStateHeader& header, const ui8* data, size_t size) { + return EncodeProto<TString>(header, data, size); + } +} diff --git a/library/cpp/xdelta3/state/create_proto.h b/library/cpp/xdelta3/state/create_proto.h new file mode 100644 index 00000000000..7ca9763d848 --- /dev/null +++ b/library/cpp/xdelta3/state/create_proto.h @@ -0,0 +1,26 @@ +#pragma once + +#include "state.h" + +#include <library/cpp/xdelta3/proto/state_header.pb.h> + +#include <functional> + +namespace NXdeltaAggregateColumn { + + // functor used for testing - change header fields to inject errors + using TChangeHeader = std::function<void(TStateHeader&)>; + + TBuffer EncodeErrorProtoAsBuffer(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {}); + TBuffer EncodeBaseProtoAsBuffer(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {}); + TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {}); + TBuffer EncodePatchProtoAsBuffer(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {}); + TBuffer EncodeProtoAsBuffer(const TStateHeader& header, const ui8* data, size_t size); + + TString EncodeErrorProtoAsString(TStateHeader::EErrorCode error, const TChangeHeader& changeHeader = {}); + TString EncodeBaseProtoAsString(const ui8* base, size_t baseSize, const TChangeHeader& changeHeader = {}); + TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const TChangeHeader& changeHeader = {}); + TString EncodePatchProtoAsString(const ui8* base, size_t baseSize, const ui8* state, size_t stateSize, const ui8* patch, size_t patchSize, const TChangeHeader& changeHeader = {}); + TString EncodeProtoAsString(const TStateHeader& header, const ui8* data, size_t size); + +} diff --git a/library/cpp/xdelta3/state/data_ptr.cpp b/library/cpp/xdelta3/state/data_ptr.cpp new file mode 100644 index 00000000000..a64af771244 --- /dev/null +++ b/library/cpp/xdelta3/state/data_ptr.cpp @@ -0,0 +1,18 @@ +#include "data_ptr.h" + +namespace NXdeltaAggregateColumn { + + TDeleter::TDeleter(XDeltaContext* context) + : Context(context) + { + } + + void TDeleter::operator()(ui8* ptr) const + { + if (!Context) { + free(ptr); + return; + } + Context->free(Context->opaque, ptr); + } +} diff --git a/library/cpp/xdelta3/state/data_ptr.h b/library/cpp/xdelta3/state/data_ptr.h new file mode 100644 index 00000000000..7ba12eb4ba9 --- /dev/null +++ b/library/cpp/xdelta3/state/data_ptr.h @@ -0,0 +1,17 @@ +#pragma once + +#include <library/cpp/xdelta3/xdelta_codec/codec.h> + +#include <memory> + +namespace NXdeltaAggregateColumn { + struct TDeleter { + TDeleter() = default; + explicit TDeleter(XDeltaContext* context); + + void operator()(ui8* p) const; + + XDeltaContext* Context = nullptr; + }; + using TDataPtr = std::unique_ptr<ui8, TDeleter>; +} diff --git a/library/cpp/xdelta3/state/hash.cpp b/library/cpp/xdelta3/state/hash.cpp new file mode 100644 index 00000000000..741e60c013c --- /dev/null +++ b/library/cpp/xdelta3/state/hash.cpp @@ -0,0 +1,10 @@ +#include <util/digest/murmur.h> + +namespace NXdeltaAggregateColumn { + + ui32 CalcHash(const ui8* data, size_t size) + { + return MurmurHash<ui32>(data, size); + } + +} diff --git a/library/cpp/xdelta3/state/hash.h b/library/cpp/xdelta3/state/hash.h new file mode 100644 index 00000000000..437e3be1236 --- /dev/null +++ b/library/cpp/xdelta3/state/hash.h @@ -0,0 +1,5 @@ +#pragma once + +namespace NXdeltaAggregateColumn { + ui32 CalcHash(const ui8* data, size_t size); +} diff --git a/library/cpp/xdelta3/state/merge.cpp b/library/cpp/xdelta3/state/merge.cpp new file mode 100644 index 00000000000..de8b12226d5 --- /dev/null +++ b/library/cpp/xdelta3/state/merge.cpp @@ -0,0 +1,226 @@ +#include "merge.h" + +#include "state.h" + +#include "data_ptr.h" + +#include <library/cpp/xdelta3/xdelta_codec/codec.h> +#include <library/cpp/xdelta3/state/hash.h> + +#include <util/digest/murmur.h> + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +namespace NXdeltaAggregateColumn { + + static ui8* AllocateFromContext(XDeltaContext* context, size_t size) + { + if (!context) { + return reinterpret_cast<ui8*>(malloc(size)); + } + return reinterpret_cast<ui8*>(context->allocate(context->opaque, size)); + } + + bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize); + + bool EncodeErrorHeader(XDeltaContext* context, NProtoBuf::Arena& arena, TStateHeader::EErrorCode error, TSpan* result) + { + result->Offset = result->Size = 0; + + auto header = NProtoBuf::Arena::CreateMessage<NXdeltaAggregateColumn::TStateHeader>(&arena); + header->set_error_code(error); + auto headerSize = SizeOfHeader(*header); + auto data = TDataPtr(AllocateFromContext(context, headerSize), TDeleter(context)); + if (EncodeHeaderTo(*header, data.get(), headerSize, result->Size)) { + result->Offset = 0; + result->Data = data.release(); + return true; + } + return false; + } + + bool EncodeState(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& state, TSpan* result) + { + auto headerSize = SizeOfHeader(state.Header()); + result->Size = headerSize + state.PayloadSize(); + auto data = TDataPtr(AllocateFromContext(context, result->Size), TDeleter(context)); + size_t written = 0; + if (EncodeHeaderTo(state.Header(), data.get(), result->Size, written)) { + if (state.PayloadSize() && state.PayloadData()) { + memcpy(data.get() + headerSize, state.PayloadData(), state.PayloadSize()); + } + result->Data = data.release(); + return true; + } + return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result); + } + + // NOTE: empty (data_size == 0) patch means nothing changed. + // empty valid patch and will be ignored unless will raise error + + bool IsBadEmptyPatch(const TState& empty) + { + return 0u == empty.PayloadSize() && empty.Header().base_hash() != empty.Header().state_hash(); + } + + bool MergePatches(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& lhs, const TState& rhs, TSpan* result) + { + if (lhs.Header().state_hash() != rhs.Header().base_hash()) { + return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result); + } + + if (IsBadEmptyPatch(lhs) || IsBadEmptyPatch(rhs)) { + return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result); + } + + if (0u == lhs.PayloadSize()) { + return EncodeState(context, arena, rhs, result); + } + + if (0u == rhs.PayloadSize()) { + return EncodeState(context, arena, lhs, result); + } + + auto merged = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena); + merged->set_type(TStateHeader::PATCH); + merged->set_base_hash(lhs.Header().base_hash()); + merged->set_state_hash(rhs.Header().state_hash()); + merged->set_state_size(rhs.Header().state_size()); + + size_t patchSize = 0; + auto maxMergedHeaderSize = SizeOfHeader(*merged) + sizeof(patchSize); // estimation should be valid: sizeof(ui64 patchSize) covers possible sizeOfHeaderSize growth (+1) + // as well as ui32 data_size which is unset at this point + auto patch = TDataPtr(MergePatches( + context, + maxMergedHeaderSize, + lhs.PayloadData(), + lhs.PayloadSize(), + rhs.PayloadData(), + rhs.PayloadSize(), + &patchSize), + TDeleter(context)); + if (!patch) { + return EncodeErrorHeader(context, arena, TStateHeader::MERGE_PATCHES_ERROR, result); + } + + merged->set_data_size(patchSize); + + auto mergedHeaderSize = SizeOfHeader(*merged); + Y_ENSURE(maxMergedHeaderSize >= mergedHeaderSize); + auto offset = maxMergedHeaderSize - mergedHeaderSize; + + size_t headerSize = 0; + if (!EncodeHeaderTo(*merged, patch.get() + offset, mergedHeaderSize, headerSize)) { + return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result); + } + + result->Size = mergedHeaderSize + patchSize; + result->Offset = offset; + result->Data = patch.release(); + return true; + } + + bool ApplyPatch(XDeltaContext* context, NProtoBuf::Arena& arena, const TState& base, const TState& patch, TSpan* result) + { + auto baseHash = base.CalcHash(); + if (baseHash != patch.Header().base_hash()) { + return EncodeErrorHeader(context, arena, TStateHeader::BASE_HASH_ERROR, result); + } + + if (patch.Header().data_size() == 0) { + if (patch.Header().state_size() == base.Header().data_size()) { + if (patch.Header().state_hash() == baseHash) { + return EncodeState(context, arena, base, result); + } + return EncodeErrorHeader(context, arena, TStateHeader::STATE_HASH_ERROR, result); + } + return EncodeErrorHeader(context, arena, TStateHeader::STATE_SIZE_ERROR, result); + } + + size_t stateSize = 0; + + auto merged = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena); + merged->set_type(TStateHeader::BASE); + + auto maxHeaderSize = SizeOfHeader(*merged) + sizeof(stateSize); + + auto state = TDataPtr(ApplyPatch( + context, + maxHeaderSize, + base.PayloadData(), + base.PayloadSize(), + patch.PayloadData(), + patch.PayloadSize(), + patch.Header().state_size(), + &stateSize), + TDeleter(context)); + if (!state) { + return EncodeErrorHeader(context, arena, TStateHeader::APPLY_PATCH_ERROR, result); + } + + if (stateSize != patch.Header().state_size()) { + return EncodeErrorHeader(context, arena, TStateHeader::STATE_SIZE_ERROR, result); + } + + auto stateHash = CalcHash(state.get() + maxHeaderSize, stateSize); + if (stateHash != patch.Header().state_hash()) { + return EncodeErrorHeader(context, arena, TStateHeader::STATE_HASH_ERROR, result); + } + + merged->set_data_size(stateSize); + + auto mergedHeaderSize = SizeOfHeader(*merged); + auto offset = maxHeaderSize - mergedHeaderSize; + size_t headerSize = 0; + if (!EncodeHeaderTo(*merged, state.get() + offset, mergedHeaderSize, headerSize)) { + return EncodeErrorHeader(context, arena, TStateHeader::PROTOBUF_ERROR, result); + } + + result->Size = mergedHeaderSize + stateSize; + result->Offset = offset; + result->Data = state.release(); + return true; + } + + int MergeStates(XDeltaContext* context, const ui8* lhsData, size_t lhsSize, const ui8* rhsData, size_t rhsSize, TSpan* result) + { + using namespace NXdeltaAggregateColumn; + using namespace NProtoBuf::io; + + result->Data = nullptr; + result->Size = 0; + result->Offset = 0; + + NProtoBuf::ArenaOptions options; + options.initial_block_size = ArenaMaxSize; + auto buffer = TDataPtr(AllocateFromContext(context, options.initial_block_size), TDeleter(context)); + options.initial_block = reinterpret_cast<char*>(buffer.get()); + NProtoBuf::Arena arena(options); + + TState rhs(arena, rhsData, rhsSize); + + if (rhs.Header().has_error_code()) { + return EncodeErrorHeader(context, arena, rhs.Error(), result); + } + + if (rhs.Type() == TStateHeader::BASE) { + result->Data = rhsData; + result->Size = rhsSize; + return true; + } + + TState lhs(arena, lhsData, lhsSize); + if (lhs.Header().has_error_code()) { + return EncodeErrorHeader(context, arena, lhs.Error(), result); + } + + if (lhs.Type() == TStateHeader::PATCH && rhs.Type() == TStateHeader::PATCH) { + return MergePatches(context, arena, lhs, rhs, result); + } else if (lhs.Type() == TStateHeader::BASE && rhs.Type() == TStateHeader::PATCH) { + return ApplyPatch(context, arena, lhs, rhs, result); + } + return EncodeErrorHeader(context, arena, TStateHeader::YT_MERGE_ERROR, result); + } +} diff --git a/library/cpp/xdelta3/state/merge.h b/library/cpp/xdelta3/state/merge.h new file mode 100644 index 00000000000..a93106c7d9a --- /dev/null +++ b/library/cpp/xdelta3/state/merge.h @@ -0,0 +1,32 @@ +#pragma once + +#include <library/cpp/xdelta3/xdelta_codec/codec.h> + +#include <util/system/types.h> + +#include <string.h> + +#ifdef __cplusplus +namespace NXdeltaAggregateColumn { +extern "C" { +#endif + +// total Data size = Offset + Size +struct TSpan { + const ui8* Data; + size_t Offset; + size_t Size; +}; + +int MergeStates( + XDeltaContext* context, + const ui8* lhsSata, + size_t lhsSize, + const ui8* rhsData, + size_t rhsSize, + struct TSpan* result); + +#ifdef __cplusplus +} +} +#endif diff --git a/library/cpp/xdelta3/state/state.cpp b/library/cpp/xdelta3/state/state.cpp new file mode 100644 index 00000000000..f09dde04259 --- /dev/null +++ b/library/cpp/xdelta3/state/state.cpp @@ -0,0 +1,147 @@ +#include "state.h" + +#include <library/cpp/xdelta3/state/hash.h> +#include <library/cpp/xdelta3/xdelta_codec/codec.h> + +#include <util/stream/null.h> + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +namespace NXdeltaAggregateColumn { + size_t SizeOfHeaderSize(size_t headerSize) + { + using namespace NProtoBuf::io; + + ui32 dummy = 0; + auto data = reinterpret_cast<ui8*>(&dummy); + return CodedOutputStream::WriteVarint32ToArray(headerSize, data) - data; + } + + size_t SizeOfHeader(const TStateHeader& header) + { + // length of header + calculated length + auto headerSize = header.ByteSize(); + return SizeOfHeaderSize(headerSize) + headerSize; + } + + TStateHeader* ParseHeader(NProtoBuf::Arena& arena, const ui8* data, size_t size) + { + using namespace NProtoBuf::io; + + auto header = NProtoBuf::Arena::CreateMessage<TStateHeader>(&arena); + if (nullptr == data || 0 == size) { + header->set_error_code(TStateHeader::HEADER_PARSE_ERROR); + return header; + } + + ui32 headerSize = 0; + CodedInputStream in(data, size); + if (in.ReadVarint32(&headerSize)) { + auto sizeofHeaderSize = in.CurrentPosition(); + if (size - sizeofHeaderSize < headerSize) { + header->set_error_code(TStateHeader::HEADER_PARSE_ERROR); + return header; + } + + if (!header->ParseFromArray(data + sizeofHeaderSize, headerSize)) { + header->Clear(); + header->set_error_code(TStateHeader::HEADER_PARSE_ERROR); + } + } else { + header->set_error_code(TStateHeader::HEADER_PARSE_ERROR); + } + + return header; + } + + bool EncodeHeaderTo(const TStateHeader& header, ui8* data, size_t size, size_t& resultSize) + { + using namespace NProtoBuf::io; + + resultSize = 0; + auto headerSize = header.ByteSize(); + auto sizeOfHeaderSize = SizeOfHeaderSize(headerSize); + if (header.SerializeToArray(data + sizeOfHeaderSize, size - sizeOfHeaderSize)) { + sizeOfHeaderSize = CodedOutputStream::WriteVarint32ToArray(headerSize, data) - data; + resultSize = sizeOfHeaderSize + headerSize; + return true; + } + return false; + } + + TStateHeader::EErrorCode CheckProto(const TStateHeader* header, size_t dataSize) + { + auto hasRequiredFields = false; + if (header->type() == TStateHeader::BASE) { + hasRequiredFields = header->has_data_size(); + } else if (header->type() == TStateHeader::PATCH) { + hasRequiredFields = header->has_base_hash() && + header->has_state_hash() && + header->has_state_size() && + header->has_data_size(); + } else { + hasRequiredFields = header->has_error_code(); + } + if (!hasRequiredFields) { + return TStateHeader::MISSING_REQUIRED_FIELD_ERROR; + } + + auto payloadSizeOk = header->data_size() <= dataSize - SizeOfHeader(*header); + if (!payloadSizeOk) { + return TStateHeader::WRONG_DATA_SIZE; + } + return TStateHeader::NO_ERROR; + } + + TState::TState(NProtoBuf::Arena& arena, const ui8* data, size_t size) + { + if (nullptr == data || 0 == size) { + return; + } + + HeaderPtr = ParseHeader(arena, data, size); + if (HeaderPtr->has_error_code() && HeaderPtr->error_code() == TStateHeader::HEADER_PARSE_ERROR) { + return; + } + + auto errorCode = CheckProto(HeaderPtr, size); + if (errorCode != TStateHeader::NO_ERROR) { + HeaderPtr->Clear(); + HeaderPtr->set_error_code(errorCode); + return; + } + + if (HeaderPtr->type() != TStateHeader::NONE_TYPE) { + if (HeaderPtr->data_size()) { + Data = data + SizeOfHeader(*HeaderPtr); + } + } + } + + ui32 TState::CalcHash() const + { + if (nullptr != PayloadData() && 0 != PayloadSize()) { + return NXdeltaAggregateColumn::CalcHash(PayloadData(), PayloadSize()); + } + return 0; + } + + size_t TState::PayloadSize() const + { + return HeaderPtr->data_size(); + } + + TStateHeader::EErrorCode TState::Error() const + { + return HeaderPtr->has_error_code() + ? HeaderPtr->error_code() + : TStateHeader::NO_ERROR; + } + + TStateHeader::EType TState::Type() const + { + return HeaderPtr->type(); + } +} diff --git a/library/cpp/xdelta3/state/state.h b/library/cpp/xdelta3/state/state.h new file mode 100644 index 00000000000..297824952a2 --- /dev/null +++ b/library/cpp/xdelta3/state/state.h @@ -0,0 +1,37 @@ +#pragma once + +#include <library/cpp/xdelta3/proto/state_header.pb.h> + +#include <google/protobuf/arena.h> + +namespace NXdeltaAggregateColumn { + + constexpr auto ArenaMaxSize = 65525; + + class TState { + public: + TState(NProtoBuf::Arena& arena, const ui8* data, size_t size); + + const NXdeltaAggregateColumn::TStateHeader& Header() const + { + return *HeaderPtr; + } + + const ui8* PayloadData() const + { + return Data; + } + + size_t PayloadSize() const; + TStateHeader::EErrorCode Error() const; + TStateHeader::EType Type() const; + + ui32 CalcHash() const; + + private: + const ui8* Data = nullptr; + TStateHeader* HeaderPtr = nullptr; + }; + + size_t SizeOfHeader(const TStateHeader& header); +} diff --git a/library/cpp/xdelta3/xdelta_codec/codec.c b/library/cpp/xdelta3/xdelta_codec/codec.c new file mode 100644 index 00000000000..96c04affaf8 --- /dev/null +++ b/library/cpp/xdelta3/xdelta_codec/codec.c @@ -0,0 +1,199 @@ +#include "codec.h" + +#include <contrib/libs/xdelta3/xdelta3.h> +#include <contrib/libs/xdelta3/xdelta3-internal.h> + +#include <util/system/types.h> + +#include <arpa/inet.h> + +#include <stdlib.h> +#include <string.h> + +#define IOPT_SIZE 100 + +#ifndef MAX + #define MAX(a,b) ((a) > (b) ? (a) : (b)) +#endif + + +void* xdelta3_buffer_alloc(XDeltaContext* context, size_t items, usize_t size); + +void xdelta3_buffer_free(XDeltaContext* context, void* ptr); + +void init_common_config(XDeltaContext* context, xd3_config* config, usize_t iopt_size); + +int merge_vcdiff_patches( + XDeltaContext* context, + const ui8* patch1, + size_t patch_size1, + const ui8* patch2, + size_t patch_size2, + ui8* result, + size_t* result_size, + size_t max_result_size); + +int ProcessMemory( + int isEncode, + int (*func)(xd3_stream*), + const ui8* input, + usize_t inputSize, + const ui8* source, + usize_t sourceSize, + ui8* output, + usize_t* outputSize, + usize_t outputSizeMax) +{ + xd3_stream stream; + xd3_config config; + xd3_source src; + int ret; + + memset(&stream, 0, sizeof(stream)); + xd3_init_config(&config, XD3_NOCOMPRESS); + + if (isEncode) { + config.winsize = inputSize; + config.sprevsz = xd3_pow2_roundup(config.winsize); + } + + init_common_config(NULL, &config, IOPT_SIZE); // IOPT_SIZE - key option drastically increased performance + + if ((ret = xd3_config_stream(&stream, &config)) == 0) { + if (source != NULL || sourceSize == 0) { + memset(&src, 0, sizeof(src)); + + src.blksize = sourceSize; + src.onblk = sourceSize; + src.curblk = source; + src.curblkno = 0; + src.max_winsize = sourceSize; + + if ((ret = xd3_set_source_and_size(&stream, &src, sourceSize)) == 0) { + ret = xd3_process_stream( + isEncode, + &stream, + func, + 1, + input, + inputSize, + output, + outputSize, + outputSizeMax); + } + } + } + xd3_free_stream(&stream); + return ret; +} + +ui8* ComputePatch( + XDeltaContext* context, + const ui8* from, + size_t fromSize, + const ui8* to, + size_t toSize, + size_t* patchSize) +{ + *patchSize = 0; + + size_t maxInputSize = MAX(toSize, fromSize); + size_t deltaSize = MAX(maxInputSize, 200u) * 1.5; // NOTE: for small data N * 1.5 does not work e.g. data 10 & 10 -> patch 31 + + ui8* delta = (ui8*)xdelta3_buffer_alloc(context, deltaSize, 1); + if (delta == NULL) { + return NULL; + } + + usize_t outSize = 0; + int ret = ProcessMemory( + 1, + &xd3_encode_input, + to, + toSize, + from, + fromSize, + delta, + &outSize, + deltaSize); + + if (ret != 0) { + xdelta3_buffer_free(context, delta); + return NULL; + } + + *patchSize = outSize; + return delta; +} + +ui8* ApplyPatch( + XDeltaContext* context, + size_t headroomSize, + const ui8* base, + size_t baseSize, + const ui8* patch, + size_t patchSize, + size_t stateSize, + size_t* resultSize) +{ + *resultSize = 0; + + ui8* buffer = (ui8*)xdelta3_buffer_alloc(context, headroomSize + stateSize, 1); + if (buffer == NULL) { + return NULL; + } + + size_t decodedSize = 0; + int ret = ProcessMemory( + 0, + &xd3_decode_input, + patch, + patchSize, + base, + baseSize, + buffer + headroomSize, + &decodedSize, + stateSize); + + if (ret != 0) { + xdelta3_buffer_free(context, buffer); + return NULL; + } + + *resultSize = decodedSize; + return buffer; +} + +ui8* MergePatches( + XDeltaContext* context, + size_t headroomSize, + const ui8* patch1, + size_t patch1Size, + const ui8* patch2, + size_t patch2Size, + size_t* patch3Size) +{ + *patch3Size = 0; + + size_t maxResultSize = headroomSize + 3 * (patch1Size + patch2Size); // estmation could be more accurate + + ui8* result = (ui8*)xdelta3_buffer_alloc(context, maxResultSize, 1); + size_t resultSize = 0; + int ret = merge_vcdiff_patches( + NULL, + patch1, + patch1Size, + patch2, + patch2Size, + result + headroomSize, + &resultSize, + maxResultSize - headroomSize); + + if (ret != 0) { + xdelta3_buffer_free(context, result); + return NULL; + } + + *patch3Size = resultSize; + return result; +} diff --git a/library/cpp/xdelta3/xdelta_codec/codec.h b/library/cpp/xdelta3/xdelta_codec/codec.h new file mode 100644 index 00000000000..51b935c9162 --- /dev/null +++ b/library/cpp/xdelta3/xdelta_codec/codec.h @@ -0,0 +1,49 @@ +#pragma once + +#include <util/system/types.h> + +#include <string.h> + +struct _xdelta_context { + void* opaque; + void* (*allocate)(void* opaque, size_t size); + void (*free)(void* opaque, void* ptr); +}; +typedef struct _xdelta_context XDeltaContext; + +#ifdef __cplusplus +namespace NXdeltaAggregateColumn { +extern "C" { +#endif + +ui8* ApplyPatch( + XDeltaContext* context, + size_t headroomSize, + const ui8* base, + size_t baseSize, + const ui8* patch, + size_t patchSize, + size_t stateSize, + size_t* resultSize); + +ui8* ComputePatch( + XDeltaContext* context, + const ui8* from, + size_t fromSize, + const ui8* to, + size_t toSize, + size_t* patchSize); + +ui8* MergePatches( + XDeltaContext* context, + size_t headroomSize, + const ui8* patch1, + size_t patch1_size, + const ui8* patch2, + size_t patch2_size, + size_t* patch3_size); + +#ifdef __cplusplus +} +} +#endif diff --git a/library/cpp/xdelta3/xdelta_codec/merge_patches.c b/library/cpp/xdelta3/xdelta_codec/merge_patches.c new file mode 100644 index 00000000000..b425873dde1 --- /dev/null +++ b/library/cpp/xdelta3/xdelta_codec/merge_patches.c @@ -0,0 +1,494 @@ +#include "codec.h" + +#include <contrib/libs/xdelta3/xdelta3.h> +#include <contrib/libs/xdelta3/xdelta3-internal.h> + +#include <util/system/types.h> + +// routines extracted from xdelta3-main.h + +// from xdelta3.c +#define VCD_ADLER32 (1U << 2) /* has adler32 checksum */ + +#define UNUSED(x) (void)(x) + +typedef enum { + CMD_MERGE_ARG, + CMD_MERGE, +} xd3_cmd; + +void* xdelta3_buffer_alloc(void* context, size_t items, usize_t size) +{ + if (!context) { + return malloc(items * size); + } + XDeltaContext* xd3_context = (XDeltaContext*) context; + return xd3_context->allocate(xd3_context->opaque, items * size); +} + +void xdelta3_buffer_free(void* context, void* ptr) +{ + if (!context) { + free(ptr); + return; + } + XDeltaContext* xd3_context = (XDeltaContext*) context; + xd3_context->free(xd3_context->opaque, ptr); +} + +void init_common_config(XDeltaContext* context, xd3_config* config, usize_t iopt_size) +{ + config->alloc = xdelta3_buffer_alloc; + config->freef = xdelta3_buffer_free; + config->opaque = context; + if (iopt_size) { // option should be 0 in case of patches merge, critical for patch calculation performance + config->iopt_size = iopt_size; + } +} + +int init_recode_stream(XDeltaContext* context, xd3_stream* recode_stream) +{ + int ret; + int stream_flags = XD3_ADLER32_NOVER | XD3_SKIP_EMIT; + int recode_flags; + xd3_config recode_config; + + recode_flags = (stream_flags & XD3_SEC_TYPE); + + xd3_init_config(&recode_config, recode_flags); + + init_common_config(context, &recode_config, 0); + + if ((ret = xd3_config_stream(recode_stream, &recode_config)) || + (ret = xd3_encode_init_partial(recode_stream)) || + (ret = xd3_whole_state_init(recode_stream))) + { + xd3_free_stream(recode_stream); + return ret; + } + + return 0; +} + +int merge_func(xd3_stream* stream, ui8* out_data, size_t* out_size) +{ + UNUSED(out_data); + UNUSED(out_size); + + return xd3_whole_append_window(stream); +} + +int write_output(xd3_stream* stream, ui8* out_data, size_t* out_size, size_t max_osize) +{ + if (stream->avail_out > 0) { + if (*out_size + stream->avail_out > max_osize) { + return ENOMEM; + } + + memcpy(out_data + *out_size, stream->next_out, stream->avail_out); + *out_size += stream->avail_out; + } + + return 0; +} + +int merge_output( + XDeltaContext* context, + xd3_stream* stream, + xd3_stream* recode_stream, + xd3_stream* merge_stream, + ui8** buffer, + size_t buffer_size, + ui8* out_data, + size_t* out_size, + size_t max_osize) +{ + int ret; + usize_t inst_pos = 0; + xoff_t output_pos = 0; + xd3_source recode_source; + usize_t window_num = 0; + int at_least_once = 0; + + /* merge_stream is set if there were arguments. this stream's input + * needs to be applied to the merge_stream source. */ + if ((merge_stream != NULL) && + (ret = xd3_merge_input_output(stream, &merge_stream->whole_target))) + { + return ret; + } + + /* Enter the ENC_INPUT state and bypass the next_in == NULL test + * and (leftover) input buffering logic. */ + XD3_ASSERT(recode_stream->enc_state == ENC_INIT); + recode_stream->enc_state = ENC_INPUT; + recode_stream->next_in = *buffer; + recode_stream->flags |= XD3_FLUSH; + + /* This encodes the entire target. */ + while (inst_pos < stream->whole_target.instlen || !at_least_once) { + xoff_t window_start = output_pos; + int window_srcset = 0; + xoff_t window_srcmin = 0; + xoff_t window_srcmax = 0; + usize_t window_pos = 0; + usize_t window_size; + + /* at_least_once ensures that we encode at least one window, + * which handles the 0-byte case. */ + at_least_once = 1; + + XD3_ASSERT(recode_stream->enc_state == ENC_INPUT); + + if ((ret = xd3_encode_input(recode_stream)) != XD3_WINSTART) { + return XD3_INVALID; + } + + /* Window sizes must match from the input to the output, so that + * target copies are in-range (and so that checksums carry + * over). */ + XD3_ASSERT(window_num < stream->whole_target.wininfolen); + window_size = stream->whole_target.wininfo[window_num].length; + + /* Output position should also match. */ + if (output_pos != stream->whole_target.wininfo[window_num].offset) { + // internal merge error: offset mismatch + return XD3_INVALID; + } + + // NOTE: check if delta_codecs can decode this. option_use_checksum = 1 + if ((stream->dec_win_ind & VCD_ADLER32) != 0) { + recode_stream->flags |= XD3_ADLER32_RECODE; + recode_stream->recode_adler32 = stream->whole_target.wininfo[window_num].adler32; + } + + window_num++; + + if (buffer_size < window_size) { + xdelta3_buffer_free(context, *buffer); + *buffer = NULL; + buffer_size = 0; + if ((*buffer = (ui8*)xdelta3_buffer_alloc(context, window_size, 1)) == NULL) { + return ENOMEM; + } + recode_stream->next_in = *buffer; // re-setting stream buffer + buffer_size = window_size; + } + + /* This encodes a single target window. */ + while (window_pos < window_size && inst_pos < stream->whole_target.instlen) { + xd3_winst* inst = &stream->whole_target.inst[inst_pos]; + usize_t take = xd3_min(inst->size, window_size - window_pos); + xoff_t addr; + + switch (inst->type) { + case XD3_RUN: + if ((ret = xd3_emit_run(recode_stream, window_pos, take, &stream->whole_target.adds[inst->addr]))) { + return ret; + } + break; + + case XD3_ADD: + /* Adds are implicit, put them into the input buffer. */ + memcpy(*buffer + window_pos, + stream->whole_target.adds + inst->addr, take); + break; + + default: /* XD3_COPY + copy mode */ + if (inst->mode != 0) { + if (window_srcset) { + window_srcmin = xd3_min(window_srcmin, inst->addr); + window_srcmax = xd3_max(window_srcmax, + inst->addr + take); + } else { + window_srcset = 1; + window_srcmin = inst->addr; + window_srcmax = inst->addr + take; + } + addr = inst->addr; + } else { + XD3_ASSERT(inst->addr >= window_start); + addr = inst->addr - window_start; + } + + if ((ret = xd3_found_match(recode_stream, window_pos, take, addr, inst->mode != 0))) { + return ret; + } + break; + } + + window_pos += take; + output_pos += take; + + if (take == inst->size) { + inst_pos += 1; + } else { + /* Modify the instruction for the next pass. */ + if (inst->type != XD3_RUN) { + inst->addr += take; + } + inst->size -= take; + } + } + + xd3_avail_input(recode_stream, *buffer, window_pos); + + recode_stream->enc_state = ENC_INSTR; + + if (window_srcset) { + recode_stream->srcwin_decided = 1; + recode_stream->src = &recode_source; + recode_source.srclen = (usize_t)(window_srcmax - window_srcmin); + recode_source.srcbase = window_srcmin; + recode_stream->taroff = recode_source.srclen; + + XD3_ASSERT(recode_source.srclen != 0); + } else { + recode_stream->srcwin_decided = 0; + recode_stream->src = NULL; + recode_stream->taroff = 0; + } + + for (;;) { + switch ((ret = xd3_encode_input(recode_stream))) { + case XD3_INPUT: { + goto done_window; + } + case XD3_OUTPUT: { + /* main_file_write below */ + break; + } + case XD3_GOTHEADER: + case XD3_WINSTART: + case XD3_WINFINISH: { + /* ignore */ + continue; + } + case XD3_GETSRCBLK: + case 0: { + return XD3_INTERNAL; + } + default: + return ret; + } + + if ((ret = write_output(recode_stream, out_data, out_size, max_osize))) { + return ret; + } + + xd3_consume_output(recode_stream); + } + done_window: + (void)0; + } + + return 0; +} + +int process_input( + XDeltaContext* context, + xd3_cmd cmd, + xd3_stream* recode_stream, + xd3_stream* merge_stream, + const ui8* patch, + size_t patch_size, + ui8* out_data, + size_t* out_size, + size_t max_out_size) +{ + int ret; + xd3_stream stream; + int stream_flags = 0; + xd3_config config; + xd3_source source; + + int (*input_func)(xd3_stream*); + int (*output_func)(xd3_stream*, ui8*, size_t*); + + memset(&stream, 0, sizeof(stream)); + memset(&source, 0, sizeof(source)); + memset(&config, 0, sizeof(config)); + + stream_flags |= XD3_ADLER32; + /* main_input setup. */ + stream_flags |= XD3_ADLER32_NOVER | XD3_SKIP_EMIT; // TODO: add nocompress + input_func = xd3_decode_input; + + if ((ret = init_recode_stream(context, recode_stream))) { + return EXIT_FAILURE; + } + + config.winsize = patch_size; + config.sprevsz = xd3_pow2_roundup(config.winsize); + config.flags = stream_flags; + init_common_config(context, &config, 0); + + output_func = merge_func; + + if ((ret = xd3_config_stream(&stream, &config)) || (ret = xd3_whole_state_init(&stream))) { + return EXIT_FAILURE; + } + + /* If we've reached EOF tell the stream to flush. */ + stream.flags |= XD3_FLUSH; + + xd3_avail_input(&stream, patch, patch_size); + + /* Main input loop. */ + + int again = 0; + do { + ret = input_func(&stream); + + switch (ret) { + case XD3_INPUT: + again = 0; + break; + + case XD3_GOTHEADER: { + XD3_ASSERT(stream.current_window == 0); + } + /* FALLTHROUGH */ + case XD3_WINSTART: { + /* e.g., set or unset XD3_SKIP_WINDOW. */ + again = 1; + break; + } + + case XD3_OUTPUT: { + if (ret = output_func(&stream, out_data, out_size)) { + xd3_free_stream(&stream); + return EXIT_FAILURE; + } + + xd3_consume_output(&stream); + again = 1; + break; + } + + case XD3_WINFINISH: { + again = 1; + break; + } + + default: + /* input_func() error */ + xd3_free_stream(&stream); + return EXIT_FAILURE; + } + } while (again); + + if (cmd == CMD_MERGE) { + ui8* buffer = NULL; + size_t buffer_size = patch_size; + if ((buffer = (ui8*)xdelta3_buffer_alloc(context, buffer_size, 1)) == NULL) { + xd3_free_stream(&stream); + return EXIT_FAILURE; + } + + ret = merge_output( + context, + &stream, + recode_stream, + merge_stream, + &buffer, + buffer_size, + out_data, + out_size, + max_out_size); + + xdelta3_buffer_free(context, buffer); + + if (ret) { + xd3_free_stream(&stream); + return EXIT_FAILURE; + } + } else if (cmd == CMD_MERGE_ARG) { + xd3_swap_whole_state(&stream.whole_target, &recode_stream->whole_target); + } + + if ((ret = xd3_close_stream(&stream))) { + return EXIT_FAILURE; + } + + xd3_free_stream(&stream); + return EXIT_SUCCESS; +} + +int patch_to_stream( + XDeltaContext* context, + const ui8* patch, + size_t patch_size, + xd3_stream* recode_stream, + xd3_stream* merge_stream) +{ + xd3_stream merge_input; + int ret; + + xd3_config config; + memset(&config, 0, sizeof(config)); + init_common_config(context, &config, 0); + + if ((ret = xd3_config_stream(&merge_input, &config)) || (ret = xd3_whole_state_init(&merge_input))) { + return ret; + } + + ret = process_input( + context, + CMD_MERGE_ARG, + recode_stream, + merge_stream, + patch, + patch_size, + NULL, + NULL, + 0); + + if (ret == 0) { + xd3_swap_whole_state(&recode_stream->whole_target, &merge_input.whole_target); + } + + xd3_free_stream(recode_stream); + + if (ret != 0) { + xd3_free_stream(&merge_input); + return ret; + } + + if ((ret = xd3_config_stream(merge_stream, &config)) || (ret = xd3_whole_state_init(merge_stream))) { + xd3_free_stream(&merge_input); + return ret; + } + + xd3_swap_whole_state(&merge_stream->whole_target, &merge_input.whole_target); + ret = 0; + xd3_free_stream(&merge_input); + return ret; +} + +int merge_vcdiff_patches( + XDeltaContext* context, + const ui8* patch1, + size_t patch_size1, + const ui8* patch2, + size_t patch_size2, + ui8* result, + size_t* result_size, + size_t max_result_size) +{ + xd3_stream recode_stream; + memset(&recode_stream, 0, sizeof(xd3_stream)); + xd3_stream merge_stream; + memset(&merge_stream, 0, sizeof(xd3_stream)); + + int ret; + ret = patch_to_stream(context, patch1, patch_size1, &recode_stream, &merge_stream); + if (!ret) { + ret = process_input(context, CMD_MERGE, &recode_stream, &merge_stream, patch2, patch_size2, result, result_size, max_result_size); + } + + xd3_free_stream(&recode_stream); + xd3_free_stream(&merge_stream); + + return ret; +} |