diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-05-18 13:39:08 +0300 |
|---|---|---|
| committer | Ilnaz Nizametdinov <[email protected]> | 2022-05-18 13:39:08 +0300 |
| commit | 28f554dbe570fd5a28d92ebf2c8a7f7cbc739171 (patch) | |
| tree | 23ce21b77e88cf2bbe8d347460df0cf5a18d12a1 | |
| parent | 835ba32ee05b1c38ea642a9d86b4bd11cb18d6fa (diff) | |
Compressed backups KIKIMR-14803
ref:9640af79c24e1eae042d353668d3ad9573c135e5
25 files changed, 1368 insertions, 377 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 310884c77b9..4ce54157e6b 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1071,6 +1071,7 @@ add_subdirectory(ydb/core/tx/scheme_board/ut_populator) add_subdirectory(ydb/core/tx/scheme_board/ut_replica) add_subdirectory(ydb/core/tx/scheme_board/ut_subscriber) add_subdirectory(ydb/core/tx/schemeshard/ut_async_index) +add_subdirectory(ydb/core/tx/schemeshard/ut_backup) add_subdirectory(ydb/core/tx/schemeshard/ut_base) add_subdirectory(ydb/core/tx/schemeshard/ut_base_reboots) add_subdirectory(ydb/core/tx/schemeshard/ut_bsvolume) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 097ff1eb26c..0e8beb569c0 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1166,6 +1166,7 @@ add_subdirectory(ydb/core/tx/scheme_board/ut_populator) add_subdirectory(ydb/core/tx/scheme_board/ut_replica) add_subdirectory(ydb/core/tx/scheme_board/ut_subscriber) add_subdirectory(ydb/core/tx/schemeshard/ut_async_index) +add_subdirectory(ydb/core/tx/schemeshard/ut_backup) add_subdirectory(ydb/core/tx/schemeshard/ut_base) add_subdirectory(ydb/core/tx/schemeshard/ut_base_reboots) add_subdirectory(ydb/core/tx/schemeshard/ut_bsvolume) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 474a0260185..e6b1e0e8a89 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -900,6 +900,13 @@ message TBackupTask { optional TScanSettings ScanSettings = 11; optional bool NeedToBill = 12; + + message TCompressionOptions { + optional string Codec = 1; + optional int32 Level = 2; + } + + optional TCompressionOptions Compression = 13; // currently available for s3 } message TRestoreTask { diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 2880f39156e..cb451d7fedd 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(core-tx-datashard PUBLIC yutil tools-enum_parser-enum_serialization_runtime library-cpp-resource + contrib-libs-zstd cpp-actors-core cpp-containers-flat_hash cpp-digest-md5 @@ -50,6 +51,7 @@ target_link_libraries(core-tx-datashard PUBLIC target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp @@ -197,11 +199,17 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_uploader.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/import_s3.cpp ) generate_enum_serilization(core-tx-datashard + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.h + INCLUDE_HEADERS + ydb/core/tx/datashard/backup_restore_traits.h +) +generate_enum_serilization(core-tx-datashard ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.h INCLUDE_HEADERS ydb/core/tx/datashard/change_exchange.h @@ -251,6 +259,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC yutil tools-enum_parser-enum_serialization_runtime library-cpp-resource + contrib-libs-zstd cpp-actors-core cpp-containers-flat_hash cpp-digest-md5 diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp new file mode 100644 index 00000000000..40e29adfc71 --- /dev/null +++ b/ydb/core/tx/datashard/backup_restore_traits.cpp @@ -0,0 +1,77 @@ +#include "backup_restore_traits.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/util/yverify_stream.h> + +#include <util/generic/hash.h> +#include <util/string/cast.h> + +namespace NKikimr { +namespace NDataShard { +namespace NBackupRestoreTraits { + +bool TryCodecFromTask(const NKikimrSchemeOp::TBackupTask& task, ECompressionCodec& codec) { + if (!task.HasCompression()) { + codec = ECompressionCodec::None; + return true; + } + + if (!TryFromString<ECompressionCodec>(task.GetCompression().GetCodec(), codec)) { + return false; + } + + if (codec == ECompressionCodec::Invalid) { + return false; + } + + return true; +} + +ECompressionCodec CodecFromTask(const NKikimrSchemeOp::TBackupTask& task) { + ECompressionCodec codec; + Y_VERIFY(TryCodecFromTask(task, codec)); + return codec; +} + +EDataFormat NextDataFormat(EDataFormat cur) { + switch (cur) { + case EDataFormat::Csv: + return EDataFormat::Invalid; + case EDataFormat::Invalid: + return EDataFormat::Invalid; + } +} + +ECompressionCodec NextCompressionCodec(ECompressionCodec cur) { + switch (cur) { + case ECompressionCodec::None: + return ECompressionCodec::Zstd; + case ECompressionCodec::Zstd: + return ECompressionCodec::Invalid; + case ECompressionCodec::Invalid: + return ECompressionCodec::Invalid; + } +} + +TString DataFileExtension(EDataFormat format, ECompressionCodec codec) { + static THashMap<EDataFormat, TString> formats = { + {EDataFormat::Csv, ".csv"}, + }; + + static THashMap<ECompressionCodec, TString> codecs = { + {ECompressionCodec::None, ""}, + {ECompressionCodec::Zstd, ".zst"}, + }; + + auto fit = formats.find(format); + Y_VERIFY_S(fit != formats.end(), "Unexpected format: " << format); + + auto cit = codecs.find(codec); + Y_VERIFY_S(cit != codecs.end(), "Unexpected codec: " << codec); + + return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str()); +} + +} // NBackupRestoreTraits +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h new file mode 100644 index 00000000000..78b93232c67 --- /dev/null +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -0,0 +1,44 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/string/printf.h> + +namespace NKikimrSchemeOp { + class TBackupTask; +} + +namespace NKikimr { +namespace NDataShard { +namespace NBackupRestoreTraits { + +enum class EDataFormat: int { + Invalid /* "invalid" */, + Csv /* "csv" */, +}; + +enum class ECompressionCodec: int { + Invalid /* "invalid" */, + None /* "none" */, + Zstd /* "zstd" */, +}; + +bool TryCodecFromTask(const NKikimrSchemeOp::TBackupTask& task, ECompressionCodec& codec); +ECompressionCodec CodecFromTask(const NKikimrSchemeOp::TBackupTask& task); + +EDataFormat NextDataFormat(EDataFormat cur); +ECompressionCodec NextCompressionCodec(ECompressionCodec cur); + +TString DataFileExtension(EDataFormat format, ECompressionCodec codec); + +inline TString SchemeKey(const TString& objKeyPattern) { + return Sprintf("%s/scheme.pb", objKeyPattern.c_str()); +} + +inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) { + const auto ext = DataFileExtension(format, codec); + return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str()); +} + +} // NBackupRestoreTraits +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp index a7a0f8429be..00e718ad660 100644 --- a/ydb/core/tx/datashard/backup_unit.cpp +++ b/ydb/core/tx/datashard/backup_unit.cpp @@ -1,5 +1,6 @@ #include "export_iface.h" #include "backup_restore_common.h" +#include "backup_restore_traits.h" #include "execution_unit_ctors.h" #include "export_scan.h" #include "export_s3.h" @@ -45,6 +46,11 @@ protected: std::shared_ptr<::NKikimr::NDataShard::IExport> exp; if (backup.HasYTSettings()) { + if (backup.HasCompression()) { + Abort(op, ctx, "Exports to YT do not support compression"); + return false; + } + if (auto* exportFactory = appData->DataShardExportFactory) { std::shared_ptr<IExport>(exportFactory->CreateExportToYt(backup, columns)).swap(exp); } else { @@ -52,6 +58,13 @@ protected: return false; } } else if (backup.HasS3Settings()) { + NBackupRestoreTraits::ECompressionCodec codec; + if (!TryCodecFromTask(backup, codec)) { + Abort(op, ctx, TStringBuilder() << "Unsupported compression codec" + << ": " << backup.GetCompression().GetCodec()); + return false; + } + if (auto* exportFactory = appData->DataShardExportFactory) { std::shared_ptr<IExport>(exportFactory->CreateExportToS3(backup, columns)).swap(exp); } else { diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h index a922288d90f..13dbd8acbf7 100644 --- a/ydb/core/tx/datashard/export_s3.h +++ b/ydb/core/tx/datashard/export_s3.h @@ -3,6 +3,7 @@ #ifndef KIKIMR_DISABLE_S3_OPS #include "defs.h" +#include "backup_restore_traits.h" #include "export_iface.h" #include "export_s3_buffer.h" @@ -21,7 +22,16 @@ public: IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override; IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const override { - return CreateS3ExportBuffer(Columns, rowsLimit, bytesLimit); + using namespace NBackupRestoreTraits; + + switch (CodecFromTask(Task)) { + case ECompressionCodec::None: + return CreateS3ExportBufferRaw(Columns, rowsLimit, bytesLimit); + case ECompressionCodec::Zstd: + return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, rowsLimit, bytesLimit); + case ECompressionCodec::Invalid: + Y_FAIL("unreachable"); + } } void Shutdown() const override {} diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h index 8c72a8c3936..3ca6ecf3553 100644 --- a/ydb/core/tx/datashard/export_s3_base_uploader.h +++ b/ydb/core/tx/datashard/export_s3_base_uploader.h @@ -13,10 +13,12 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <util/generic/buffer.h> #include <util/generic/maybe.h> #include <util/generic/ptr.h> #include <util/generic/string.h> #include <util/string/builder.h> +#include <util/string/cast.h> namespace NKikimr { namespace NDataShard { @@ -38,7 +40,7 @@ class TS3UploaderBase: public TActorBootstrapped<TDerived> , public IProxyOps { using TEvS3Wrapper = NWrappers::TEvS3Wrapper; - using TEvBuffer = TEvExportScan::TEvBuffer<TString>; + using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>; protected: void Restart() { @@ -52,7 +54,7 @@ protected: this->Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill()); } - Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(Settings.Credentials, Settings.Config)); + Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(Settings.GetCredentials(), Settings.GetConfig())); if (!SchemeUploaded) { this->Become(&TDerived::StateUploadScheme); @@ -79,9 +81,9 @@ protected: google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); auto request = Model::PutObjectRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.SchemeKey) - .WithStorageClass(Settings.StorageClass); + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetSchemeKey()) + .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(Buffer))); } @@ -137,7 +139,7 @@ protected: Last = ev->Get()->Last; MultiPart = MultiPart || !Last; - Buffer.swap(ev->Get()->Buffer); + ev->Get()->Buffer.AsString(Buffer); UploadData(); } @@ -145,9 +147,9 @@ protected: void UploadData() { if (!MultiPart) { auto request = Model::PutObjectRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.DataKey) - .WithStorageClass(Settings.StorageClass); + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(Buffer))); } else { if (!UploadId) { @@ -156,8 +158,8 @@ protected: } auto request = Model::UploadPartRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.DataKey) + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId) .WithPartNumber(Parts.size() + 1); this->Send(Client, new TEvS3Wrapper::TEvUploadPartRequest(request, std::move(Buffer))); @@ -187,9 +189,9 @@ protected: if (!upload) { auto request = Model::CreateMultipartUploadRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.DataKey) - .WithStorageClass(Settings.StorageClass); + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) + .WithStorageClass(Settings.GetStorageClass()); this->Send(Client, new TEvS3Wrapper::TEvCreateMultipartUploadRequest(request)); } else { UploadId = upload->Id; @@ -209,8 +211,8 @@ protected: } auto request = Model::CompleteMultipartUploadRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.DataKey) + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId) .WithMultipartUpload(Model::CompletedMultipartUpload().WithParts(std::move(parts))); this->Send(Client, new TEvS3Wrapper::TEvCompleteMultipartUploadRequest(request)); @@ -224,8 +226,8 @@ protected: } auto request = Model::AbortMultipartUploadRequest() - .WithBucket(Settings.Bucket) - .WithKey(Settings.DataKey) + .WithBucket(Settings.GetBucket()) + .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec)) .WithUploadId(*UploadId); this->Send(Client, new TEvS3Wrapper::TEvAbortMultipartUploadRequest(request)); break; @@ -379,6 +381,8 @@ public: const NKikimrSchemeOp::TBackupTask& task, TMaybe<Ydb::Table::CreateTableRequest>&& scheme) : Settings(TS3Settings::FromBackupTask(task)) + , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) + , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task)) , DataShard(dataShard) , TxId(txId) , Scheme(std::move(scheme)) @@ -436,6 +440,8 @@ public: protected: TS3Settings Settings; + const NBackupRestoreTraits::EDataFormat DataFormat; + const NBackupRestoreTraits::ECompressionCodec CompressionCodec; bool ProxyResolved; private: diff --git a/ydb/core/tx/datashard/export_s3_buffer.cpp b/ydb/core/tx/datashard/export_s3_buffer.cpp deleted file mode 100644 index 470ed22d6b6..00000000000 --- a/ydb/core/tx/datashard/export_s3_buffer.cpp +++ /dev/null @@ -1,192 +0,0 @@ -#ifndef KIKIMR_DISABLE_S3_OPS - -#include "export_common.h" -#include "export_s3.h" -#include "export_s3_buffer.h" - -#include <ydb/core/tablet_flat/flat_row_state.h> -#include <ydb/library/binary_json/read.h> -#include <ydb/public/lib/scheme_types/scheme_type_id.h> -#include <library/cpp/string_utils/quote/quote.h> - -#include <util/datetime/base.h> - -namespace NKikimr { -namespace NDataShard { - -using namespace NExportScan; -using TTableColumns = TS3Export::TTableColumns; - -class TS3Buffer: public IBuffer { - using TTagToColumn = TTableColumns; - using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow - -public: - explicit TS3Buffer(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit) - : Columns(columns) - , RowsLimit(rowsLimit) - , BytesLimit(bytesLimit) - , Rows(0) - , BytesRead(0) - { - } - - void ColumnsOrder(const TVector<ui32>& tags) override { - Y_VERIFY(tags.size() == Columns.size()); - - Indices.clear(); - for (ui32 i = 0; i < tags.size(); ++i) { - const ui32 tag = tags.at(i); - auto it = Columns.find(tag); - Y_VERIFY(it != Columns.end()); - Y_VERIFY(Indices.emplace(tag, i).second); - } - } - - void Collect(const NTable::IScan::TRow& row) override { - TStringOutput stream(Buffer); - - bool needsComma = false; - for (const auto& [tag, column] : Columns) { - auto it = Indices.find(tag); - Y_VERIFY(it != Indices.end()); - Y_VERIFY(it->second < (*row).size()); - const auto& cell = (*row)[it->second]; - - BytesRead += cell.Size(); - - if (needsComma) { - stream << ","; - } else { - needsComma = true; - } - - if (cell.IsNull()) { - stream << "null"; - continue; - } - - switch (column.Type) { - case NScheme::NTypeIds::Int32: - stream << cell.AsValue<i32>(); - break; - case NScheme::NTypeIds::Uint32: - stream << cell.AsValue<ui32>(); - break; - case NScheme::NTypeIds::Int64: - stream << cell.AsValue<i64>(); - break; - case NScheme::NTypeIds::Uint64: - stream << cell.AsValue<ui64>(); - break; - case NScheme::NTypeIds::Uint8: - //case NScheme::NTypeIds::Byte: - stream << static_cast<ui32>(cell.AsValue<ui8>()); - break; - case NScheme::NTypeIds::Int8: - stream << static_cast<i32>(cell.AsValue<i8>()); - break; - case NScheme::NTypeIds::Int16: - stream << cell.AsValue<i16>(); - break; - case NScheme::NTypeIds::Uint16: - stream << cell.AsValue<ui16>(); - break; - case NScheme::NTypeIds::Bool: - stream << cell.AsValue<bool>(); - break; - case NScheme::NTypeIds::Double: - stream << cell.AsValue<double>(); - break; - case NScheme::NTypeIds::Float: - stream << cell.AsValue<float>(); - break; - case NScheme::NTypeIds::Date: - stream << TInstant::Days(cell.AsValue<ui16>()); - break; - case NScheme::NTypeIds::Datetime: - stream << TInstant::Seconds(cell.AsValue<ui32>()); - break; - case NScheme::NTypeIds::Timestamp: - stream << TInstant::MicroSeconds(cell.AsValue<ui64>()); - break; - case NScheme::NTypeIds::Interval: - stream << cell.AsValue<i64>(); - break; - case NScheme::NTypeIds::Decimal: - stream << DecimalToString(cell.AsValue<std::pair<ui64, i64>>()); - break; - case NScheme::NTypeIds::DyNumber: - stream << DyNumberToString(cell.AsBuf()); - break; - case NScheme::NTypeIds::String: - case NScheme::NTypeIds::String4k: - case NScheme::NTypeIds::String2m: - case NScheme::NTypeIds::Utf8: - case NScheme::NTypeIds::Json: - case NScheme::NTypeIds::Yson: - stream << '"' << CGIEscapeRet(cell.AsBuf()) << '"'; - break; - case NScheme::NTypeIds::JsonDocument: - stream << '"' << CGIEscapeRet(NBinaryJson::SerializeToJson(cell.AsBuf())) << '"'; - break; - default: - Y_FAIL("Unsupported type"); - }; - } - - stream << "\n"; - ++Rows; - } - - IEventBase* PrepareEvent(bool last) override { - return new TEvExportScan::TEvBuffer<TString>(Flush(), last); - } - - void Clear() override { - Flush(); - } - - TString Flush() { - Rows = 0; - BytesRead = 0; - return std::exchange(Buffer, TString()); - } - - bool IsFilled() const override { - return GetRows() >= RowsLimit || GetBytesSent() >= BytesLimit; - } - - ui64 GetRows() const override { - return Rows; - } - - ui64 GetBytesRead() const override { - return BytesRead; - } - - ui64 GetBytesSent() const override { - return Buffer.size(); - } - -private: - const TTagToColumn Columns; - const ui64 RowsLimit; - const ui64 BytesLimit; - - TTagToIndex Indices; - - ui64 Rows; - ui64 BytesRead; - TString Buffer; - -}; // TS3Buffer - -IBuffer* CreateS3ExportBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) { - return new TS3Buffer(columns, rowsLimit, bytesLimit); -} - -} // NDataShard -} // NKikimr - -#endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/datashard/export_s3_buffer.h b/ydb/core/tx/datashard/export_s3_buffer.h index f616126dbed..35ef4078ffa 100644 --- a/ydb/core/tx/datashard/export_s3_buffer.h +++ b/ydb/core/tx/datashard/export_s3_buffer.h @@ -8,7 +8,11 @@ namespace NKikimr { namespace NDataShard { -NExportScan::IBuffer* CreateS3ExportBuffer(const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit); +NExportScan::IBuffer* CreateS3ExportBufferRaw( + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit); + +NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp new file mode 100644 index 00000000000..a496fd3885e --- /dev/null +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -0,0 +1,179 @@ +#ifndef KIKIMR_DISABLE_S3_OPS + +#include "export_common.h" +#include "export_s3_buffer_raw.h" + +#include <ydb/core/tablet_flat/flat_row_state.h> +#include <ydb/library/binary_json/read.h> +#include <ydb/public/lib/scheme_types/scheme_type_id.h> + +#include <library/cpp/string_utils/quote/quote.h> + +#include <util/datetime/base.h> +#include <util/stream/buffer.h> + +namespace NKikimr { +namespace NDataShard { + +TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit) + : Columns(columns) + , RowsLimit(rowsLimit) + , BytesLimit(bytesLimit) + , Rows(0) + , BytesRead(0) +{ +} + +void TS3BufferRaw::ColumnsOrder(const TVector<ui32>& tags) { + Y_VERIFY(tags.size() == Columns.size()); + + Indices.clear(); + for (ui32 i = 0; i < tags.size(); ++i) { + const ui32 tag = tags.at(i); + auto it = Columns.find(tag); + Y_VERIFY(it != Columns.end()); + Y_VERIFY(Indices.emplace(tag, i).second); + } +} + +void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) { + bool needsComma = false; + for (const auto& [tag, column] : Columns) { + auto it = Indices.find(tag); + Y_VERIFY(it != Indices.end()); + Y_VERIFY(it->second < (*row).size()); + const auto& cell = (*row)[it->second]; + + BytesRead += cell.Size(); + + if (needsComma) { + out << ","; + } else { + needsComma = true; + } + + if (cell.IsNull()) { + out << "null"; + continue; + } + + switch (column.Type) { + case NScheme::NTypeIds::Int32: + out << cell.AsValue<i32>(); + break; + case NScheme::NTypeIds::Uint32: + out << cell.AsValue<ui32>(); + break; + case NScheme::NTypeIds::Int64: + out << cell.AsValue<i64>(); + break; + case NScheme::NTypeIds::Uint64: + out << cell.AsValue<ui64>(); + break; + case NScheme::NTypeIds::Uint8: + //case NScheme::NTypeIds::Byte: + out << static_cast<ui32>(cell.AsValue<ui8>()); + break; + case NScheme::NTypeIds::Int8: + out << static_cast<i32>(cell.AsValue<i8>()); + break; + case NScheme::NTypeIds::Int16: + out << cell.AsValue<i16>(); + break; + case NScheme::NTypeIds::Uint16: + out << cell.AsValue<ui16>(); + break; + case NScheme::NTypeIds::Bool: + out << cell.AsValue<bool>(); + break; + case NScheme::NTypeIds::Double: + out << cell.AsValue<double>(); + break; + case NScheme::NTypeIds::Float: + out << cell.AsValue<float>(); + break; + case NScheme::NTypeIds::Date: + out << TInstant::Days(cell.AsValue<ui16>()); + break; + case NScheme::NTypeIds::Datetime: + out << TInstant::Seconds(cell.AsValue<ui32>()); + break; + case NScheme::NTypeIds::Timestamp: + out << TInstant::MicroSeconds(cell.AsValue<ui64>()); + break; + case NScheme::NTypeIds::Interval: + out << cell.AsValue<i64>(); + break; + case NScheme::NTypeIds::Decimal: + out << DecimalToString(cell.AsValue<std::pair<ui64, i64>>()); + break; + case NScheme::NTypeIds::DyNumber: + out << DyNumberToString(cell.AsBuf()); + break; + case NScheme::NTypeIds::String: + case NScheme::NTypeIds::String4k: + case NScheme::NTypeIds::String2m: + case NScheme::NTypeIds::Utf8: + case NScheme::NTypeIds::Json: + case NScheme::NTypeIds::Yson: + out << '"' << CGIEscapeRet(cell.AsBuf()) << '"'; + break; + case NScheme::NTypeIds::JsonDocument: + out << '"' << CGIEscapeRet(NBinaryJson::SerializeToJson(cell.AsBuf())) << '"'; + break; + default: + Y_FAIL("Unsupported type"); + }; + } + + out << "\n"; + ++Rows; +} + +bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) { + TBufferOutput out(Buffer); + Collect(row, out); + return true; +} + +IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) { + stats.Rows = Rows; + stats.BytesRead = BytesRead; + + auto buffer = Flush(true); + if (!buffer) { + return nullptr; + } + + stats.BytesSent = buffer->Size(); + return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last); +} + +void TS3BufferRaw::Clear() { + Y_VERIFY(Flush(false)); +} + +bool TS3BufferRaw::IsFilled() const { + return Rows >= GetRowsLimit() || Buffer.Size() >= GetBytesLimit(); +} + +TString TS3BufferRaw::GetError() const { + Y_FAIL("unreachable"); +} + +TMaybe<TBuffer> TS3BufferRaw::Flush(bool) { + Rows = 0; + BytesRead = 0; + return std::exchange(Buffer, TBuffer()); +} + +NExportScan::IBuffer* CreateS3ExportBufferRaw( + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) +{ + return new TS3BufferRaw(columns, rowsLimit, bytesLimit); +} + +} // NDataShard +} // NKikimr + +#endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h new file mode 100644 index 00000000000..25efb7977f5 --- /dev/null +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h @@ -0,0 +1,50 @@ +#pragma once + +#ifndef KIKIMR_DISABLE_S3_OPS + +#include "export_s3_buffer.h" + +#include <util/generic/buffer.h> + +namespace NKikimr { +namespace NDataShard { + +class TS3BufferRaw: public NExportScan::IBuffer { + using TTagToColumn = IExport::TTableColumns; + using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow + +public: + explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit); + + void ColumnsOrder(const TVector<ui32>& tags) override; + bool Collect(const NTable::IScan::TRow& row) override; + IEventBase* PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) override; + void Clear() override; + bool IsFilled() const override; + TString GetError() const override; + +protected: + inline ui64 GetRowsLimit() const { return RowsLimit; } + inline ui64 GetBytesLimit() const { return BytesLimit; } + + void Collect(const NTable::IScan::TRow& row, IOutputStream& out); + virtual TMaybe<TBuffer> Flush(bool prepare); + +private: + const TTagToColumn Columns; + const ui64 RowsLimit; + const ui64 BytesLimit; + + TTagToIndex Indices; + +protected: + ui64 Rows; + ui64 BytesRead; + TBuffer Buffer; + +}; // TS3BufferRaw + +} // NDataShard +} // NKikimr + +#endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp new file mode 100644 index 00000000000..e229916aeef --- /dev/null +++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp @@ -0,0 +1,124 @@ +#ifndef KIKIMR_DISABLE_S3_OPS + +#include "export_s3_buffer_raw.h" + +#include <contrib/libs/zstd/include/zstd.h> + +namespace { + + struct DestroyZCtx { + static void Destroy(::ZSTD_CCtx* p) noexcept { + ZSTD_freeCCtx(p); + } + }; + +} // anonymous + +namespace NKikimr { +namespace NDataShard { + +class TS3BufferZstd: public TS3BufferRaw { + enum ECompressionResult { + CONTINUE, + DONE, + ERROR, + }; + + ECompressionResult Compress(ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { + auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), Buffer.Size()}; + auto res = ZSTD_compressStream2(Context.Get(), &output, input, endOp); + + if (ZSTD_isError(res)) { + ErrorCode = res; + return ERROR; + } + + if (res > 0) { + Buffer.Reserve(output.pos + res); + } + + Buffer.Proceed(output.pos); + return res ? CONTINUE : DONE; + } + + void Reset() { + ZSTD_CCtx_reset(Context.Get(), ZSTD_reset_session_only); + ZSTD_CCtx_refCDict(Context.Get(), NULL); + ZSTD_CCtx_setParameter(Context.Get(), ZSTD_c_compressionLevel, CompressionLevel); + } + +public: + explicit TS3BufferZstd(int compressionLevel, + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) + : TS3BufferRaw(columns, rowsLimit, bytesLimit) + , CompressionLevel(compressionLevel) + , Context(ZSTD_createCCtx()) + , BytesRaw(0) + { + Reset(); + } + + bool Collect(const NTable::IScan::TRow& row) override { + BufferRaw.clear(); + TStringOutput out(BufferRaw); + TS3BufferRaw::Collect(row, out); + + BytesRaw += BufferRaw.size(); + + auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0}; + while (input.pos < input.size) { + if (ERROR == Compress(&input, ZSTD_e_continue)) { + return false; + } + } + + return true; + } + + bool IsFilled() const override { + return Rows >= GetRowsLimit() || BytesRaw >= GetBytesLimit(); + } + + TString GetError() const override { + return ZSTD_getErrorName(ErrorCode); + } + +protected: + TMaybe<TBuffer> Flush(bool prepare) override { + if (prepare && Buffer) { + ECompressionResult res; + auto input = ZSTD_inBuffer{NULL, 0, 0}; + + do { + if (res = Compress(&input, ZSTD_e_end); res == ERROR) { + return Nothing(); + } + } while (res != DONE); + } + + Reset(); + + BytesRaw = 0; + return TS3BufferRaw::Flush(prepare); + } + +private: + const int CompressionLevel; + + THolder<::ZSTD_CCtx, DestroyZCtx> Context; + size_t ErrorCode; + ui64 BytesRaw; + TString BufferRaw; + +}; // TS3BufferZstd + +NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) +{ + return new TS3BufferZstd(compressionLevel, columns, rowsLimit, bytesLimit); +} + +} // NDataShard +} // NKikimr + +#endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/tx/datashard/export_scan.cpp b/ydb/core/tx/datashard/export_scan.cpp index b0c40aec224..d1995c2f863 100644 --- a/ydb/core/tx/datashard/export_scan.cpp +++ b/ydb/core/tx/datashard/export_scan.cpp @@ -28,15 +28,9 @@ class TExportScan: private NActors::IActor, public NTable::IScan { ES_COUNT, }; - struct TStats { - ui64 Rows; - ui64 BytesRead; - ui64 BytesSent; - + struct TStats: public IBuffer::TStats { TStats() - : Rows(0) - , BytesRead(0) - , BytesSent(0) + : IBuffer::TStats() { auto counters = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "store_to_yt"); @@ -55,8 +49,8 @@ class TExportScan: private NActors::IActor, public NTable::IScan { *MonBytesSent += bytesSent; } - void Aggr(IBuffer const* buffer) { - Aggr(buffer->GetRows(), buffer->GetBytesRead(), buffer->GetBytesSent()); + void Aggr(const IBuffer::TStats& stats) { + Aggr(stats.Rows, stats.BytesRead, stats.BytesSent); } TString ToString() const { @@ -96,9 +90,18 @@ class TExportScan: private NActors::IActor, public NTable::IScan { return EScan::Sleep; } + IBuffer::TStats stats; + THolder<IEventBase> ev{Buffer->PrepareEvent(noMoreData, stats)}; + + if (!ev) { + Success = false; + Error = Buffer->GetError(); + return EScan::Final; + } + + Send(Uploader, std::move(ev)); State.Set(ES_BUFFER_SENT); - Stats->Aggr(Buffer.Get()); - Send(Uploader, Buffer->PrepareEvent(noMoreData)); + Stats->Aggr(stats); if (noMoreData) { Spent->Alter(false); @@ -199,7 +202,12 @@ public: } EScan Feed(TArrayRef<const TCell>, const TRow& row) noexcept override { - Buffer->Collect(row); + if (!Buffer->Collect(row)) { + Success = false; + Error = Buffer->GetError(); + return EScan::Final; + } + return MaybeSendBuffer(); } diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h index d077576a607..9d983be5954 100644 --- a/ydb/core/tx/datashard/export_scan.h +++ b/ydb/core/tx/datashard/export_scan.h @@ -35,7 +35,7 @@ struct TEvExportScan { TEvBuffer() = default; - explicit TEvBuffer(TBuffer buffer, bool last) + explicit TEvBuffer(TBuffer&& buffer, bool last) : Buffer(std::move(buffer)) , Last(last) { @@ -92,17 +92,21 @@ class IBuffer { public: using TPtr = THolder<IBuffer>; + struct TStats { + ui64 Rows = 0; + ui64 BytesRead = 0; + ui64 BytesSent = 0; + }; + public: virtual ~IBuffer() = default; virtual void ColumnsOrder(const TVector<ui32>& tags) = 0; - virtual void Collect(const NTable::IScan::TRow& row) = 0; - virtual IEventBase* PrepareEvent(bool last) = 0; + virtual bool Collect(const NTable::IScan::TRow& row) = 0; + virtual IEventBase* PrepareEvent(bool last, TStats& stats) = 0; virtual void Clear() = 0; virtual bool IsFilled() const = 0; - virtual ui64 GetRows() const = 0; - virtual ui64 GetBytesRead() const = 0; - virtual ui64 GetBytesSent() const = 0; + virtual TString GetError() const = 0; }; } // NExportScan diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 3d4f20c2e22..56630c389d8 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -13,16 +13,32 @@ #include <ydb/core/io_formats/csv.h> #include <ydb/public/lib/scheme_types/scheme_type_id.h> +#include <contrib/libs/zstd/include/zstd.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <util/generic/buffer.h> #include <util/generic/ptr.h> #include <util/generic/string.h> #include <util/generic/vector.h> #include <util/memory/pool.h> #include <util/string/builder.h> +namespace { + + struct DestroyZCtx { + static void Destroy(::ZSTD_DCtx* p) noexcept { + ZSTD_freeDCtx(p); + } + }; + + constexpr ui64 SumWithSaturation(ui64 a, ui64 b) { + return Max<ui64>() - a < b ? Max<ui64>() : a + b; + } + +} // anonymous + namespace NKikimr { namespace NDataShard { @@ -33,12 +49,28 @@ using namespace Aws::S3; using namespace Aws; class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { - class TReadController { - static constexpr ui64 SumWithSaturation(ui64 a, ui64 b) { - return Max<ui64>() - a < b ? Max<ui64>() : a + b; - } + class IReadController { + public: + enum EFeed { + READY_DATA = 0, + NOT_ENOUGH_DATA = 1, + ERROR = 2, + }; public: + virtual ~IReadController() = default; + // Returns status or error + virtual EFeed Feed(TString&& portion, TString& error) = 0; + // Returns view to internal buffer with ready data after successful Feed() + virtual TStringBuf GetReadyData() const = 0; + // Clear internal buffer & makes it ready for another Feed() + virtual void Confirm() = 0; + virtual ui64 PendingBytes() const = 0; + virtual ui64 ReadyBytes() const = 0; + }; + + class TReadController: public IReadController { + public: explicit TReadController(ui32 rangeSize, ui64 bufferSizeLimit) : RangeSize(rangeSize) , BufferSizeLimit(bufferSizeLimit) @@ -49,58 +81,170 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { Y_VERIFY(contentLength > 0); Y_VERIFY(processedBytes < contentLength); - const ui64 start = processedBytes + (Buffer.size() - Pos); + const ui64 start = processedBytes + PendingBytes(); const ui64 end = Min(SumWithSaturation(start, RangeSize), contentLength) - 1; return std::make_pair(start, end); } - bool Feed(TString&& portion, TStringBuf& buf) { - if (Buffer && Pos) { - Buffer.remove(0, Pos); - Pos = 0; + protected: + bool CheckBufferSize(size_t size, TString& reason) const { + if (size >= BufferSizeLimit) { + reason = "reached buffer size limit"; + return false; } - if (!Buffer) { - Buffer = std::move(portion); - } else { - Buffer.append(portion); - } + return true; + } + + TStringBuf AsStringBuf(size_t size) const { + return TStringBuf(Buffer.Data(), size); + } + + private: + const ui32 RangeSize; + const ui64 BufferSizeLimit; + + protected: + TBuffer Buffer; + + }; // TReadController + + class TReadControllerRaw: public TReadController { + public: + using TReadController::TReadController; + + EFeed Feed(TString&& portion, TString& error) override { + Y_VERIFY(Pos == 0); + + Buffer.Append(portion.data(), portion.size()); + const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n'); - const ui64 pos = Buffer.rfind('\n'); if (TString::npos == pos) { - return false; + if (!CheckBufferSize(Buffer.Size(), error)) { + return ERROR; + } else { + return NOT_ENOUGH_DATA; + } } Pos = pos + 1; // for '\n' - buf = TStringBuf(Buffer.data(), Pos); + return READY_DATA; + } - return true; + TStringBuf GetReadyData() const override { + return AsStringBuf(Pos); } - bool CanExpandBuffer(ui64 contentLength, ui64 processedBytes, TString& reason) const { - const auto size = Buffer.size(); + void Confirm() override { + Buffer.ChopHead(Pos); + Pos = 0; + } - if (SumWithSaturation(processedBytes, size) >= contentLength) { - reason = "reached end of file"; - return false; + ui64 PendingBytes() const override { + return Buffer.Size(); + } + + ui64 ReadyBytes() const override { + return Pos; + } + + private: + ui64 Pos = 0; + }; + + class TReadControllerZstd: public TReadController { + void Reset() { + ZSTD_DCtx_reset(Context.Get(), ZSTD_reset_session_only); + ZSTD_DCtx_refDDict(Context.Get(), NULL); + } + + public: + explicit TReadControllerZstd(ui32 rangeSize, ui64 bufferSizeLimit) + : TReadController(rangeSize, bufferSizeLimit) + , Context(ZSTD_createDCtx()) + { + Reset(); + } + + EFeed Feed(TString&& portion, TString& error) override { + Y_VERIFY(ReadyInputBytes == 0 && ReadyOutputPos == 0); + + auto input = ZSTD_inBuffer{portion.data(), portion.size(), 0}; + while (input.pos < input.size) { + PendingInputBytes -= input.pos; // dec before decompress + + auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), PendingOutputPos}; + auto res = ZSTD_decompressStream(Context.Get(), &output, &input); + + if (ZSTD_isError(res)) { + error = ZSTD_getErrorName(res); + return ERROR; + } + + if (output.pos == output.size) { + if (!CheckBufferSize(Buffer.Capacity(), error)) { + return ERROR; + } + + Buffer.Reserve(Buffer.Capacity() + ZSTD_BLOCKSIZE_MAX); + } + + PendingInputBytes += input.pos; // inc after decompress + PendingOutputPos = output.pos; + + if (res == 0) { + if (AsStringBuf(output.pos).back() != '\n') { + error = "cannot find new line symbol"; + return ERROR; + } + + ReadyInputBytes = PendingInputBytes; + ReadyOutputPos = PendingOutputPos; + Reset(); + } } - if (size >= BufferSizeLimit) { - reason = "reached buffer size limit"; - return false; + if (!ReadyOutputPos) { + if (!CheckBufferSize(PendingOutputPos, error)) { + return ERROR; + } else { + return NOT_ENOUGH_DATA; + } } - return true; + Buffer.Proceed(ReadyOutputPos); + return READY_DATA; } - private: - const ui32 RangeSize; - const ui64 BufferSizeLimit; + TStringBuf GetReadyData() const override { + return AsStringBuf(ReadyOutputPos); + } - TString Buffer; - ui64 Pos = 0; + void Confirm() override { + Buffer.ChopHead(ReadyOutputPos); - }; // TReadController + PendingInputBytes -= ReadyInputBytes; + ReadyInputBytes = 0; + + PendingOutputPos -= ReadyOutputPos; + ReadyOutputPos = 0; + } + + ui64 PendingBytes() const override { + return PendingInputBytes; + } + + ui64 ReadyBytes() const override { + return ReadyInputBytes; + } + + private: + THolder<::ZSTD_DCtx, DestroyZCtx> Context; + ui64 PendingInputBytes = 0; + ui64 ReadyInputBytes = 0; + ui64 PendingOutputPos = 0; + ui64 ReadyOutputPos = 0; + }; class TUploadRowsRequestBuilder { public: @@ -176,9 +320,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { Send(client, new TEvents::TEvPoisonPill()); } - Client = RegisterWithSameMailbox(CreateS3Wrapper(Settings.Credentials, Settings.Config)); + Client = RegisterWithSameMailbox(CreateS3Wrapper(Settings.GetCredentials(), Settings.GetConfig())); - HeadObject(Settings.DataKey); + HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec)); Become(&TThis::StateWork); } @@ -187,7 +331,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { << ": key# " << key); auto request = Model::HeadObjectRequest() - .WithBucket(Settings.Bucket) + .WithBucket(Settings.GetBucket()) .WithKey(key); Send(Client, new TEvS3Wrapper::TEvHeadObjectRequest(request)); @@ -199,7 +343,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { << ", range# " << range.first << "-" << range.second); auto request = Model::GetObjectRequest() - .WithBucket(Settings.Bucket) + .WithBucket(Settings.GetBucket()) .WithKey(key) .WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); @@ -210,8 +354,35 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { IMPORT_LOG_D("Handle " << ev->Get()->ToString()); const auto& result = ev->Get()->Result; - if (!CheckResult(result, TStringBuf("HeadObject"))) { - return; + if (!result.IsSuccess()) { + switch (result.GetError().GetErrorType()) { + case S3Errors::RESOURCE_NOT_FOUND: + case S3Errors::NO_SUCH_KEY: + break; + default: + IMPORT_LOG_E("Error at 'HeadObject'" + << ": error# " << result); + return RestartOrFinish(result.GetError().GetMessage().c_str()); + } + + CompressionCodec = NBackupRestoreTraits::NextCompressionCodec(CompressionCodec); + if (CompressionCodec == NBackupRestoreTraits::ECompressionCodec::Invalid) { + return Finish(false, TStringBuilder() << "Cannot find any supported data file" + << ": prefix# " << Settings.GetObjectKeyPattern()); + } + + return HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec)); + } + + switch (CompressionCodec) { + case NBackupRestoreTraits::ECompressionCodec::None: + Reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit)); + break; + case NBackupRestoreTraits::ECompressionCodec::Zstd: + Reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit)); + break; + case NBackupRestoreTraits::ECompressionCodec::Invalid: + Y_FAIL("unreachable"); } ETag = result.GetResult().GetETag(); @@ -251,7 +422,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { return Finish(); } - GetObject(Settings.DataKey, Reader.NextRange(ContentLength, ProcessedBytes)); + GetObject(Settings.GetDataKey(DataFormat, CompressionCodec), + Reader->NextRange(ContentLength, ProcessedBytes)); } void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) { @@ -274,34 +446,46 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { << ", content-length# " << ContentLength << ", body-size# " << msg.Body.size()); - if (!Reader.Feed(std::move(msg.Body), Buffer)) { - TString reason; - if (!Reader.CanExpandBuffer(ContentLength, ProcessedBytes, reason)) { - return Finish(false, TStringBuilder() << "Cannot find new line symbol in data" - << ": " << reason); + TString error; + switch (Reader->Feed(std::move(msg.Body), error)) { + case TReadController::READY_DATA: + break; + + case TReadController::NOT_ENOUGH_DATA: + if (SumWithSaturation(ProcessedBytes, Reader->PendingBytes()) < ContentLength) { + return GetObject(Settings.GetDataKey(DataFormat, CompressionCodec), + Reader->NextRange(ContentLength, ProcessedBytes)); + } else { + error = "reached end of file"; } + [[fallthrough]]; - IMPORT_LOG_W("Cannot find new line symbol, request additional data" - << ": processed-bytes# " << ProcessedBytes - << ", content-length# " << ContentLength); - return GetObject(Settings.DataKey, Reader.NextRange(ContentLength, ProcessedBytes)); + default: // ERROR + return Finish(false, TStringBuilder() << "Cannot process data" + << ": " << error); } - ProcessedBytes += Buffer.size(); - RequestBuilder.New(TableInfo, Scheme); + if (auto data = Reader->GetReadyData()) { + ProcessedBytes += Reader->ReadyBytes(); + RequestBuilder.New(TableInfo, Scheme); + + TMemoryPool pool(256); + while (ProcessData(data, pool)); - TMemoryPool pool(256); - while (ProcessData(pool)); + Reader->Confirm(); + } else { + Y_FAIL("unreachable"); + } } - bool ProcessData(TMemoryPool& pool) { + bool ProcessData(TStringBuf& data, TMemoryPool& pool) { pool.Clear(); - TStringBuf line = Buffer.NextTok('\n'); + TStringBuf line = data.NextTok('\n'); const TStringBuf origLine = line; if (!line) { - if (Buffer) { + if (data) { return true; // skip empty line } @@ -508,11 +692,14 @@ public: : DataShard(dataShard) , TxId(txId) , Settings(TS3Settings::FromRestoreTask(task)) + , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) + , CompressionCodec(NBackupRestoreTraits::ECompressionCodec::None) , TableInfo(tableInfo) , Scheme(task.GetTableDescription()) , LogPrefix_(TStringBuilder() << "s3:" << TxId) , Retries(task.GetNumberOfRetries()) - , Reader(task.GetS3Settings().GetLimits().GetReadBatchSize(), task.GetS3Settings().GetLimits().GetReadBufferSizeLimit()) + , ReadBatchSize(task.GetS3Settings().GetLimits().GetReadBatchSize()) + , ReadBufferSizeLimit(task.GetS3Settings().GetLimits().GetReadBufferSizeLimit()) { } @@ -551,6 +738,8 @@ private: const TActorId DataShard; const ui64 TxId; const TS3Settings Settings; + const NBackupRestoreTraits::EDataFormat DataFormat; + NBackupRestoreTraits::ECompressionCodec CompressionCodec; const TTableInfo TableInfo; const NKikimrSchemeOp::TTableDescription Scheme; const TString LogPrefix_; @@ -570,8 +759,9 @@ private: ui64 WrittenBytes = 0; ui64 WrittenRows = 0; - TReadController Reader; - TStringBuf Buffer; + const ui32 ReadBatchSize; + const ui64 ReadBufferSizeLimit; + THolder<TReadController> Reader; TUploadRowsRequestBuilder RequestBuilder; }; // TS3Downloader diff --git a/ydb/core/tx/datashard/s3_common.h b/ydb/core/tx/datashard/s3_common.h index 8e4b4c14cb9..db81617d01d 100644 --- a/ydb/core/tx/datashard/s3_common.h +++ b/ydb/core/tx/datashard/s3_common.h @@ -3,6 +3,7 @@ #ifndef KIKIMR_DISABLE_S3_OPS #include "defs.h" +#include "backup_restore_traits.h" #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/auth/AWSCredentials.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h> @@ -10,7 +11,6 @@ #include <ydb/core/protos/flat_scheme_op.pb.h> -#include <util/string/builder.h> #include <util/string/printf.h> namespace NKikimr { @@ -22,6 +22,7 @@ inline Aws::Client::ClientConfiguration ConfigFromSettings(const NKikimrSchemeOp config.endpointOverride = settings.GetEndpoint(); config.connectTimeoutMs = 10000; config.maxConnections = 5; + config.caPath = "/etc/ssl/certs"; switch (settings.GetScheme()) { case NKikimrSchemeOp::TS3Settings::HTTP: @@ -61,31 +62,44 @@ inline Aws::Auth::AWSCredentials CredentialsFromSettings(const NKikimrSchemeOp:: return Aws::Auth::AWSCredentials(settings.GetAccessKey(), settings.GetSecretKey()); } -struct TS3Settings { +class TS3Settings { Aws::Client::ClientConfiguration Config; - Aws::Auth::AWSCredentials Credentials; - TString Bucket; - TString SchemeKey; - TString DataKey; - Aws::S3::Model::StorageClass StorageClass; + const Aws::Auth::AWSCredentials Credentials; + const TString Bucket; + const TString ObjectKeyPattern; + const ui32 Shard; + const Ydb::Export::ExportToS3Settings::StorageClass StorageClass; -private: explicit TS3Settings(const NKikimrSchemeOp::TS3Settings& settings, ui32 shard) : Config(ConfigFromSettings(settings)) , Credentials(CredentialsFromSettings(settings)) , Bucket(settings.GetBucket()) - , SchemeKey(TStringBuilder() << settings.GetObjectKeyPattern() << "/scheme.pb") - , DataKey(TStringBuilder() << settings.GetObjectKeyPattern() << Sprintf("/data_%02d.csv", shard)) - , StorageClass(ConvertStorageClass(settings.GetStorageClass())) + , ObjectKeyPattern(settings.GetObjectKeyPattern()) + , Shard(shard) + , StorageClass(settings.GetStorageClass()) { - Config.caPath = "/etc/ssl/certs"; } - static Aws::S3::Model::StorageClass ConvertStorageClass(Ydb::Export::ExportToS3Settings::StorageClass value) { +public: + static TS3Settings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) { + return TS3Settings(task.GetS3Settings(), task.GetShardNum()); + } + + static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) { + return TS3Settings(task.GetS3Settings(), task.GetShardNum()); + } + + inline const Aws::Client::ClientConfiguration& GetConfig() const { return Config; } + inline Aws::Client::ClientConfiguration& ConfigRef() { return Config; } + inline const Aws::Auth::AWSCredentials& GetCredentials() const { return Credentials; } + inline const TString& GetBucket() const { return Bucket; } + inline const TString& GetObjectKeyPattern() const { return ObjectKeyPattern; } + + inline Aws::S3::Model::StorageClass GetStorageClass() const { using ExportToS3Settings = Ydb::Export::ExportToS3Settings; using AwsStorageClass = Aws::S3::Model::StorageClass; - switch (value) { + switch (StorageClass) { case ExportToS3Settings::STORAGE_CLASS_UNSPECIFIED: return AwsStorageClass::NOT_SET; case ExportToS3Settings::STANDARD: @@ -109,13 +123,15 @@ private: } } -public: - static TS3Settings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) { - return TS3Settings(task.GetS3Settings(), task.GetShardNum()); + inline TString GetSchemeKey() const { + return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern); } - static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) { - return TS3Settings(task.GetS3Settings(), task.GetShardNum()); + inline TString GetDataKey( + NBackupRestoreTraits::EDataFormat format, + NBackupRestoreTraits::ECompressionCodec codec) const + { + return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec); } }; // TS3Settings diff --git a/ydb/core/tx/schemeshard/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup.cpp new file mode 100644 index 00000000000..6342395177e --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_backup.cpp @@ -0,0 +1,120 @@ +#include "ut_backup_restore_common.h" + +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> + +#include <util/string/cast.h> +#include <util/string/printf.h> + +using namespace NSchemeShardUT_Private; +using namespace NKikimr::NWrappers::NTestHelpers; + +Y_UNIT_TEST_SUITE(TBackupTests) { + using TFillFn = std::function<void(TTestBasicRuntime&)>; + + void Backup(TTestBasicRuntime& runtime, const TString& compressionCodec, + const TString& creationScheme, TFillFn fill, ui32 rowsBatchSize = 128) + { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TTestEnv env(runtime); + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + + TestCreateTable(runtime, ++txId, "/MyRoot", creationScheme); + env.TestWaitNotification(runtime, txId); + + fill(runtime); + + const auto tableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); + TString tableSchema; + UNIT_ASSERT(google::protobuf::TextFormat::PrintToString(tableDesc.GetPathDescription(), &tableSchema)); + + TestBackup(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + Table { + %s + } + S3Settings { + Endpoint: "localhost:%d" + Scheme: HTTP + } + ScanSettings { + RowsBatchSize: %d + } + Compression { + Codec: "%s" + } + )", tableSchema.c_str(), port, rowsBatchSize, compressionCodec.c_str())); + env.TestWaitNotification(runtime, txId); + } + + void WriteRow(TTestBasicRuntime& runtime, ui64 tabletId, const TString& key, const TString& value) { + NKikimrMiniKQL::TResult result; + TString error; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( + ( + (let key '( '('key (Utf8 '%s) ) ) ) + (let row '( '('value (Utf8 '%s) ) ) ) + (return (AsList (UpdateRow '__user__Table key row) )) + ) + )", key.c_str(), value.c_str()), result, error); + + UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); + UNIT_ASSERT_VALUES_EQUAL(error, ""); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) { + TTestBasicRuntime runtime; + + Backup(runtime, ToString(Codec), R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", [](TTestBasicRuntime& runtime) { + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA"); + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) { + TTestBasicRuntime runtime; + + Backup(runtime, ToString(Codec), R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + SplitBoundary { + KeyPrefix { + Tuple { Optional { Text: "b" } } + } + } + )", [](TTestBasicRuntime& runtime) { + WriteRow(runtime, TTestTxConfig::FakeHiveTablets + 0, "a", "valueA"); + WriteRow(runtime, TTestTxConfig::FakeHiveTablets + 1, "b", "valueb"); + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) { + TTestBasicRuntime runtime; + const ui32 batchSize = 10; + + Backup(runtime, ToString(Codec), R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", [](TTestBasicRuntime& runtime) { + for (ui32 i = 0; i < 2 * batchSize; ++i) { + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, Sprintf("a%d", i), "valueA"); + } + }, batchSize); + } + +} // TBackupTests diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt new file mode 100644 index 00000000000..74529d7845c --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt @@ -0,0 +1,54 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_backup) +target_compile_options(ydb-core-tx-schemeshard-ut_backup PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_backup PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_backup PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + ydb-core-tx + tx-schemeshard-ut_helpers + core-wrappers-ut_helpers + udf-service-exception_policy +) +target_link_options(ydb-core-tx-schemeshard-ut_backup PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-schemeshard-ut_backup PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_backup.cpp +) +add_test( + NAME + ydb-core-tx-schemeshard-ut_backup + COMMAND + ydb-core-tx-schemeshard-ut_backup + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-schemeshard-ut_backup) diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt new file mode 100644 index 00000000000..f9277e31c17 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt @@ -0,0 +1,57 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_backup) +target_compile_options(ydb-core-tx-schemeshard-ut_backup PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_backup PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_backup PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + ydb-core-tx + tx-schemeshard-ut_helpers + core-wrappers-ut_helpers + udf-service-exception_policy +) +target_link_options(ydb-core-tx-schemeshard-ut_backup PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-schemeshard-ut_backup PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_backup.cpp +) +add_test( + NAME + ydb-core-tx-schemeshard-ut_backup + COMMAND + ydb-core-tx-schemeshard-ut_backup + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-tx-schemeshard-ut_backup) diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt new file mode 100644 index 00000000000..a681d385f3e --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/tx/schemeshard/ut_backup_restore_common.h b/ydb/core/tx/schemeshard/ut_backup_restore_common.h new file mode 100644 index 00000000000..ec3eb3b9383 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_backup_restore_common.h @@ -0,0 +1,16 @@ +#include <ydb/core/tx/datashard/backup_restore_traits.h> + +using EDataFormat = NKikimr::NDataShard::NBackupRestoreTraits::EDataFormat; +using ECompressionCodec = NKikimr::NDataShard::NBackupRestoreTraits::ECompressionCodec; + +#define Y_UNIT_TEST_WITH_COMPRESSION(N) \ + template<ECompressionCodec Codec> void N(NUnitTest::TTestContext&); \ + struct TTestRegistration##N { \ + TTestRegistration##N() { \ + TCurrentTest::AddTest(#N "[Raw]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<ECompressionCodec::None>), false); \ + TCurrentTest::AddTest(#N "[Zstd]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<ECompressionCodec::Zstd>), false); \ + } \ + }; \ + static TTestRegistration##N testRegistration##N; \ + template<ECompressionCodec Codec> \ + void N(NUnitTest::TTestContext&) diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp index 9a84fc19caa..42272ebe116 100644 --- a/ydb/core/tx/schemeshard/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore.cpp @@ -1,3 +1,5 @@ +#include "ut_backup_restore_common.h" + #include <contrib/libs/double-conversion/double-conversion/ieee.h> #include <ydb/core/base/localdb.h> @@ -10,6 +12,8 @@ #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/metering/metering.h> #include <ydb/core/ydb_convert/table_description.h> + +#include <contrib/libs/zstd/include/zstd.h> #include <library/cpp/string_utils/quote/quote.h> #include <util/datetime/base.h> @@ -73,14 +77,43 @@ namespace { } struct TTestData { - TString Csv; + TString Data; TString YsonStr; + EDataFormat DataFormat = EDataFormat::Csv; + ECompressionCodec CompressionCodec; - TTestData(TString csv, TString ysonStr) - : Csv(std::move(csv)) + TTestData(TString data, TString ysonStr, ECompressionCodec codec = ECompressionCodec::None) + : Data(std::move(data)) , YsonStr(std::move(ysonStr)) + , CompressionCodec(codec) { } + + TString Ext() const { + TStringBuilder result; + + switch (DataFormat) { + case EDataFormat::Csv: + result << ".csv"; + break; + case EDataFormat::Invalid: + UNIT_ASSERT_C(false, "Invalid data format"); + break; + } + + switch (CompressionCodec) { + case ECompressionCodec::None: + break; + case ECompressionCodec::Zstd: + result << ".zst"; + break; + case ECompressionCodec::Invalid: + UNIT_ASSERT_C(false, "Invalid compression codec"); + break; + } + + return result; + } }; struct TTestDataWithScheme { @@ -130,6 +163,63 @@ namespace { return TTestData(std::move(csv), std::move(yson)); } + TString ZstdCompress(const TStringBuf src) { + TString compressed; + compressed.resize(ZSTD_compressBound(src.size())); + + const auto res = ZSTD_compress(compressed.Detach(), compressed.size(), src.data(), src.size(), ZSTD_CLEVEL_DEFAULT); + UNIT_ASSERT_C(!ZSTD_isError(res), "Zstd error: " << ZSTD_getErrorName(res)); + compressed.resize(res); + + return compressed; + } + + TTestData GenerateZstdTestData(const TString& keyPrefix, ui32 count, ui32 rowsPerFrame = 0) { + auto data = GenerateTestData(keyPrefix, count); + if (!rowsPerFrame) { + rowsPerFrame = count; + } + + TString compressed; + ui32 start = 0; + ui32 rowsInFrame = 0; + + for (ui32 i = 0; i < data.Data.size(); ++i) { + const auto c = data.Data[i]; + const bool last = i == data.Data.size() - 1; + + if (last) { + UNIT_ASSERT(c == '\n'); + } + + if (c == '\n') { + if (++rowsInFrame == rowsPerFrame || last) { + compressed.append(ZstdCompress(TStringBuf(&data.Data[start], i + 1 - start))); + + start = i + 1; + rowsInFrame = 0; + } + } + } + + data.Data = std::move(compressed); + data.CompressionCodec = ECompressionCodec::Zstd; + + return data; + } + + TTestData GenerateTestData(ECompressionCodec codec, const TString& keyPrefix, ui32 count) { + switch (codec) { + case ECompressionCodec::None: + return GenerateTestData(keyPrefix, count); + case ECompressionCodec::Zstd: + return GenerateZstdTestData(keyPrefix, count); + case ECompressionCodec::Invalid: + UNIT_ASSERT_C(false, "Invalid compression codec"); + Y_FAIL("unreachable"); + } + } + TTestDataWithScheme GenerateTestData(const TString& scheme, const TVector<std::pair<TString, ui64>>& shardsConfig) { TTestDataWithScheme result; result.Scheme = scheme; @@ -147,7 +237,8 @@ namespace { for (const auto& [prefix, item] : data) { result.emplace(prefix + "/scheme.pb", item.Scheme); for (ui32 i = 0; i < item.Data.size(); ++i) { - result.emplace(Sprintf("%s/data_%02d.csv", prefix.data(), i), item.Data.at(i).Csv); + const auto& data = item.Data.at(i); + result.emplace(Sprintf("%s/data_%02d%s", prefix.data(), i, data.Ext().c_str()), data.Data); } } @@ -265,10 +356,10 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { Restore(runtime, env, creationScheme, std::move(data), readBatchSize); } - Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) { TTestBasicRuntime runtime; - const auto data = GenerateTestData("a", 1); + const auto data = GenerateTestData(Codec, "a", 1); Restore(runtime, R"( Name: "Table" @@ -281,11 +372,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) { TTestBasicRuntime runtime; - const auto a = GenerateTestData("a", 1); - const auto b = GenerateTestData("b", 1); + const auto a = GenerateTestData(Codec, "a", 1); + const auto b = GenerateTestData(Codec, "b", 1); Restore(runtime, R"( Name: "Table" @@ -309,11 +400,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { } } - Y_UNIT_TEST(ShouldSucceedOnLargeData) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) { TTestBasicRuntime runtime; - const auto data = GenerateTestData("", 100); - UNIT_ASSERT(data.Csv.size() > 128); + const auto data = GenerateTestData(Codec, "", 100); + UNIT_ASSERT(data.Data.size() > 128); Restore(runtime, R"( Name: "Table" @@ -326,10 +417,38 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(ShouldExpandBuffer) { + void ShouldSucceedOnMultipleFrames(ui32 batchSize) { TTestBasicRuntime runtime; - const auto data = GenerateTestData("a", 2); + const auto data = GenerateZstdTestData("a", 3, 2); + + Restore(runtime, R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", {data}, batchSize); + + auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets); + NKqp::CompareYson(data.YsonStr, content); + } + + Y_UNIT_TEST(ShouldSucceedOnMultipleFramesStandardBatch) { + ShouldSucceedOnMultipleFrames(128); + } + + Y_UNIT_TEST(ShouldSucceedOnMultipleFramesSmallBatch) { + ShouldSucceedOnMultipleFrames(7); + } + + Y_UNIT_TEST(ShouldSucceedOnMultipleFramesTinyBatch) { + ShouldSucceedOnMultipleFrames(1); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ShouldExpandBuffer) { + TTestBasicRuntime runtime; + + const auto data = GenerateTestData(Codec, "a", 2); const ui32 batchSize = 1; Restore(runtime, R"( @@ -414,7 +533,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { Columns { Name: "json_value" Type: "Json" } Columns { Name: "jsondoc_value" Type: "JsonDocument" } KeyColumnNames: ["key"] - )", {data}, data.Csv.size() + 1); + )", {data}, data.Data.size() + 1); auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint64", "0"}, { "key", @@ -514,11 +633,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { TestGetImport(runtime, txId, "/MyRoot"); } - Y_UNIT_TEST(ShouldCountWrittenBytesAndRows) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldCountWrittenBytesAndRows) { TTestBasicRuntime runtime; TTestEnv env(runtime); - const auto data = GenerateTestData("a", 2); + const auto data = GenerateTestData(Codec, "a", 2); TMaybe<NKikimrTxDataShard::TShardOpResult> result; runtime.SetObserverFunc([&result](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { @@ -606,17 +725,20 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { env.TestWaitNotification(runtime, txId); UNIT_ASSERT(overloads > 0); - const ui32 expected = data.Csv.size() / batchSize + ui32(bool(data.Csv.size() % batchSize)); + const ui32 expected = data.Data.size() / batchSize + ui32(bool(data.Data.size() % batchSize)); UNIT_ASSERT_VALUES_EQUAL(expected, total - overloads); auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint32", "0"}); NKqp::CompareYson(data.YsonStr, content); } + template <ECompressionCodec Codec> void ShouldFailOnFileWithoutNewLines(ui32 batchSize) { TTestBasicRuntime runtime; - const auto data = TTestData("\"a1\",\"value1\"", EmptyYsonStr); + const TString v = "\"a1\",\"value1\""; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(runtime, R"( Name: "Table" @@ -629,18 +751,20 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(ShouldFailOnFileWithoutNewLinesStandardBatch) { - ShouldFailOnFileWithoutNewLines(128); + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLinesStandardBatch) { + ShouldFailOnFileWithoutNewLines<Codec>(128); } - Y_UNIT_TEST(ShouldFailOnFileWithoutNewLinesSmallBatch) { - ShouldFailOnFileWithoutNewLines(1); + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLinesSmallBatch) { + ShouldFailOnFileWithoutNewLines<Codec>(1); } - Y_UNIT_TEST(ShouldFailOnEmptyToken) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnEmptyToken) { TTestBasicRuntime runtime; - const auto data = TTestData("\"a1\",\n", EmptyYsonStr); + const TString v = "\"a1\",\n"; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(runtime, R"( Name: "Table" @@ -653,10 +777,12 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(ShouldFailOnInvalidValue) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnInvalidValue) { TTestBasicRuntime runtime; - const auto data = TTestData("\"a1\",\"value1\"\n", EmptyYsonStr); + const TString v = "\"a1\",\"value1\"\n"; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(runtime, R"( Name: "Table" @@ -669,11 +795,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(ShouldFailOnOutboundKey) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnOutboundKey) { TTestBasicRuntime runtime; - const auto a = GenerateTestData("a", 1); - const auto b = TTestData(a.Csv, EmptyYsonStr); + const auto a = GenerateTestData(Codec, "a", 1); + const auto b = TTestData(a.Data, EmptyYsonStr); Restore(runtime, R"( Name: "Table" @@ -697,6 +823,23 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { } } + Y_UNIT_TEST(ShouldFailOnInvalidFrame) { + TTestBasicRuntime runtime; + + const TString garbage = "\"a1\",\"value1\""; // not valid zstd data + const auto data = TTestData(garbage, EmptyYsonStr, ECompressionCodec::Zstd); + + Restore(runtime, R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", {data}); + + auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets); + NKqp::CompareYson(data.YsonStr, content); + } + void TestRestoreNegative(TTestActorRuntime& runtime, ui64 txId, const TString& parentPath, const TString& name, const TVector<TEvSchemeShard::EStatus>& expectedResults) { @@ -799,30 +942,30 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - Y_UNIT_TEST(CancelUponProposeShouldSucceed) { - auto data = GenerateTestData("a", 1); + Y_UNIT_TEST_WITH_COMPRESSION(CancelUponProposeShouldSucceed) { + auto data = GenerateTestData(Codec, "a", 1); data.YsonStr = EmptyYsonStr; CancelShouldSucceed<TEvDataShard::TEvProposeTransaction>(data); } - Y_UNIT_TEST(CancelUponProposeResultShouldSucceed) { - auto data = GenerateTestData("a", 1); + Y_UNIT_TEST_WITH_COMPRESSION(CancelUponProposeResultShouldSucceed) { + auto data = GenerateTestData(Codec, "a", 1); data.YsonStr = EmptyYsonStr; CancelShouldSucceed<TEvDataShard::TEvProposeTransactionResult>(data); } - Y_UNIT_TEST(CancelUponUploadResponseShouldSucceed) { - const auto data = GenerateTestData("a", 1); + Y_UNIT_TEST_WITH_COMPRESSION(CancelUponUploadResponseShouldSucceed) { + const auto data = GenerateTestData(Codec, "a", 1); CancelShouldSucceed<TEvDataShard::TEvUnsafeUploadRowsResponse>(data); } - Y_UNIT_TEST(CancelHungOperationShouldSucceed) { - auto data = GenerateTestData("a", 1); + Y_UNIT_TEST_WITH_COMPRESSION(CancelHungOperationShouldSucceed) { + auto data = GenerateTestData(Codec, "a", 1); data.YsonStr = EmptyYsonStr; CancelShouldSucceed<TEvDataShard::TEvProposeTransactionResult>(data, true); } - Y_UNIT_TEST(CancelAlmostCompleteOperationShouldNotHaveEffect) { + Y_UNIT_TEST_WITH_COMPRESSION(CancelAlmostCompleteOperationShouldNotHaveEffect) { TTestBasicRuntime runtime; TTestEnv env(runtime); ui64 txId = 100; @@ -842,7 +985,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { TPortManager portManager; THolder<TS3Mock> s3Mock; - const auto data = GenerateTestData("a", 1); + const auto data = GenerateTestData(Codec, "a", 1); RestoreNoWait(runtime, txId, portManager.GetPort(), s3Mock, {data}); const ui64 restoreTxId = txId; @@ -868,7 +1011,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { void Restore(TTestWithReboots& t, TTestActorRuntime& runtime, bool& activeZone, - ui16 port, const TString& creationScheme, TVector<TTestData>&& data) { + ui16 port, const TString& creationScheme, TVector<TTestData>&& data, ui32 readBatchSize = 128) { THolder<TS3Mock> s3Mock; TString schemeStr; @@ -898,20 +1041,20 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { Endpoint: "localhost:%d" Scheme: HTTP Limits { - ReadBatchSize: 128 + ReadBatchSize: %d } } - )", schemeStr.data(), port)); + )", schemeStr.data(), port, readBatchSize)); t.TestEnv->TestWaitNotification(runtime, t.TxId); } - Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto data = GenerateTestData("a", 1); + const auto data = GenerateTestData(Codec, "a", 1); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -929,14 +1072,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto a = GenerateTestData("a", 1); - const auto b = GenerateTestData("b", 1); + const auto a = GenerateTestData(Codec, "a", 1); + const auto b = GenerateTestData(Codec, "b", 1); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -964,7 +1107,7 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldSucceedOnMultiShardTableAndLimitedResources) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTableAndLimitedResources) { TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -987,8 +1130,8 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { runtime.Register(CreateResourceBrokerActor(config, runtime.GetDynamicCounters(0)))); } - const auto a = GenerateTestData("a", 1); - const auto b = GenerateTestData("b", 1); + const auto a = GenerateTestData(Codec, "a", 1); + const auto b = GenerateTestData(Codec, "b", 1); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1016,14 +1159,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldSucceedOnLargeData) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto data = GenerateTestData("", 100); - UNIT_ASSERT(data.Csv.size() > 128); + const auto data = GenerateTestData(Codec, "", 100); + UNIT_ASSERT(data.Data.size() > 128); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1041,13 +1184,40 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldFailOnFileWithoutNewLines) { + Y_UNIT_TEST(ShouldSucceedOnMultipleFrames) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto data = TTestData("\"a1\",\"value1\"", EmptyYsonStr); + const auto data = GenerateZstdTestData("a", 3, 2); + const ui32 batchSize = 7; // less than any frame + + Restore(t, runtime, activeZone, port, R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", {data}, batchSize); + + { + TInactiveZone inactive(activeZone); + + auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets); + NKqp::CompareYson(data.YsonStr, content); + } + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLines) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + const TString v = "\"a1\",\"value1\""; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1065,13 +1235,15 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldFailOnEmptyToken) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnEmptyToken) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto data = TTestData("\"a1\",\n", EmptyYsonStr); + const TString v = "\"a1\",\n"; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1089,13 +1261,15 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldFailOnInvalidValue) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnInvalidValue) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto data = TTestData("\"a1\",\"value1\"\n", EmptyYsonStr); + const TString v = "\"a1\",\"value1\"\n"; + const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v; + const auto data = TTestData(d, EmptyYsonStr, Codec); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1113,14 +1287,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(ShouldFailOnOutboundKey) { + Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnOutboundKey) { TPortManager portManager; const ui16 port = portManager.GetPort(); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { - const auto a = GenerateTestData("a", 1); - const auto b = TTestData(a.Csv, EmptyYsonStr); + const auto a = GenerateTestData(Codec, "a", 1); + const auto b = TTestData(a.Data, EmptyYsonStr); Restore(t, runtime, activeZone, port, R"( Name: "Table" @@ -1148,10 +1322,10 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) { }); } - Y_UNIT_TEST(CancelShouldSucceed) { + Y_UNIT_TEST_WITH_COMPRESSION(CancelShouldSucceed) { TPortManager portManager; const ui16 port = portManager.GetPort(); - const auto data = GenerateTestData("a", 1); + const auto data = GenerateTestData(Codec, "a", 1); TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/wrappers/ut_helpers/s3_mock.cpp b/ydb/core/wrappers/ut_helpers/s3_mock.cpp index 8df5acc2001..b0fc2906510 100644 --- a/ydb/core/wrappers/ut_helpers/s3_mock.cpp +++ b/ydb/core/wrappers/ut_helpers/s3_mock.cpp @@ -85,6 +85,12 @@ bool TS3Mock::TRequest::HttpBadRequest(const TReplyParams& params, const TString bool TS3Mock::TRequest::HttpNotFound(const TReplyParams& params) { params.Output << "HTTP/1.1 404 Not found\r\n\r\n"; + params.Output << R"( + <?xml version="1.0" encoding="UTF-8"?> + <Error> + <Code>NoSuchKey</Code> + </Error> + )"; return true; } @@ -182,7 +188,7 @@ bool TS3Mock::TRequest::HttpServeAction(const TReplyParams& params, EMethod meth const int uploadId = Parent->NextUploadId++; Parent->MultipartUploads[std::make_pair(path, ToString(uploadId))] = {}; - params.Output << "HTTP/1.1 200 Ok\r\n"; + params.Output << "HTTP/1.1 200 Ok\r\n\r\n"; params.Output << Sprintf(R"( <?xml version="1.0" encoding="UTF-8"?> <InitiateMultipartUploadResult> @@ -236,7 +242,7 @@ bool TS3Mock::TRequest::HttpServeAction(const TReplyParams& params, EMethod meth const TString etag = MD5::Data(Parent->Data[path]); - params.Output << "HTTP/1.1 200 Ok\r\n"; + params.Output << "HTTP/1.1 200 Ok\r\n\r\n"; params.Output << Sprintf(R"( <?xml version="1.0" encoding="UTF-8"?> <CompleteMultipartUploadResult> |
