author     Ilnaz Nizametdinov <[email protected]>  2022-05-18 13:39:08 +0300
committer  Ilnaz Nizametdinov <[email protected]>  2022-05-18 13:39:08 +0300
commit     28f554dbe570fd5a28d92ebf2c8a7f7cbc739171 (patch)
tree       23ce21b77e88cf2bbe8d347460df0cf5a18d12a1
parent     835ba32ee05b1c38ea642a9d86b4bd11cb18d6fa (diff)
Compressed backups KIKIMR-14803
ref: 9640af79c24e1eae042d353668d3ad9573c135e5
-rw-r--r--  CMakeLists.darwin.txt                                      1
-rw-r--r--  CMakeLists.linux.txt                                       1
-rw-r--r--  ydb/core/protos/flat_scheme_op.proto                       7
-rw-r--r--  ydb/core/tx/datashard/CMakeLists.txt                      11
-rw-r--r--  ydb/core/tx/datashard/backup_restore_traits.cpp           77
-rw-r--r--  ydb/core/tx/datashard/backup_restore_traits.h             44
-rw-r--r--  ydb/core/tx/datashard/backup_unit.cpp                     13
-rw-r--r--  ydb/core/tx/datashard/export_s3.h                         12
-rw-r--r--  ydb/core/tx/datashard/export_s3_base_uploader.h           42
-rw-r--r--  ydb/core/tx/datashard/export_s3_buffer.cpp               192
-rw-r--r--  ydb/core/tx/datashard/export_s3_buffer.h                   6
-rw-r--r--  ydb/core/tx/datashard/export_s3_buffer_raw.cpp           179
-rw-r--r--  ydb/core/tx/datashard/export_s3_buffer_raw.h              50
-rw-r--r--  ydb/core/tx/datashard/export_s3_buffer_zstd.cpp          124
-rw-r--r--  ydb/core/tx/datashard/export_scan.cpp                     34
-rw-r--r--  ydb/core/tx/datashard/export_scan.h                       16
-rw-r--r--  ydb/core/tx/datashard/import_s3.cpp                      308
-rw-r--r--  ydb/core/tx/datashard/s3_common.h                         54
-rw-r--r--  ydb/core/tx/schemeshard/ut_backup.cpp                    120
-rw-r--r--  ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt   54
-rw-r--r--  ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt    57
-rw-r--r--  ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt          13
-rw-r--r--  ydb/core/tx/schemeshard/ut_backup_restore_common.h        16
-rw-r--r--  ydb/core/tx/schemeshard/ut_restore.cpp                   304
-rw-r--r--  ydb/core/wrappers/ut_helpers/s3_mock.cpp                  10
25 files changed, 1368 insertions(+), 377 deletions(-)
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 310884c77b9..4ce54157e6b 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1071,6 +1071,7 @@ add_subdirectory(ydb/core/tx/scheme_board/ut_populator)
add_subdirectory(ydb/core/tx/scheme_board/ut_replica)
add_subdirectory(ydb/core/tx/scheme_board/ut_subscriber)
add_subdirectory(ydb/core/tx/schemeshard/ut_async_index)
+add_subdirectory(ydb/core/tx/schemeshard/ut_backup)
add_subdirectory(ydb/core/tx/schemeshard/ut_base)
add_subdirectory(ydb/core/tx/schemeshard/ut_base_reboots)
add_subdirectory(ydb/core/tx/schemeshard/ut_bsvolume)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 097ff1eb26c..0e8beb569c0 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1166,6 +1166,7 @@ add_subdirectory(ydb/core/tx/scheme_board/ut_populator)
add_subdirectory(ydb/core/tx/scheme_board/ut_replica)
add_subdirectory(ydb/core/tx/scheme_board/ut_subscriber)
add_subdirectory(ydb/core/tx/schemeshard/ut_async_index)
+add_subdirectory(ydb/core/tx/schemeshard/ut_backup)
add_subdirectory(ydb/core/tx/schemeshard/ut_base)
add_subdirectory(ydb/core/tx/schemeshard/ut_base_reboots)
add_subdirectory(ydb/core/tx/schemeshard/ut_bsvolume)
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 474a0260185..e6b1e0e8a89 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -900,6 +900,13 @@ message TBackupTask {
optional TScanSettings ScanSettings = 11;
optional bool NeedToBill = 12;
+
+ message TCompressionOptions {
+ optional string Codec = 1;
+ optional int32 Level = 2;
+ }
+
+ optional TCompressionOptions Compression = 13; // currently available for s3
}
message TRestoreTask {
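Note: the new optional TCompressionOptions message lets a backup task request a
compression codec and level; per the inline comment it is currently honored only
for S3 exports. In text form a task would carry it roughly like this (the level
value is illustrative, not taken from this commit):

    Compression {
      Codec: "zstd"
      Level: 3
    }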
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 2880f39156e..cb451d7fedd 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-tx-datashard PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
library-cpp-resource
+ contrib-libs-zstd
cpp-actors-core
cpp-containers-flat_hash
cpp-digest-md5
@@ -50,6 +51,7 @@ target_link_libraries(core-tx-datashard PUBLIC
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_table_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp
@@ -197,11 +199,17 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_uploader.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/import_s3.cpp
)
generate_enum_serilization(core-tx-datashard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/backup_restore_traits.h
+ INCLUDE_HEADERS
+ ydb/core/tx/datashard/backup_restore_traits.h
+)
+generate_enum_serilization(core-tx-datashard
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.h
INCLUDE_HEADERS
ydb/core/tx/datashard/change_exchange.h
@@ -251,6 +259,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
library-cpp-resource
+ contrib-libs-zstd
cpp-actors-core
cpp-containers-flat_hash
cpp-digest-md5
diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp
new file mode 100644
index 00000000000..40e29adfc71
--- /dev/null
+++ b/ydb/core/tx/datashard/backup_restore_traits.cpp
@@ -0,0 +1,77 @@
+#include "backup_restore_traits.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/util/yverify_stream.h>
+
+#include <util/generic/hash.h>
+#include <util/string/cast.h>
+
+namespace NKikimr {
+namespace NDataShard {
+namespace NBackupRestoreTraits {
+
+bool TryCodecFromTask(const NKikimrSchemeOp::TBackupTask& task, ECompressionCodec& codec) {
+ if (!task.HasCompression()) {
+ codec = ECompressionCodec::None;
+ return true;
+ }
+
+ if (!TryFromString<ECompressionCodec>(task.GetCompression().GetCodec(), codec)) {
+ return false;
+ }
+
+ if (codec == ECompressionCodec::Invalid) {
+ return false;
+ }
+
+ return true;
+}
+
+ECompressionCodec CodecFromTask(const NKikimrSchemeOp::TBackupTask& task) {
+ ECompressionCodec codec;
+ Y_VERIFY(TryCodecFromTask(task, codec));
+ return codec;
+}
+
+EDataFormat NextDataFormat(EDataFormat cur) {
+ switch (cur) {
+ case EDataFormat::Csv:
+ return EDataFormat::Invalid;
+ case EDataFormat::Invalid:
+ return EDataFormat::Invalid;
+ }
+}
+
+ECompressionCodec NextCompressionCodec(ECompressionCodec cur) {
+ switch (cur) {
+ case ECompressionCodec::None:
+ return ECompressionCodec::Zstd;
+ case ECompressionCodec::Zstd:
+ return ECompressionCodec::Invalid;
+ case ECompressionCodec::Invalid:
+ return ECompressionCodec::Invalid;
+ }
+}
+
+TString DataFileExtension(EDataFormat format, ECompressionCodec codec) {
+ static THashMap<EDataFormat, TString> formats = {
+ {EDataFormat::Csv, ".csv"},
+ };
+
+ static THashMap<ECompressionCodec, TString> codecs = {
+ {ECompressionCodec::None, ""},
+ {ECompressionCodec::Zstd, ".zst"},
+ };
+
+ auto fit = formats.find(format);
+ Y_VERIFY_S(fit != formats.end(), "Unexpected format: " << format);
+
+ auto cit = codecs.find(codec);
+ Y_VERIFY_S(cit != codecs.end(), "Unexpected codec: " << codec);
+
+ return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str());
+}
+
+} // NBackupRestoreTraits
+} // NDataShard
+} // NKikimr
diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h
new file mode 100644
index 00000000000..78b93232c67
--- /dev/null
+++ b/ydb/core/tx/datashard/backup_restore_traits.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/string/printf.h>
+
+namespace NKikimrSchemeOp {
+ class TBackupTask;
+}
+
+namespace NKikimr {
+namespace NDataShard {
+namespace NBackupRestoreTraits {
+
+enum class EDataFormat: int {
+ Invalid /* "invalid" */,
+ Csv /* "csv" */,
+};
+
+enum class ECompressionCodec: int {
+ Invalid /* "invalid" */,
+ None /* "none" */,
+ Zstd /* "zstd" */,
+};
+
+bool TryCodecFromTask(const NKikimrSchemeOp::TBackupTask& task, ECompressionCodec& codec);
+ECompressionCodec CodecFromTask(const NKikimrSchemeOp::TBackupTask& task);
+
+EDataFormat NextDataFormat(EDataFormat cur);
+ECompressionCodec NextCompressionCodec(ECompressionCodec cur);
+
+TString DataFileExtension(EDataFormat format, ECompressionCodec codec);
+
+inline TString SchemeKey(const TString& objKeyPattern) {
+ return Sprintf("%s/scheme.pb", objKeyPattern.c_str());
+}
+
+inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) {
+ const auto ext = DataFileExtension(format, codec);
+ return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str());
+}
+
+} // NBackupRestoreTraits
+} // NDataShard
+} // NKikimr
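Note: these traits centralize S3 object naming for both backup and restore. A
sketch of the keys they produce, assuming the key prefix "backup/table" and
shard number 3 (values are illustrative):

    using namespace NKikimr::NDataShard::NBackupRestoreTraits;

    SchemeKey("backup/table");              // -> "backup/table/scheme.pb"
    DataKey("backup/table", 3,
        EDataFormat::Csv,
        ECompressionCodec::None);           // -> "backup/table/data_03.csv"
    DataKey("backup/table", 3,
        EDataFormat::Csv,
        ECompressionCodec::Zstd);           // -> "backup/table/data_03.csv.zst"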
diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp
index a7a0f8429be..00e718ad660 100644
--- a/ydb/core/tx/datashard/backup_unit.cpp
+++ b/ydb/core/tx/datashard/backup_unit.cpp
@@ -1,5 +1,6 @@
#include "export_iface.h"
#include "backup_restore_common.h"
+#include "backup_restore_traits.h"
#include "execution_unit_ctors.h"
#include "export_scan.h"
#include "export_s3.h"
@@ -45,6 +46,11 @@ protected:
std::shared_ptr<::NKikimr::NDataShard::IExport> exp;
if (backup.HasYTSettings()) {
+ if (backup.HasCompression()) {
+ Abort(op, ctx, "Exports to YT do not support compression");
+ return false;
+ }
+
if (auto* exportFactory = appData->DataShardExportFactory) {
std::shared_ptr<IExport>(exportFactory->CreateExportToYt(backup, columns)).swap(exp);
} else {
@@ -52,6 +58,13 @@ protected:
return false;
}
} else if (backup.HasS3Settings()) {
+ NBackupRestoreTraits::ECompressionCodec codec;
+ if (!TryCodecFromTask(backup, codec)) {
+ Abort(op, ctx, TStringBuilder() << "Unsupported compression codec"
+ << ": " << backup.GetCompression().GetCodec());
+ return false;
+ }
+
if (auto* exportFactory = appData->DataShardExportFactory) {
std::shared_ptr<IExport>(exportFactory->CreateExportToS3(backup, columns)).swap(exp);
} else {
diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h
index a922288d90f..13dbd8acbf7 100644
--- a/ydb/core/tx/datashard/export_s3.h
+++ b/ydb/core/tx/datashard/export_s3.h
@@ -3,6 +3,7 @@
#ifndef KIKIMR_DISABLE_S3_OPS
#include "defs.h"
+#include "backup_restore_traits.h"
#include "export_iface.h"
#include "export_s3_buffer.h"
@@ -21,7 +22,16 @@ public:
IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override;
IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const override {
- return CreateS3ExportBuffer(Columns, rowsLimit, bytesLimit);
+ using namespace NBackupRestoreTraits;
+
+ switch (CodecFromTask(Task)) {
+ case ECompressionCodec::None:
+ return CreateS3ExportBufferRaw(Columns, rowsLimit, bytesLimit);
+ case ECompressionCodec::Zstd:
+ return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, rowsLimit, bytesLimit);
+ case ECompressionCodec::Invalid:
+ Y_FAIL("unreachable");
+ }
}
void Shutdown() const override {}
diff --git a/ydb/core/tx/datashard/export_s3_base_uploader.h b/ydb/core/tx/datashard/export_s3_base_uploader.h
index 8c72a8c3936..3ca6ecf3553 100644
--- a/ydb/core/tx/datashard/export_s3_base_uploader.h
+++ b/ydb/core/tx/datashard/export_s3_base_uploader.h
@@ -13,10 +13,12 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <util/generic/buffer.h>
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/string/builder.h>
+#include <util/string/cast.h>
namespace NKikimr {
namespace NDataShard {
@@ -38,7 +40,7 @@ class TS3UploaderBase: public TActorBootstrapped<TDerived>
, public IProxyOps
{
using TEvS3Wrapper = NWrappers::TEvS3Wrapper;
- using TEvBuffer = TEvExportScan::TEvBuffer<TString>;
+ using TEvBuffer = TEvExportScan::TEvBuffer<TBuffer>;
protected:
void Restart() {
@@ -52,7 +54,7 @@ protected:
this->Send(std::exchange(Client, TActorId()), new TEvents::TEvPoisonPill());
}
- Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(Settings.Credentials, Settings.Config));
+ Client = this->RegisterWithSameMailbox(NWrappers::CreateS3Wrapper(Settings.GetCredentials(), Settings.GetConfig()));
if (!SchemeUploaded) {
this->Become(&TDerived::StateUploadScheme);
@@ -79,9 +81,9 @@ protected:
google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
auto request = Model::PutObjectRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.SchemeKey)
- .WithStorageClass(Settings.StorageClass);
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetSchemeKey())
+ .WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(Buffer)));
}
@@ -137,7 +139,7 @@ protected:
Last = ev->Get()->Last;
MultiPart = MultiPart || !Last;
- Buffer.swap(ev->Get()->Buffer);
+ ev->Get()->Buffer.AsString(Buffer);
UploadData();
}
@@ -145,9 +147,9 @@ protected:
void UploadData() {
if (!MultiPart) {
auto request = Model::PutObjectRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.DataKey)
- .WithStorageClass(Settings.StorageClass);
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+ .WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(Buffer)));
} else {
if (!UploadId) {
@@ -156,8 +158,8 @@ protected:
}
auto request = Model::UploadPartRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.DataKey)
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId)
.WithPartNumber(Parts.size() + 1);
this->Send(Client, new TEvS3Wrapper::TEvUploadPartRequest(request, std::move(Buffer)));
@@ -187,9 +189,9 @@ protected:
if (!upload) {
auto request = Model::CreateMultipartUploadRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.DataKey)
- .WithStorageClass(Settings.StorageClass);
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
+ .WithStorageClass(Settings.GetStorageClass());
this->Send(Client, new TEvS3Wrapper::TEvCreateMultipartUploadRequest(request));
} else {
UploadId = upload->Id;
@@ -209,8 +211,8 @@ protected:
}
auto request = Model::CompleteMultipartUploadRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.DataKey)
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId)
.WithMultipartUpload(Model::CompletedMultipartUpload().WithParts(std::move(parts)));
this->Send(Client, new TEvS3Wrapper::TEvCompleteMultipartUploadRequest(request));
@@ -224,8 +226,8 @@ protected:
}
auto request = Model::AbortMultipartUploadRequest()
- .WithBucket(Settings.Bucket)
- .WithKey(Settings.DataKey)
+ .WithBucket(Settings.GetBucket())
+ .WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithUploadId(*UploadId);
this->Send(Client, new TEvS3Wrapper::TEvAbortMultipartUploadRequest(request));
break;
@@ -379,6 +381,8 @@ public:
const NKikimrSchemeOp::TBackupTask& task,
TMaybe<Ydb::Table::CreateTableRequest>&& scheme)
: Settings(TS3Settings::FromBackupTask(task))
+ , DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
+ , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task))
, DataShard(dataShard)
, TxId(txId)
, Scheme(std::move(scheme))
@@ -436,6 +440,8 @@ public:
protected:
TS3Settings Settings;
+ const NBackupRestoreTraits::EDataFormat DataFormat;
+ const NBackupRestoreTraits::ECompressionCodec CompressionCodec;
bool ProxyResolved;
private:
diff --git a/ydb/core/tx/datashard/export_s3_buffer.cpp b/ydb/core/tx/datashard/export_s3_buffer.cpp
deleted file mode 100644
index 470ed22d6b6..00000000000
--- a/ydb/core/tx/datashard/export_s3_buffer.cpp
+++ /dev/null
@@ -1,192 +0,0 @@
-#ifndef KIKIMR_DISABLE_S3_OPS
-
-#include "export_common.h"
-#include "export_s3.h"
-#include "export_s3_buffer.h"
-
-#include <ydb/core/tablet_flat/flat_row_state.h>
-#include <ydb/library/binary_json/read.h>
-#include <ydb/public/lib/scheme_types/scheme_type_id.h>
-#include <library/cpp/string_utils/quote/quote.h>
-
-#include <util/datetime/base.h>
-
-namespace NKikimr {
-namespace NDataShard {
-
-using namespace NExportScan;
-using TTableColumns = TS3Export::TTableColumns;
-
-class TS3Buffer: public IBuffer {
- using TTagToColumn = TTableColumns;
- using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow
-
-public:
- explicit TS3Buffer(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit)
- : Columns(columns)
- , RowsLimit(rowsLimit)
- , BytesLimit(bytesLimit)
- , Rows(0)
- , BytesRead(0)
- {
- }
-
- void ColumnsOrder(const TVector<ui32>& tags) override {
- Y_VERIFY(tags.size() == Columns.size());
-
- Indices.clear();
- for (ui32 i = 0; i < tags.size(); ++i) {
- const ui32 tag = tags.at(i);
- auto it = Columns.find(tag);
- Y_VERIFY(it != Columns.end());
- Y_VERIFY(Indices.emplace(tag, i).second);
- }
- }
-
- void Collect(const NTable::IScan::TRow& row) override {
- TStringOutput stream(Buffer);
-
- bool needsComma = false;
- for (const auto& [tag, column] : Columns) {
- auto it = Indices.find(tag);
- Y_VERIFY(it != Indices.end());
- Y_VERIFY(it->second < (*row).size());
- const auto& cell = (*row)[it->second];
-
- BytesRead += cell.Size();
-
- if (needsComma) {
- stream << ",";
- } else {
- needsComma = true;
- }
-
- if (cell.IsNull()) {
- stream << "null";
- continue;
- }
-
- switch (column.Type) {
- case NScheme::NTypeIds::Int32:
- stream << cell.AsValue<i32>();
- break;
- case NScheme::NTypeIds::Uint32:
- stream << cell.AsValue<ui32>();
- break;
- case NScheme::NTypeIds::Int64:
- stream << cell.AsValue<i64>();
- break;
- case NScheme::NTypeIds::Uint64:
- stream << cell.AsValue<ui64>();
- break;
- case NScheme::NTypeIds::Uint8:
- //case NScheme::NTypeIds::Byte:
- stream << static_cast<ui32>(cell.AsValue<ui8>());
- break;
- case NScheme::NTypeIds::Int8:
- stream << static_cast<i32>(cell.AsValue<i8>());
- break;
- case NScheme::NTypeIds::Int16:
- stream << cell.AsValue<i16>();
- break;
- case NScheme::NTypeIds::Uint16:
- stream << cell.AsValue<ui16>();
- break;
- case NScheme::NTypeIds::Bool:
- stream << cell.AsValue<bool>();
- break;
- case NScheme::NTypeIds::Double:
- stream << cell.AsValue<double>();
- break;
- case NScheme::NTypeIds::Float:
- stream << cell.AsValue<float>();
- break;
- case NScheme::NTypeIds::Date:
- stream << TInstant::Days(cell.AsValue<ui16>());
- break;
- case NScheme::NTypeIds::Datetime:
- stream << TInstant::Seconds(cell.AsValue<ui32>());
- break;
- case NScheme::NTypeIds::Timestamp:
- stream << TInstant::MicroSeconds(cell.AsValue<ui64>());
- break;
- case NScheme::NTypeIds::Interval:
- stream << cell.AsValue<i64>();
- break;
- case NScheme::NTypeIds::Decimal:
- stream << DecimalToString(cell.AsValue<std::pair<ui64, i64>>());
- break;
- case NScheme::NTypeIds::DyNumber:
- stream << DyNumberToString(cell.AsBuf());
- break;
- case NScheme::NTypeIds::String:
- case NScheme::NTypeIds::String4k:
- case NScheme::NTypeIds::String2m:
- case NScheme::NTypeIds::Utf8:
- case NScheme::NTypeIds::Json:
- case NScheme::NTypeIds::Yson:
- stream << '"' << CGIEscapeRet(cell.AsBuf()) << '"';
- break;
- case NScheme::NTypeIds::JsonDocument:
- stream << '"' << CGIEscapeRet(NBinaryJson::SerializeToJson(cell.AsBuf())) << '"';
- break;
- default:
- Y_FAIL("Unsupported type");
- };
- }
-
- stream << "\n";
- ++Rows;
- }
-
- IEventBase* PrepareEvent(bool last) override {
- return new TEvExportScan::TEvBuffer<TString>(Flush(), last);
- }
-
- void Clear() override {
- Flush();
- }
-
- TString Flush() {
- Rows = 0;
- BytesRead = 0;
- return std::exchange(Buffer, TString());
- }
-
- bool IsFilled() const override {
- return GetRows() >= RowsLimit || GetBytesSent() >= BytesLimit;
- }
-
- ui64 GetRows() const override {
- return Rows;
- }
-
- ui64 GetBytesRead() const override {
- return BytesRead;
- }
-
- ui64 GetBytesSent() const override {
- return Buffer.size();
- }
-
-private:
- const TTagToColumn Columns;
- const ui64 RowsLimit;
- const ui64 BytesLimit;
-
- TTagToIndex Indices;
-
- ui64 Rows;
- ui64 BytesRead;
- TString Buffer;
-
-}; // TS3Buffer
-
-IBuffer* CreateS3ExportBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) {
- return new TS3Buffer(columns, rowsLimit, bytesLimit);
-}
-
-} // NDataShard
-} // NKikimr
-
-#endif // KIKIMR_DISABLE_S3_OPS
diff --git a/ydb/core/tx/datashard/export_s3_buffer.h b/ydb/core/tx/datashard/export_s3_buffer.h
index f616126dbed..35ef4078ffa 100644
--- a/ydb/core/tx/datashard/export_s3_buffer.h
+++ b/ydb/core/tx/datashard/export_s3_buffer.h
@@ -8,7 +8,11 @@
namespace NKikimr {
namespace NDataShard {
-NExportScan::IBuffer* CreateS3ExportBuffer(const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit);
+NExportScan::IBuffer* CreateS3ExportBufferRaw(
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit);
+
+NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit);
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
new file mode 100644
index 00000000000..a496fd3885e
--- /dev/null
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
@@ -0,0 +1,179 @@
+#ifndef KIKIMR_DISABLE_S3_OPS
+
+#include "export_common.h"
+#include "export_s3_buffer_raw.h"
+
+#include <ydb/core/tablet_flat/flat_row_state.h>
+#include <ydb/library/binary_json/read.h>
+#include <ydb/public/lib/scheme_types/scheme_type_id.h>
+
+#include <library/cpp/string_utils/quote/quote.h>
+
+#include <util/datetime/base.h>
+#include <util/stream/buffer.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit)
+ : Columns(columns)
+ , RowsLimit(rowsLimit)
+ , BytesLimit(bytesLimit)
+ , Rows(0)
+ , BytesRead(0)
+{
+}
+
+void TS3BufferRaw::ColumnsOrder(const TVector<ui32>& tags) {
+ Y_VERIFY(tags.size() == Columns.size());
+
+ Indices.clear();
+ for (ui32 i = 0; i < tags.size(); ++i) {
+ const ui32 tag = tags.at(i);
+ auto it = Columns.find(tag);
+ Y_VERIFY(it != Columns.end());
+ Y_VERIFY(Indices.emplace(tag, i).second);
+ }
+}
+
+void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
+ bool needsComma = false;
+ for (const auto& [tag, column] : Columns) {
+ auto it = Indices.find(tag);
+ Y_VERIFY(it != Indices.end());
+ Y_VERIFY(it->second < (*row).size());
+ const auto& cell = (*row)[it->second];
+
+ BytesRead += cell.Size();
+
+ if (needsComma) {
+ out << ",";
+ } else {
+ needsComma = true;
+ }
+
+ if (cell.IsNull()) {
+ out << "null";
+ continue;
+ }
+
+ switch (column.Type) {
+ case NScheme::NTypeIds::Int32:
+ out << cell.AsValue<i32>();
+ break;
+ case NScheme::NTypeIds::Uint32:
+ out << cell.AsValue<ui32>();
+ break;
+ case NScheme::NTypeIds::Int64:
+ out << cell.AsValue<i64>();
+ break;
+ case NScheme::NTypeIds::Uint64:
+ out << cell.AsValue<ui64>();
+ break;
+ case NScheme::NTypeIds::Uint8:
+ //case NScheme::NTypeIds::Byte:
+ out << static_cast<ui32>(cell.AsValue<ui8>());
+ break;
+ case NScheme::NTypeIds::Int8:
+ out << static_cast<i32>(cell.AsValue<i8>());
+ break;
+ case NScheme::NTypeIds::Int16:
+ out << cell.AsValue<i16>();
+ break;
+ case NScheme::NTypeIds::Uint16:
+ out << cell.AsValue<ui16>();
+ break;
+ case NScheme::NTypeIds::Bool:
+ out << cell.AsValue<bool>();
+ break;
+ case NScheme::NTypeIds::Double:
+ out << cell.AsValue<double>();
+ break;
+ case NScheme::NTypeIds::Float:
+ out << cell.AsValue<float>();
+ break;
+ case NScheme::NTypeIds::Date:
+ out << TInstant::Days(cell.AsValue<ui16>());
+ break;
+ case NScheme::NTypeIds::Datetime:
+ out << TInstant::Seconds(cell.AsValue<ui32>());
+ break;
+ case NScheme::NTypeIds::Timestamp:
+ out << TInstant::MicroSeconds(cell.AsValue<ui64>());
+ break;
+ case NScheme::NTypeIds::Interval:
+ out << cell.AsValue<i64>();
+ break;
+ case NScheme::NTypeIds::Decimal:
+ out << DecimalToString(cell.AsValue<std::pair<ui64, i64>>());
+ break;
+ case NScheme::NTypeIds::DyNumber:
+ out << DyNumberToString(cell.AsBuf());
+ break;
+ case NScheme::NTypeIds::String:
+ case NScheme::NTypeIds::String4k:
+ case NScheme::NTypeIds::String2m:
+ case NScheme::NTypeIds::Utf8:
+ case NScheme::NTypeIds::Json:
+ case NScheme::NTypeIds::Yson:
+ out << '"' << CGIEscapeRet(cell.AsBuf()) << '"';
+ break;
+ case NScheme::NTypeIds::JsonDocument:
+ out << '"' << CGIEscapeRet(NBinaryJson::SerializeToJson(cell.AsBuf())) << '"';
+ break;
+ default:
+ Y_FAIL("Unsupported type");
+ };
+ }
+
+ out << "\n";
+ ++Rows;
+}
+
+bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
+ TBufferOutput out(Buffer);
+ Collect(row, out);
+ return true;
+}
+
+IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
+ stats.Rows = Rows;
+ stats.BytesRead = BytesRead;
+
+ auto buffer = Flush(true);
+ if (!buffer) {
+ return nullptr;
+ }
+
+ stats.BytesSent = buffer->Size();
+ return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
+}
+
+void TS3BufferRaw::Clear() {
+ Y_VERIFY(Flush(false));
+}
+
+bool TS3BufferRaw::IsFilled() const {
+ return Rows >= GetRowsLimit() || Buffer.Size() >= GetBytesLimit();
+}
+
+TString TS3BufferRaw::GetError() const {
+ Y_FAIL("unreachable");
+}
+
+TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {
+ Rows = 0;
+ BytesRead = 0;
+ return std::exchange(Buffer, TBuffer());
+}
+
+NExportScan::IBuffer* CreateS3ExportBufferRaw(
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
+{
+ return new TS3BufferRaw(columns, rowsLimit, bytesLimit);
+}
+
+} // NDataShard
+} // NKikimr
+
+#endif // KIKIMR_DISABLE_S3_OPS
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h
new file mode 100644
index 00000000000..25efb7977f5
--- /dev/null
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#ifndef KIKIMR_DISABLE_S3_OPS
+
+#include "export_s3_buffer.h"
+
+#include <util/generic/buffer.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TS3BufferRaw: public NExportScan::IBuffer {
+ using TTagToColumn = IExport::TTableColumns;
+ using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow
+
+public:
+ explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit);
+
+ void ColumnsOrder(const TVector<ui32>& tags) override;
+ bool Collect(const NTable::IScan::TRow& row) override;
+ IEventBase* PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) override;
+ void Clear() override;
+ bool IsFilled() const override;
+ TString GetError() const override;
+
+protected:
+ inline ui64 GetRowsLimit() const { return RowsLimit; }
+ inline ui64 GetBytesLimit() const { return BytesLimit; }
+
+ void Collect(const NTable::IScan::TRow& row, IOutputStream& out);
+ virtual TMaybe<TBuffer> Flush(bool prepare);
+
+private:
+ const TTagToColumn Columns;
+ const ui64 RowsLimit;
+ const ui64 BytesLimit;
+
+ TTagToIndex Indices;
+
+protected:
+ ui64 Rows;
+ ui64 BytesRead;
+ TBuffer Buffer;
+
+}; // TS3BufferRaw
+
+} // NDataShard
+} // NKikimr
+
+#endif // KIKIMR_DISABLE_S3_OPS
diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
new file mode 100644
index 00000000000..e229916aeef
--- /dev/null
+++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
@@ -0,0 +1,124 @@
+#ifndef KIKIMR_DISABLE_S3_OPS
+
+#include "export_s3_buffer_raw.h"
+
+#include <contrib/libs/zstd/include/zstd.h>
+
+namespace {
+
+ struct DestroyZCtx {
+ static void Destroy(::ZSTD_CCtx* p) noexcept {
+ ZSTD_freeCCtx(p);
+ }
+ };
+
+} // anonymous
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TS3BufferZstd: public TS3BufferRaw {
+ enum ECompressionResult {
+ CONTINUE,
+ DONE,
+ ERROR,
+ };
+
+ ECompressionResult Compress(ZSTD_inBuffer* input, ZSTD_EndDirective endOp) {
+ auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), Buffer.Size()};
+ auto res = ZSTD_compressStream2(Context.Get(), &output, input, endOp);
+
+ if (ZSTD_isError(res)) {
+ ErrorCode = res;
+ return ERROR;
+ }
+
+ if (res > 0) {
+ Buffer.Reserve(output.pos + res);
+ }
+
+ Buffer.Proceed(output.pos);
+ return res ? CONTINUE : DONE;
+ }
+
+ void Reset() {
+ ZSTD_CCtx_reset(Context.Get(), ZSTD_reset_session_only);
+ ZSTD_CCtx_refCDict(Context.Get(), NULL);
+ ZSTD_CCtx_setParameter(Context.Get(), ZSTD_c_compressionLevel, CompressionLevel);
+ }
+
+public:
+ explicit TS3BufferZstd(int compressionLevel,
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
+ : TS3BufferRaw(columns, rowsLimit, bytesLimit)
+ , CompressionLevel(compressionLevel)
+ , Context(ZSTD_createCCtx())
+ , BytesRaw(0)
+ {
+ Reset();
+ }
+
+ bool Collect(const NTable::IScan::TRow& row) override {
+ BufferRaw.clear();
+ TStringOutput out(BufferRaw);
+ TS3BufferRaw::Collect(row, out);
+
+ BytesRaw += BufferRaw.size();
+
+ auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0};
+ while (input.pos < input.size) {
+ if (ERROR == Compress(&input, ZSTD_e_continue)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ bool IsFilled() const override {
+ return Rows >= GetRowsLimit() || BytesRaw >= GetBytesLimit();
+ }
+
+ TString GetError() const override {
+ return ZSTD_getErrorName(ErrorCode);
+ }
+
+protected:
+ TMaybe<TBuffer> Flush(bool prepare) override {
+ if (prepare && Buffer) {
+ ECompressionResult res;
+ auto input = ZSTD_inBuffer{NULL, 0, 0};
+
+ do {
+ if (res = Compress(&input, ZSTD_e_end); res == ERROR) {
+ return Nothing();
+ }
+ } while (res != DONE);
+ }
+
+ Reset();
+
+ BytesRaw = 0;
+ return TS3BufferRaw::Flush(prepare);
+ }
+
+private:
+ const int CompressionLevel;
+
+ THolder<::ZSTD_CCtx, DestroyZCtx> Context;
+ size_t ErrorCode;
+ ui64 BytesRaw;
+ TString BufferRaw;
+
+}; // TS3BufferZstd
+
+NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
+{
+ return new TS3BufferZstd(compressionLevel, columns, rowsLimit, bytesLimit);
+}
+
+} // NDataShard
+} // NKikimr
+
+#endif // KIKIMR_DISABLE_S3_OPS
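Note: TS3BufferZstd reuses TS3BufferRaw's CSV serialization and pushes each
serialized row through zstd's streaming API: ZSTD_e_continue while collecting
rows, then ZSTD_e_end on the final flush, mirroring the CONTINUE/DONE loop
above. A minimal self-contained sketch of the same pattern (plain C++ against
the zstd streaming API, not this class's exact code; assumes zstd >= 1.4 for
ZSTD_compressStream2):

    #include <zstd.h>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Sketch only: feed chunks with ZSTD_e_continue, then drain with
    // ZSTD_e_end until the frame is complete.
    std::string CompressAll(const std::vector<std::string>& chunks, int level) {
        ZSTD_CCtx* cctx = ZSTD_createCCtx();
        if (!cctx) throw std::runtime_error("ZSTD_createCCtx failed");
        ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);

        std::string out;
        char buf[4096];
        auto pump = [&](ZSTD_inBuffer& in, ZSTD_EndDirective op) {
            size_t remaining;
            do {
                ZSTD_outBuffer o{buf, sizeof(buf), 0};
                remaining = ZSTD_compressStream2(cctx, &o, &in, op);
                if (ZSTD_isError(remaining)) {
                    ZSTD_freeCCtx(cctx);
                    throw std::runtime_error(ZSTD_getErrorName(remaining));
                }
                out.append(buf, o.pos);
            // e_continue: loop until input consumed; e_end: until fully flushed
            } while (op == ZSTD_e_end ? remaining != 0 : in.pos < in.size);
        };

        for (const auto& chunk : chunks) {
            ZSTD_inBuffer in{chunk.data(), chunk.size(), 0};
            pump(in, ZSTD_e_continue);   // analogous to Collect()
        }
        ZSTD_inBuffer empty{nullptr, 0, 0};
        pump(empty, ZSTD_e_end);         // analogous to Flush(true)

        ZSTD_freeCCtx(cctx);
        return out;
    }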
diff --git a/ydb/core/tx/datashard/export_scan.cpp b/ydb/core/tx/datashard/export_scan.cpp
index b0c40aec224..d1995c2f863 100644
--- a/ydb/core/tx/datashard/export_scan.cpp
+++ b/ydb/core/tx/datashard/export_scan.cpp
@@ -28,15 +28,9 @@ class TExportScan: private NActors::IActor, public NTable::IScan {
ES_COUNT,
};
- struct TStats {
- ui64 Rows;
- ui64 BytesRead;
- ui64 BytesSent;
-
+ struct TStats: public IBuffer::TStats {
TStats()
- : Rows(0)
- , BytesRead(0)
- , BytesSent(0)
+ : IBuffer::TStats()
{
auto counters = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "store_to_yt");
@@ -55,8 +49,8 @@ class TExportScan: private NActors::IActor, public NTable::IScan {
*MonBytesSent += bytesSent;
}
- void Aggr(IBuffer const* buffer) {
- Aggr(buffer->GetRows(), buffer->GetBytesRead(), buffer->GetBytesSent());
+ void Aggr(const IBuffer::TStats& stats) {
+ Aggr(stats.Rows, stats.BytesRead, stats.BytesSent);
}
TString ToString() const {
@@ -96,9 +90,18 @@ class TExportScan: private NActors::IActor, public NTable::IScan {
return EScan::Sleep;
}
+ IBuffer::TStats stats;
+ THolder<IEventBase> ev{Buffer->PrepareEvent(noMoreData, stats)};
+
+ if (!ev) {
+ Success = false;
+ Error = Buffer->GetError();
+ return EScan::Final;
+ }
+
+ Send(Uploader, std::move(ev));
State.Set(ES_BUFFER_SENT);
- Stats->Aggr(Buffer.Get());
- Send(Uploader, Buffer->PrepareEvent(noMoreData));
+ Stats->Aggr(stats);
if (noMoreData) {
Spent->Alter(false);
@@ -199,7 +202,12 @@ public:
}
EScan Feed(TArrayRef<const TCell>, const TRow& row) noexcept override {
- Buffer->Collect(row);
+ if (!Buffer->Collect(row)) {
+ Success = false;
+ Error = Buffer->GetError();
+ return EScan::Final;
+ }
+
return MaybeSendBuffer();
}
diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h
index d077576a607..9d983be5954 100644
--- a/ydb/core/tx/datashard/export_scan.h
+++ b/ydb/core/tx/datashard/export_scan.h
@@ -35,7 +35,7 @@ struct TEvExportScan {
TEvBuffer() = default;
- explicit TEvBuffer(TBuffer buffer, bool last)
+ explicit TEvBuffer(TBuffer&& buffer, bool last)
: Buffer(std::move(buffer))
, Last(last)
{
@@ -92,17 +92,21 @@ class IBuffer {
public:
using TPtr = THolder<IBuffer>;
+ struct TStats {
+ ui64 Rows = 0;
+ ui64 BytesRead = 0;
+ ui64 BytesSent = 0;
+ };
+
public:
virtual ~IBuffer() = default;
virtual void ColumnsOrder(const TVector<ui32>& tags) = 0;
- virtual void Collect(const NTable::IScan::TRow& row) = 0;
- virtual IEventBase* PrepareEvent(bool last) = 0;
+ virtual bool Collect(const NTable::IScan::TRow& row) = 0;
+ virtual IEventBase* PrepareEvent(bool last, TStats& stats) = 0;
virtual void Clear() = 0;
virtual bool IsFilled() const = 0;
- virtual ui64 GetRows() const = 0;
- virtual ui64 GetBytesRead() const = 0;
- virtual ui64 GetBytesSent() const = 0;
+ virtual TString GetError() const = 0;
};
} // NExportScan
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index 3d4f20c2e22..56630c389d8 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -13,16 +13,32 @@
#include <ydb/core/io_formats/csv.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
+#include <contrib/libs/zstd/include/zstd.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <util/generic/buffer.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/memory/pool.h>
#include <util/string/builder.h>
+namespace {
+
+ struct DestroyZCtx {
+ static void Destroy(::ZSTD_DCtx* p) noexcept {
+ ZSTD_freeDCtx(p);
+ }
+ };
+
+ constexpr ui64 SumWithSaturation(ui64 a, ui64 b) {
+ return Max<ui64>() - a < b ? Max<ui64>() : a + b;
+ }
+
+} // anonymous
+
namespace NKikimr {
namespace NDataShard {
@@ -33,12 +49,28 @@ using namespace Aws::S3;
using namespace Aws;
class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
- class TReadController {
- static constexpr ui64 SumWithSaturation(ui64 a, ui64 b) {
- return Max<ui64>() - a < b ? Max<ui64>() : a + b;
- }
+ class IReadController {
+ public:
+ enum EFeed {
+ READY_DATA = 0,
+ NOT_ENOUGH_DATA = 1,
+ ERROR = 2,
+ };
public:
+ virtual ~IReadController() = default;
+        // Returns the feed status; on ERROR, fills `error` with the reason
+        virtual EFeed Feed(TString&& portion, TString& error) = 0;
+        // Returns a view into the internal buffer with ready data after a successful Feed()
+        virtual TStringBuf GetReadyData() const = 0;
+        // Clears the internal buffer & makes it ready for another Feed()
+ virtual void Confirm() = 0;
+ virtual ui64 PendingBytes() const = 0;
+ virtual ui64 ReadyBytes() const = 0;
+ };
+
+ class TReadController: public IReadController {
+ public:
explicit TReadController(ui32 rangeSize, ui64 bufferSizeLimit)
: RangeSize(rangeSize)
, BufferSizeLimit(bufferSizeLimit)
@@ -49,58 +81,170 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
Y_VERIFY(contentLength > 0);
Y_VERIFY(processedBytes < contentLength);
- const ui64 start = processedBytes + (Buffer.size() - Pos);
+ const ui64 start = processedBytes + PendingBytes();
const ui64 end = Min(SumWithSaturation(start, RangeSize), contentLength) - 1;
return std::make_pair(start, end);
}
- bool Feed(TString&& portion, TStringBuf& buf) {
- if (Buffer && Pos) {
- Buffer.remove(0, Pos);
- Pos = 0;
+ protected:
+ bool CheckBufferSize(size_t size, TString& reason) const {
+ if (size >= BufferSizeLimit) {
+ reason = "reached buffer size limit";
+ return false;
}
- if (!Buffer) {
- Buffer = std::move(portion);
- } else {
- Buffer.append(portion);
- }
+ return true;
+ }
+
+ TStringBuf AsStringBuf(size_t size) const {
+ return TStringBuf(Buffer.Data(), size);
+ }
+
+ private:
+ const ui32 RangeSize;
+ const ui64 BufferSizeLimit;
+
+ protected:
+ TBuffer Buffer;
+
+ }; // TReadController
+
+ class TReadControllerRaw: public TReadController {
+ public:
+ using TReadController::TReadController;
+
+ EFeed Feed(TString&& portion, TString& error) override {
+ Y_VERIFY(Pos == 0);
+
+ Buffer.Append(portion.data(), portion.size());
+ const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n');
- const ui64 pos = Buffer.rfind('\n');
if (TString::npos == pos) {
- return false;
+ if (!CheckBufferSize(Buffer.Size(), error)) {
+ return ERROR;
+ } else {
+ return NOT_ENOUGH_DATA;
+ }
}
Pos = pos + 1; // for '\n'
- buf = TStringBuf(Buffer.data(), Pos);
+ return READY_DATA;
+ }
- return true;
+ TStringBuf GetReadyData() const override {
+ return AsStringBuf(Pos);
}
- bool CanExpandBuffer(ui64 contentLength, ui64 processedBytes, TString& reason) const {
- const auto size = Buffer.size();
+ void Confirm() override {
+ Buffer.ChopHead(Pos);
+ Pos = 0;
+ }
- if (SumWithSaturation(processedBytes, size) >= contentLength) {
- reason = "reached end of file";
- return false;
+ ui64 PendingBytes() const override {
+ return Buffer.Size();
+ }
+
+ ui64 ReadyBytes() const override {
+ return Pos;
+ }
+
+ private:
+ ui64 Pos = 0;
+ };
+
+ class TReadControllerZstd: public TReadController {
+ void Reset() {
+ ZSTD_DCtx_reset(Context.Get(), ZSTD_reset_session_only);
+ ZSTD_DCtx_refDDict(Context.Get(), NULL);
+ }
+
+ public:
+ explicit TReadControllerZstd(ui32 rangeSize, ui64 bufferSizeLimit)
+ : TReadController(rangeSize, bufferSizeLimit)
+ , Context(ZSTD_createDCtx())
+ {
+ Reset();
+ }
+
+ EFeed Feed(TString&& portion, TString& error) override {
+ Y_VERIFY(ReadyInputBytes == 0 && ReadyOutputPos == 0);
+
+ auto input = ZSTD_inBuffer{portion.data(), portion.size(), 0};
+ while (input.pos < input.size) {
+ PendingInputBytes -= input.pos; // dec before decompress
+
+ auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), PendingOutputPos};
+ auto res = ZSTD_decompressStream(Context.Get(), &output, &input);
+
+ if (ZSTD_isError(res)) {
+ error = ZSTD_getErrorName(res);
+ return ERROR;
+ }
+
+ if (output.pos == output.size) {
+ if (!CheckBufferSize(Buffer.Capacity(), error)) {
+ return ERROR;
+ }
+
+ Buffer.Reserve(Buffer.Capacity() + ZSTD_BLOCKSIZE_MAX);
+ }
+
+ PendingInputBytes += input.pos; // inc after decompress
+ PendingOutputPos = output.pos;
+
+ if (res == 0) {
+ if (AsStringBuf(output.pos).back() != '\n') {
+ error = "cannot find new line symbol";
+ return ERROR;
+ }
+
+ ReadyInputBytes = PendingInputBytes;
+ ReadyOutputPos = PendingOutputPos;
+ Reset();
+ }
}
- if (size >= BufferSizeLimit) {
- reason = "reached buffer size limit";
- return false;
+ if (!ReadyOutputPos) {
+ if (!CheckBufferSize(PendingOutputPos, error)) {
+ return ERROR;
+ } else {
+ return NOT_ENOUGH_DATA;
+ }
}
- return true;
+ Buffer.Proceed(ReadyOutputPos);
+ return READY_DATA;
}
- private:
- const ui32 RangeSize;
- const ui64 BufferSizeLimit;
+ TStringBuf GetReadyData() const override {
+ return AsStringBuf(ReadyOutputPos);
+ }
- TString Buffer;
- ui64 Pos = 0;
+ void Confirm() override {
+ Buffer.ChopHead(ReadyOutputPos);
- }; // TReadController
+ PendingInputBytes -= ReadyInputBytes;
+ ReadyInputBytes = 0;
+
+ PendingOutputPos -= ReadyOutputPos;
+ ReadyOutputPos = 0;
+ }
+
+ ui64 PendingBytes() const override {
+ return PendingInputBytes;
+ }
+
+ ui64 ReadyBytes() const override {
+ return ReadyInputBytes;
+ }
+
+ private:
+ THolder<::ZSTD_DCtx, DestroyZCtx> Context;
+ ui64 PendingInputBytes = 0;
+ ui64 ReadyInputBytes = 0;
+ ui64 PendingOutputPos = 0;
+ ui64 ReadyOutputPos = 0;
+ };
class TUploadRowsRequestBuilder {
public:
@@ -176,9 +320,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
Send(client, new TEvents::TEvPoisonPill());
}
- Client = RegisterWithSameMailbox(CreateS3Wrapper(Settings.Credentials, Settings.Config));
+ Client = RegisterWithSameMailbox(CreateS3Wrapper(Settings.GetCredentials(), Settings.GetConfig()));
- HeadObject(Settings.DataKey);
+ HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec));
Become(&TThis::StateWork);
}
@@ -187,7 +331,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
<< ": key# " << key);
auto request = Model::HeadObjectRequest()
- .WithBucket(Settings.Bucket)
+ .WithBucket(Settings.GetBucket())
.WithKey(key);
Send(Client, new TEvS3Wrapper::TEvHeadObjectRequest(request));
@@ -199,7 +343,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
<< ", range# " << range.first << "-" << range.second);
auto request = Model::GetObjectRequest()
- .WithBucket(Settings.Bucket)
+ .WithBucket(Settings.GetBucket())
.WithKey(key)
.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second);
@@ -210,8 +354,35 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
IMPORT_LOG_D("Handle " << ev->Get()->ToString());
const auto& result = ev->Get()->Result;
- if (!CheckResult(result, TStringBuf("HeadObject"))) {
- return;
+ if (!result.IsSuccess()) {
+ switch (result.GetError().GetErrorType()) {
+ case S3Errors::RESOURCE_NOT_FOUND:
+ case S3Errors::NO_SUCH_KEY:
+ break;
+ default:
+ IMPORT_LOG_E("Error at 'HeadObject'"
+ << ": error# " << result);
+ return RestartOrFinish(result.GetError().GetMessage().c_str());
+ }
+
+ CompressionCodec = NBackupRestoreTraits::NextCompressionCodec(CompressionCodec);
+ if (CompressionCodec == NBackupRestoreTraits::ECompressionCodec::Invalid) {
+ return Finish(false, TStringBuilder() << "Cannot find any supported data file"
+ << ": prefix# " << Settings.GetObjectKeyPattern());
+ }
+
+ return HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec));
+ }
+
+ switch (CompressionCodec) {
+ case NBackupRestoreTraits::ECompressionCodec::None:
+ Reader.Reset(new TReadControllerRaw(ReadBatchSize, ReadBufferSizeLimit));
+ break;
+ case NBackupRestoreTraits::ECompressionCodec::Zstd:
+ Reader.Reset(new TReadControllerZstd(ReadBatchSize, ReadBufferSizeLimit));
+ break;
+ case NBackupRestoreTraits::ECompressionCodec::Invalid:
+ Y_FAIL("unreachable");
}
ETag = result.GetResult().GetETag();
@@ -251,7 +422,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
return Finish();
}
- GetObject(Settings.DataKey, Reader.NextRange(ContentLength, ProcessedBytes));
+ GetObject(Settings.GetDataKey(DataFormat, CompressionCodec),
+ Reader->NextRange(ContentLength, ProcessedBytes));
}
void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) {
@@ -274,34 +446,46 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
<< ", content-length# " << ContentLength
<< ", body-size# " << msg.Body.size());
- if (!Reader.Feed(std::move(msg.Body), Buffer)) {
- TString reason;
- if (!Reader.CanExpandBuffer(ContentLength, ProcessedBytes, reason)) {
- return Finish(false, TStringBuilder() << "Cannot find new line symbol in data"
- << ": " << reason);
+ TString error;
+ switch (Reader->Feed(std::move(msg.Body), error)) {
+ case TReadController::READY_DATA:
+ break;
+
+ case TReadController::NOT_ENOUGH_DATA:
+ if (SumWithSaturation(ProcessedBytes, Reader->PendingBytes()) < ContentLength) {
+ return GetObject(Settings.GetDataKey(DataFormat, CompressionCodec),
+ Reader->NextRange(ContentLength, ProcessedBytes));
+ } else {
+ error = "reached end of file";
}
+ [[fallthrough]];
- IMPORT_LOG_W("Cannot find new line symbol, request additional data"
- << ": processed-bytes# " << ProcessedBytes
- << ", content-length# " << ContentLength);
- return GetObject(Settings.DataKey, Reader.NextRange(ContentLength, ProcessedBytes));
+ default: // ERROR
+ return Finish(false, TStringBuilder() << "Cannot process data"
+ << ": " << error);
}
- ProcessedBytes += Buffer.size();
- RequestBuilder.New(TableInfo, Scheme);
+ if (auto data = Reader->GetReadyData()) {
+ ProcessedBytes += Reader->ReadyBytes();
+ RequestBuilder.New(TableInfo, Scheme);
+
+ TMemoryPool pool(256);
+ while (ProcessData(data, pool));
- TMemoryPool pool(256);
- while (ProcessData(pool));
+ Reader->Confirm();
+ } else {
+ Y_FAIL("unreachable");
+ }
}
- bool ProcessData(TMemoryPool& pool) {
+ bool ProcessData(TStringBuf& data, TMemoryPool& pool) {
pool.Clear();
- TStringBuf line = Buffer.NextTok('\n');
+ TStringBuf line = data.NextTok('\n');
const TStringBuf origLine = line;
if (!line) {
- if (Buffer) {
+ if (data) {
return true; // skip empty line
}
@@ -508,11 +692,14 @@ public:
: DataShard(dataShard)
, TxId(txId)
, Settings(TS3Settings::FromRestoreTask(task))
+ , DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
+ , CompressionCodec(NBackupRestoreTraits::ECompressionCodec::None)
, TableInfo(tableInfo)
, Scheme(task.GetTableDescription())
, LogPrefix_(TStringBuilder() << "s3:" << TxId)
, Retries(task.GetNumberOfRetries())
- , Reader(task.GetS3Settings().GetLimits().GetReadBatchSize(), task.GetS3Settings().GetLimits().GetReadBufferSizeLimit())
+ , ReadBatchSize(task.GetS3Settings().GetLimits().GetReadBatchSize())
+ , ReadBufferSizeLimit(task.GetS3Settings().GetLimits().GetReadBufferSizeLimit())
{
}
@@ -551,6 +738,8 @@ private:
const TActorId DataShard;
const ui64 TxId;
const TS3Settings Settings;
+ const NBackupRestoreTraits::EDataFormat DataFormat;
+ NBackupRestoreTraits::ECompressionCodec CompressionCodec;
const TTableInfo TableInfo;
const NKikimrSchemeOp::TTableDescription Scheme;
const TString LogPrefix_;
@@ -570,8 +759,9 @@ private:
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
- TReadController Reader;
- TStringBuf Buffer;
+ const ui32 ReadBatchSize;
+ const ui64 ReadBufferSizeLimit;
+ THolder<TReadController> Reader;
TUploadRowsRequestBuilder RequestBuilder;
}; // TS3Downloader
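Note: on restore, the downloader now discovers the data file's codec by probing:
it issues HeadObject for "data_NN.csv" first and, on RESOURCE_NOT_FOUND or
NO_SUCH_KEY, walks NextCompressionCodec() to try "data_NN.csv.zst" before
failing with "Cannot find any supported data file". Decompression mirrors the
export side; a simplified sketch of the ZSTD_decompressStream loop used by
TReadControllerZstd (fixed-size output chunks instead of the growable TBuffer
above):

    #include <zstd.h>
    #include <stdexcept>
    #include <string>

    // Sketch only: loop until all compressed input is consumed, flushing a
    // 4 KiB scratch buffer as it fills; errors surface via ZSTD_getErrorName,
    // as in TReadControllerZstd::Feed().
    std::string DecompressAll(const std::string& compressed) {
        ZSTD_DCtx* dctx = ZSTD_createDCtx();
        if (!dctx) throw std::runtime_error("ZSTD_createDCtx failed");

        std::string out;
        char buf[4096];
        ZSTD_inBuffer in{compressed.data(), compressed.size(), 0};
        while (in.pos < in.size) {
            ZSTD_outBuffer o{buf, sizeof(buf), 0};
            size_t res = ZSTD_decompressStream(dctx, &o, &in);
            if (ZSTD_isError(res)) {
                ZSTD_freeDCtx(dctx);
                throw std::runtime_error(ZSTD_getErrorName(res));
            }
            out.append(buf, o.pos);
        }

        ZSTD_freeDCtx(dctx);
        return out;
    }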
diff --git a/ydb/core/tx/datashard/s3_common.h b/ydb/core/tx/datashard/s3_common.h
index 8e4b4c14cb9..db81617d01d 100644
--- a/ydb/core/tx/datashard/s3_common.h
+++ b/ydb/core/tx/datashard/s3_common.h
@@ -3,6 +3,7 @@
#ifndef KIKIMR_DISABLE_S3_OPS
#include "defs.h"
+#include "backup_restore_traits.h"
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/auth/AWSCredentials.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h>
@@ -10,7 +11,6 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <util/string/builder.h>
#include <util/string/printf.h>
namespace NKikimr {
@@ -22,6 +22,7 @@ inline Aws::Client::ClientConfiguration ConfigFromSettings(const NKikimrSchemeOp
config.endpointOverride = settings.GetEndpoint();
config.connectTimeoutMs = 10000;
config.maxConnections = 5;
+ config.caPath = "/etc/ssl/certs";
switch (settings.GetScheme()) {
case NKikimrSchemeOp::TS3Settings::HTTP:
@@ -61,31 +62,44 @@ inline Aws::Auth::AWSCredentials CredentialsFromSettings(const NKikimrSchemeOp::
return Aws::Auth::AWSCredentials(settings.GetAccessKey(), settings.GetSecretKey());
}
-struct TS3Settings {
+class TS3Settings {
Aws::Client::ClientConfiguration Config;
- Aws::Auth::AWSCredentials Credentials;
- TString Bucket;
- TString SchemeKey;
- TString DataKey;
- Aws::S3::Model::StorageClass StorageClass;
+ const Aws::Auth::AWSCredentials Credentials;
+ const TString Bucket;
+ const TString ObjectKeyPattern;
+ const ui32 Shard;
+ const Ydb::Export::ExportToS3Settings::StorageClass StorageClass;
-private:
explicit TS3Settings(const NKikimrSchemeOp::TS3Settings& settings, ui32 shard)
: Config(ConfigFromSettings(settings))
, Credentials(CredentialsFromSettings(settings))
, Bucket(settings.GetBucket())
- , SchemeKey(TStringBuilder() << settings.GetObjectKeyPattern() << "/scheme.pb")
- , DataKey(TStringBuilder() << settings.GetObjectKeyPattern() << Sprintf("/data_%02d.csv", shard))
- , StorageClass(ConvertStorageClass(settings.GetStorageClass()))
+ , ObjectKeyPattern(settings.GetObjectKeyPattern())
+ , Shard(shard)
+ , StorageClass(settings.GetStorageClass())
{
- Config.caPath = "/etc/ssl/certs";
}
- static Aws::S3::Model::StorageClass ConvertStorageClass(Ydb::Export::ExportToS3Settings::StorageClass value) {
+public:
+ static TS3Settings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) {
+ return TS3Settings(task.GetS3Settings(), task.GetShardNum());
+ }
+
+ static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) {
+ return TS3Settings(task.GetS3Settings(), task.GetShardNum());
+ }
+
+ inline const Aws::Client::ClientConfiguration& GetConfig() const { return Config; }
+ inline Aws::Client::ClientConfiguration& ConfigRef() { return Config; }
+ inline const Aws::Auth::AWSCredentials& GetCredentials() const { return Credentials; }
+ inline const TString& GetBucket() const { return Bucket; }
+ inline const TString& GetObjectKeyPattern() const { return ObjectKeyPattern; }
+
+ inline Aws::S3::Model::StorageClass GetStorageClass() const {
using ExportToS3Settings = Ydb::Export::ExportToS3Settings;
using AwsStorageClass = Aws::S3::Model::StorageClass;
- switch (value) {
+ switch (StorageClass) {
case ExportToS3Settings::STORAGE_CLASS_UNSPECIFIED:
return AwsStorageClass::NOT_SET;
case ExportToS3Settings::STANDARD:
@@ -109,13 +123,15 @@ private:
}
}
-public:
- static TS3Settings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) {
- return TS3Settings(task.GetS3Settings(), task.GetShardNum());
+ inline TString GetSchemeKey() const {
+ return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern);
}
- static TS3Settings FromRestoreTask(const NKikimrSchemeOp::TRestoreTask& task) {
- return TS3Settings(task.GetS3Settings(), task.GetShardNum());
+ inline TString GetDataKey(
+ NBackupRestoreTraits::EDataFormat format,
+ NBackupRestoreTraits::ECompressionCodec codec) const
+ {
+ return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec);
}
}; // TS3Settings
diff --git a/ydb/core/tx/schemeshard/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup.cpp
new file mode 100644
index 00000000000..6342395177e
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_backup.cpp
@@ -0,0 +1,120 @@
+#include "ut_backup_restore_common.h"
+
+#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+
+#include <util/string/cast.h>
+#include <util/string/printf.h>
+
+using namespace NSchemeShardUT_Private;
+using namespace NKikimr::NWrappers::NTestHelpers;
+
+Y_UNIT_TEST_SUITE(TBackupTests) {
+ using TFillFn = std::function<void(TTestBasicRuntime&)>;
+
+ void Backup(TTestBasicRuntime& runtime, const TString& compressionCodec,
+ const TString& creationScheme, TFillFn fill, ui32 rowsBatchSize = 128)
+ {
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", creationScheme);
+ env.TestWaitNotification(runtime, txId);
+
+ fill(runtime);
+
+ const auto tableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);
+ TString tableSchema;
+ UNIT_ASSERT(google::protobuf::TextFormat::PrintToString(tableDesc.GetPathDescription(), &tableSchema));
+
+ TestBackup(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ Table {
+ %s
+ }
+ S3Settings {
+ Endpoint: "localhost:%d"
+ Scheme: HTTP
+ }
+ ScanSettings {
+ RowsBatchSize: %d
+ }
+ Compression {
+ Codec: "%s"
+ }
+ )", tableSchema.c_str(), port, rowsBatchSize, compressionCodec.c_str()));
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ void WriteRow(TTestBasicRuntime& runtime, ui64 tabletId, const TString& key, const TString& value) {
+ NKikimrMiniKQL::TResult result;
+ TString error;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
+ (
+ (let key '( '('key (Utf8 '%s) ) ) )
+ (let row '( '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__Table key row) ))
+ )
+ )", key.c_str(), value.c_str()), result, error);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
+ UNIT_ASSERT_VALUES_EQUAL(error, "");
+ }
+
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) {
+ TTestBasicRuntime runtime;
+
+ Backup(runtime, ToString(Codec), R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", [](TTestBasicRuntime& runtime) {
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA");
+ });
+ }
+
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) {
+ TTestBasicRuntime runtime;
+
+ Backup(runtime, ToString(Codec), R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Text: "b" } }
+ }
+ }
+ )", [](TTestBasicRuntime& runtime) {
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets + 0, "a", "valueA");
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets + 1, "b", "valueb");
+ });
+ }
+
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) {
+ TTestBasicRuntime runtime;
+ const ui32 batchSize = 10;
+
+ Backup(runtime, ToString(Codec), R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", [](TTestBasicRuntime& runtime) {
+ for (ui32 i = 0; i < 2 * batchSize; ++i) {
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, Sprintf("a%d", i), "valueA");
+ }
+ }, batchSize);
+ }
+
+} // TBackupTests
diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..74529d7845c
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.darwin.txt
@@ -0,0 +1,54 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-schemeshard-ut_backup)
+target_compile_options(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard
+)
+target_link_libraries(ydb-core-tx-schemeshard-ut_backup PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-schemeshard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ ydb-core-tx
+ tx-schemeshard-ut_helpers
+ core-wrappers-ut_helpers
+ udf-service-exception_policy
+)
+target_link_options(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_backup.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-schemeshard-ut_backup
+ COMMAND
+ ydb-core-tx-schemeshard-ut_backup
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-schemeshard-ut_backup)
diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt
new file mode 100644
index 00000000000..f9277e31c17
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.linux.txt
@@ -0,0 +1,57 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-schemeshard-ut_backup)
+target_compile_options(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard
+)
+target_link_libraries(ydb-core-tx-schemeshard-ut_backup PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-schemeshard
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ ydb-core-tx
+ tx-schemeshard-ut_helpers
+ core-wrappers-ut_helpers
+ udf-service-exception_policy
+)
+target_link_options(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-schemeshard-ut_backup PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_backup.cpp
+)
+add_test(
+ NAME
+ ydb-core-tx-schemeshard-ut_backup
+ COMMAND
+ ydb-core-tx-schemeshard-ut_backup
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-tx-schemeshard-ut_backup)
diff --git a/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt
new file mode 100644
index 00000000000..a681d385f3e
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_backup/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/tx/schemeshard/ut_backup_restore_common.h b/ydb/core/tx/schemeshard/ut_backup_restore_common.h
new file mode 100644
index 00000000000..ec3eb3b9383
--- /dev/null
+++ b/ydb/core/tx/schemeshard/ut_backup_restore_common.h
@@ -0,0 +1,16 @@
+#include <ydb/core/tx/datashard/backup_restore_traits.h>
+
+using EDataFormat = NKikimr::NDataShard::NBackupRestoreTraits::EDataFormat;
+using ECompressionCodec = NKikimr::NDataShard::NBackupRestoreTraits::ECompressionCodec;
+
+#define Y_UNIT_TEST_WITH_COMPRESSION(N) \
+ template<ECompressionCodec Codec> void N(NUnitTest::TTestContext&); \
+ struct TTestRegistration##N { \
+ TTestRegistration##N() { \
+ TCurrentTest::AddTest(#N "[Raw]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<ECompressionCodec::None>), false); \
+ TCurrentTest::AddTest(#N "[Zstd]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<ECompressionCodec::Zstd>), false); \
+ } \
+ }; \
+ static TTestRegistration##N testRegistration##N; \
+ template<ECompressionCodec Codec> \
+ void N(NUnitTest::TTestContext&)
diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp
index 9a84fc19caa..42272ebe116 100644
--- a/ydb/core/tx/schemeshard/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore.cpp
@@ -1,3 +1,5 @@
+#include "ut_backup_restore_common.h"
+
#include <contrib/libs/double-conversion/double-conversion/ieee.h>
#include <ydb/core/base/localdb.h>
@@ -10,6 +12,8 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/metering/metering.h>
#include <ydb/core/ydb_convert/table_description.h>
+
+#include <contrib/libs/zstd/include/zstd.h>
#include <library/cpp/string_utils/quote/quote.h>
#include <util/datetime/base.h>
@@ -73,14 +77,43 @@ namespace {
}
struct TTestData {
- TString Csv;
+ TString Data;
TString YsonStr;
+ EDataFormat DataFormat = EDataFormat::Csv;
+ ECompressionCodec CompressionCodec;
- TTestData(TString csv, TString ysonStr)
- : Csv(std::move(csv))
+ TTestData(TString data, TString ysonStr, ECompressionCodec codec = ECompressionCodec::None)
+ : Data(std::move(data))
, YsonStr(std::move(ysonStr))
+ , CompressionCodec(codec)
{
}
+
+ TString Ext() const {
+ TStringBuilder result;
+
+ switch (DataFormat) {
+ case EDataFormat::Csv:
+ result << ".csv";
+ break;
+ case EDataFormat::Invalid:
+ UNIT_ASSERT_C(false, "Invalid data format");
+ break;
+ }
+
+ switch (CompressionCodec) {
+ case ECompressionCodec::None:
+ break;
+ case ECompressionCodec::Zstd:
+ result << ".zst";
+ break;
+ case ECompressionCodec::Invalid:
+ UNIT_ASSERT_C(false, "Invalid compression codec");
+ break;
+ }
+
+ return result;
+ }
};
struct TTestDataWithScheme {
@@ -130,6 +163,63 @@ namespace {
return TTestData(std::move(csv), std::move(yson));
}
+ TString ZstdCompress(const TStringBuf src) {
+ TString compressed;
+ compressed.resize(ZSTD_compressBound(src.size()));
+
+ const auto res = ZSTD_compress(compressed.Detach(), compressed.size(), src.data(), src.size(), ZSTD_CLEVEL_DEFAULT);
+ UNIT_ASSERT_C(!ZSTD_isError(res), "Zstd error: " << ZSTD_getErrorName(res));
+ compressed.resize(res);
+
+ return compressed;
+ }
+
+ TTestData GenerateZstdTestData(const TString& keyPrefix, ui32 count, ui32 rowsPerFrame = 0) {
+ auto data = GenerateTestData(keyPrefix, count);
+ if (!rowsPerFrame) {
+ rowsPerFrame = count;
+ }
+
+ TString compressed;
+ ui32 start = 0;
+ ui32 rowsInFrame = 0;
+
+ for (ui32 i = 0; i < data.Data.size(); ++i) {
+ const auto c = data.Data[i];
+ const bool last = i == data.Data.size() - 1;
+
+ if (last) {
+ UNIT_ASSERT(c == '\n');
+ }
+
+ if (c == '\n') {
+ if (++rowsInFrame == rowsPerFrame || last) {
+ compressed.append(ZstdCompress(TStringBuf(&data.Data[start], i + 1 - start)));
+
+ start = i + 1;
+ rowsInFrame = 0;
+ }
+ }
+ }
+
+ data.Data = std::move(compressed);
+ data.CompressionCodec = ECompressionCodec::Zstd;
+
+ return data;
+ }
+
+ TTestData GenerateTestData(ECompressionCodec codec, const TString& keyPrefix, ui32 count) {
+ switch (codec) {
+ case ECompressionCodec::None:
+ return GenerateTestData(keyPrefix, count);
+ case ECompressionCodec::Zstd:
+ return GenerateZstdTestData(keyPrefix, count);
+ case ECompressionCodec::Invalid:
+ UNIT_ASSERT_C(false, "Invalid compression codec");
+ Y_FAIL("unreachable");
+ }
+ }
+
TTestDataWithScheme GenerateTestData(const TString& scheme, const TVector<std::pair<TString, ui64>>& shardsConfig) {
TTestDataWithScheme result;
result.Scheme = scheme;
@@ -147,7 +237,8 @@ namespace {
for (const auto& [prefix, item] : data) {
result.emplace(prefix + "/scheme.pb", item.Scheme);
for (ui32 i = 0; i < item.Data.size(); ++i) {
- result.emplace(Sprintf("%s/data_%02d.csv", prefix.data(), i), item.Data.at(i).Csv);
+ const auto& data = item.Data.at(i);
+ result.emplace(Sprintf("%s/data_%02d%s", prefix.data(), i, data.Ext().c_str()), data.Data);
}
}
@@ -265,10 +356,10 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
Restore(runtime, env, creationScheme, std::move(data), readBatchSize);
}
- Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) {
TTestBasicRuntime runtime;
- const auto data = GenerateTestData("a", 1);
+ const auto data = GenerateTestData(Codec, "a", 1);
Restore(runtime, R"(
Name: "Table"
@@ -281,11 +372,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) {
TTestBasicRuntime runtime;
- const auto a = GenerateTestData("a", 1);
- const auto b = GenerateTestData("b", 1);
+ const auto a = GenerateTestData(Codec, "a", 1);
+ const auto b = GenerateTestData(Codec, "b", 1);
Restore(runtime, R"(
Name: "Table"
@@ -309,11 +400,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
}
}
- Y_UNIT_TEST(ShouldSucceedOnLargeData) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) {
TTestBasicRuntime runtime;
- const auto data = GenerateTestData("", 100);
- UNIT_ASSERT(data.Csv.size() > 128);
+ const auto data = GenerateTestData(Codec, "", 100);
+ UNIT_ASSERT(data.Data.size() > 128);
Restore(runtime, R"(
Name: "Table"
@@ -326,10 +417,38 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(ShouldExpandBuffer) {
+ void ShouldSucceedOnMultipleFrames(ui32 batchSize) {
TTestBasicRuntime runtime;
- const auto data = GenerateTestData("a", 2);
+ const auto data = GenerateZstdTestData("a", 3, 2);
+
+ Restore(runtime, R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", {data}, batchSize);
+
+ auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets);
+ NKqp::CompareYson(data.YsonStr, content);
+ }
+
+ Y_UNIT_TEST(ShouldSucceedOnMultipleFramesStandardBatch) {
+ ShouldSucceedOnMultipleFrames(128);
+ }
+
+ Y_UNIT_TEST(ShouldSucceedOnMultipleFramesSmallBatch) {
+ ShouldSucceedOnMultipleFrames(7);
+ }
+
+ Y_UNIT_TEST(ShouldSucceedOnMultipleFramesTinyBatch) {
+ ShouldSucceedOnMultipleFrames(1);
+ }
+
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldExpandBuffer) {
+ TTestBasicRuntime runtime;
+
+ const auto data = GenerateTestData(Codec, "a", 2);
const ui32 batchSize = 1;
Restore(runtime, R"(
@@ -414,7 +533,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
Columns { Name: "json_value" Type: "Json" }
Columns { Name: "jsondoc_value" Type: "JsonDocument" }
KeyColumnNames: ["key"]
- )", {data}, data.Csv.size() + 1);
+ )", {data}, data.Data.size() + 1);
auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint64", "0"}, {
"key",
@@ -514,11 +633,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
TestGetImport(runtime, txId, "/MyRoot");
}
- Y_UNIT_TEST(ShouldCountWrittenBytesAndRows) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldCountWrittenBytesAndRows) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
- const auto data = GenerateTestData("a", 2);
+ const auto data = GenerateTestData(Codec, "a", 2);
TMaybe<NKikimrTxDataShard::TShardOpResult> result;
runtime.SetObserverFunc([&result](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
@@ -606,17 +725,20 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
env.TestWaitNotification(runtime, txId);
UNIT_ASSERT(overloads > 0);
- const ui32 expected = data.Csv.size() / batchSize + ui32(bool(data.Csv.size() % batchSize));
+ const ui32 expected = data.Data.size() / batchSize + ui32(bool(data.Data.size() % batchSize));
UNIT_ASSERT_VALUES_EQUAL(expected, total - overloads);
auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint32", "0"});
NKqp::CompareYson(data.YsonStr, content);
}
+ template <ECompressionCodec Codec>
void ShouldFailOnFileWithoutNewLines(ui32 batchSize) {
TTestBasicRuntime runtime;
- const auto data = TTestData("\"a1\",\"value1\"", EmptyYsonStr);
+ const TString v = "\"a1\",\"value1\"";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(runtime, R"(
Name: "Table"
@@ -629,18 +751,20 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(ShouldFailOnFileWithoutNewLinesStandardBatch) {
- ShouldFailOnFileWithoutNewLines(128);
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLinesStandardBatch) {
+ ShouldFailOnFileWithoutNewLines<Codec>(128);
}
- Y_UNIT_TEST(ShouldFailOnFileWithoutNewLinesSmallBatch) {
- ShouldFailOnFileWithoutNewLines(1);
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLinesSmallBatch) {
+ ShouldFailOnFileWithoutNewLines<Codec>(1);
}
- Y_UNIT_TEST(ShouldFailOnEmptyToken) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnEmptyToken) {
TTestBasicRuntime runtime;
- const auto data = TTestData("\"a1\",\n", EmptyYsonStr);
+ const TString v = "\"a1\",\n";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(runtime, R"(
Name: "Table"
@@ -653,10 +777,12 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(ShouldFailOnInvalidValue) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnInvalidValue) {
TTestBasicRuntime runtime;
- const auto data = TTestData("\"a1\",\"value1\"\n", EmptyYsonStr);
+ const TString v = "\"a1\",\"value1\"\n";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(runtime, R"(
Name: "Table"
@@ -669,11 +795,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(ShouldFailOnOutboundKey) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnOutboundKey) {
TTestBasicRuntime runtime;
- const auto a = GenerateTestData("a", 1);
- const auto b = TTestData(a.Csv, EmptyYsonStr);
+ const auto a = GenerateTestData(Codec, "a", 1);
+ const auto b = TTestData(a.Data, EmptyYsonStr);
Restore(runtime, R"(
Name: "Table"
@@ -697,6 +823,23 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
}
}
+ Y_UNIT_TEST(ShouldFailOnInvalidFrame) {
+ TTestBasicRuntime runtime;
+
+ const TString garbage = "\"a1\",\"value1\""; // not valid zstd data
+ const auto data = TTestData(garbage, EmptyYsonStr, ECompressionCodec::Zstd);
+
+ Restore(runtime, R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", {data});
+
+ auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets);
+ NKqp::CompareYson(data.YsonStr, content);
+ }
+
void TestRestoreNegative(TTestActorRuntime& runtime, ui64 txId, const TString& parentPath, const TString& name,
const TVector<TEvSchemeShard::EStatus>& expectedResults) {
@@ -799,30 +942,30 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
- Y_UNIT_TEST(CancelUponProposeShouldSucceed) {
- auto data = GenerateTestData("a", 1);
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelUponProposeShouldSucceed) {
+ auto data = GenerateTestData(Codec, "a", 1);
data.YsonStr = EmptyYsonStr;
CancelShouldSucceed<TEvDataShard::TEvProposeTransaction>(data);
}
- Y_UNIT_TEST(CancelUponProposeResultShouldSucceed) {
- auto data = GenerateTestData("a", 1);
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelUponProposeResultShouldSucceed) {
+ auto data = GenerateTestData(Codec, "a", 1);
data.YsonStr = EmptyYsonStr;
CancelShouldSucceed<TEvDataShard::TEvProposeTransactionResult>(data);
}
- Y_UNIT_TEST(CancelUponUploadResponseShouldSucceed) {
- const auto data = GenerateTestData("a", 1);
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelUponUploadResponseShouldSucceed) {
+ const auto data = GenerateTestData(Codec, "a", 1);
CancelShouldSucceed<TEvDataShard::TEvUnsafeUploadRowsResponse>(data);
}
- Y_UNIT_TEST(CancelHungOperationShouldSucceed) {
- auto data = GenerateTestData("a", 1);
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelHungOperationShouldSucceed) {
+ auto data = GenerateTestData(Codec, "a", 1);
data.YsonStr = EmptyYsonStr;
CancelShouldSucceed<TEvDataShard::TEvProposeTransactionResult>(data, true);
}
- Y_UNIT_TEST(CancelAlmostCompleteOperationShouldNotHaveEffect) {
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelAlmostCompleteOperationShouldNotHaveEffect) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;
@@ -842,7 +985,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
TPortManager portManager;
THolder<TS3Mock> s3Mock;
- const auto data = GenerateTestData("a", 1);
+ const auto data = GenerateTestData(Codec, "a", 1);
RestoreNoWait(runtime, txId, portManager.GetPort(), s3Mock, {data});
const ui64 restoreTxId = txId;
@@ -868,7 +1011,7 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
void Restore(TTestWithReboots& t, TTestActorRuntime& runtime, bool& activeZone,
- ui16 port, const TString& creationScheme, TVector<TTestData>&& data) {
+ ui16 port, const TString& creationScheme, TVector<TTestData>&& data, ui32 readBatchSize = 128) {
THolder<TS3Mock> s3Mock;
TString schemeStr;
@@ -898,20 +1041,20 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
Endpoint: "localhost:%d"
Scheme: HTTP
Limits {
- ReadBatchSize: 128
+ ReadBatchSize: %d
}
}
- )", schemeStr.data(), port));
+ )", schemeStr.data(), port, readBatchSize));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}
- Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnSingleShardTable) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto data = GenerateTestData("a", 1);
+ const auto data = GenerateTestData(Codec, "a", 1);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -929,14 +1072,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTable) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto a = GenerateTestData("a", 1);
- const auto b = GenerateTestData("b", 1);
+ const auto a = GenerateTestData(Codec, "a", 1);
+ const auto b = GenerateTestData(Codec, "b", 1);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -964,7 +1107,7 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldSucceedOnMultiShardTableAndLimitedResources) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnMultiShardTableAndLimitedResources) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
@@ -987,8 +1130,8 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
runtime.Register(CreateResourceBrokerActor(config, runtime.GetDynamicCounters(0))));
}
- const auto a = GenerateTestData("a", 1);
- const auto b = GenerateTestData("b", 1);
+ const auto a = GenerateTestData(Codec, "a", 1);
+ const auto b = GenerateTestData(Codec, "b", 1);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1016,14 +1159,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldSucceedOnLargeData) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldSucceedOnLargeData) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto data = GenerateTestData("", 100);
- UNIT_ASSERT(data.Csv.size() > 128);
+ const auto data = GenerateTestData(Codec, "", 100);
+ UNIT_ASSERT(data.Data.size() > 128);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1041,13 +1184,40 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldFailOnFileWithoutNewLines) {
+ Y_UNIT_TEST(ShouldSucceedOnMultipleFrames) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto data = TTestData("\"a1\",\"value1\"", EmptyYsonStr);
+ const auto data = GenerateZstdTestData("a", 3, 2);
+ const ui32 batchSize = 7; // less than any frame
+
+ Restore(t, runtime, activeZone, port, R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", {data}, batchSize);
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets);
+ NKqp::CompareYson(data.YsonStr, content);
+ }
+ });
+ }
+
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnFileWithoutNewLines) {
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ const TString v = "\"a1\",\"value1\"";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1065,13 +1235,15 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldFailOnEmptyToken) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnEmptyToken) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto data = TTestData("\"a1\",\n", EmptyYsonStr);
+ const TString v = "\"a1\",\n";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1089,13 +1261,15 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldFailOnInvalidValue) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnInvalidValue) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto data = TTestData("\"a1\",\"value1\"\n", EmptyYsonStr);
+ const TString v = "\"a1\",\"value1\"\n";
+ const auto d = Codec == ECompressionCodec::Zstd ? ZstdCompress(v) : v;
+ const auto data = TTestData(d, EmptyYsonStr, Codec);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1113,14 +1287,14 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(ShouldFailOnOutboundKey) {
+ Y_UNIT_TEST_WITH_COMPRESSION(ShouldFailOnOutboundKey) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
- const auto a = GenerateTestData("a", 1);
- const auto b = TTestData(a.Csv, EmptyYsonStr);
+ const auto a = GenerateTestData(Codec, "a", 1);
+ const auto b = TTestData(a.Data, EmptyYsonStr);
Restore(t, runtime, activeZone, port, R"(
Name: "Table"
@@ -1148,10 +1322,10 @@ Y_UNIT_TEST_SUITE(TRestoreWithRebootsTests) {
});
}
- Y_UNIT_TEST(CancelShouldSucceed) {
+ Y_UNIT_TEST_WITH_COMPRESSION(CancelShouldSucceed) {
TPortManager portManager;
const ui16 port = portManager.GetPort();
- const auto data = GenerateTestData("a", 1);
+ const auto data = GenerateTestData(Codec, "a", 1);
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/core/wrappers/ut_helpers/s3_mock.cpp b/ydb/core/wrappers/ut_helpers/s3_mock.cpp
index 8df5acc2001..b0fc2906510 100644
--- a/ydb/core/wrappers/ut_helpers/s3_mock.cpp
+++ b/ydb/core/wrappers/ut_helpers/s3_mock.cpp
@@ -85,6 +85,12 @@ bool TS3Mock::TRequest::HttpBadRequest(const TReplyParams& params, const TString
bool TS3Mock::TRequest::HttpNotFound(const TReplyParams& params) {
params.Output << "HTTP/1.1 404 Not found\r\n\r\n";
+ params.Output << R"(
+ <?xml version="1.0" encoding="UTF-8"?>
+ <Error>
+ <Code>NoSuchKey</Code>
+ </Error>
+ )";
return true;
}
@@ -182,7 +188,7 @@ bool TS3Mock::TRequest::HttpServeAction(const TReplyParams& params, EMethod meth
const int uploadId = Parent->NextUploadId++;
Parent->MultipartUploads[std::make_pair(path, ToString(uploadId))] = {};
- params.Output << "HTTP/1.1 200 Ok\r\n";
+ params.Output << "HTTP/1.1 200 Ok\r\n\r\n";
params.Output << Sprintf(R"(
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
@@ -236,7 +242,7 @@ bool TS3Mock::TRequest::HttpServeAction(const TReplyParams& params, EMethod meth
const TString etag = MD5::Data(Parent->Data[path]);
- params.Output << "HTTP/1.1 200 Ok\r\n";
+ params.Output << "HTTP/1.1 200 Ok\r\n\r\n";
params.Output << Sprintf(R"(
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>