aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Demin <deminds@ydb.tech>2024-08-07 23:30:09 +0300
committerGitHub <noreply@github.com>2024-08-07 23:30:09 +0300
commitff0fd33c853e53ba7667cef1cc33f71a13917529 (patch)
tree13918de6d2e425c8b39a422e80b8f03f4ae3809b
parentcb6f3c813958906ffb40abade0e6f64dfa490ca0 (diff)
downloadydb-ff0fd33c853e53ba7667cef1cc33f71a13917529.tar.gz
Fix memory leak issue with AWS API usage (#6160)
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp30
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h9
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/core/driver_lib/run/ya.make1
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp13
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ya.make1
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp18
-rw-r--r--ydb/core/tx/schemeshard/ut_backup/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/ut_export/ut_export.cpp18
-rw-r--r--ydb/core/tx/schemeshard/ut_export/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp18
-rw-r--r--ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp13
-rw-r--r--ydb/core/tx/schemeshard/ut_restore/ya.make1
-rw-r--r--ydb/core/wrappers/s3_storage.h3
-rw-r--r--ydb/core/wrappers/s3_storage_config.cpp86
-rw-r--r--ydb/core/wrappers/s3_storage_config.h9
-rw-r--r--ydb/core/wrappers/s3_wrapper_ut.cpp21
-rw-r--r--ydb/core/wrappers/ut/ya.make1
-rw-r--r--ydb/services/ydb/backup_ut/ya.make32
-rw-r--r--ydb/services/ydb/backup_ut/ydb_backup_ut.cpp482
-rw-r--r--ydb/services/ydb/ut/ya.make3
-rw-r--r--ydb/services/ydb/ya.make1
-rw-r--r--ydb/services/ydb/ydb_import_ut.cpp214
25 files changed, 664 insertions, 316 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 969f9048bf..00f5cef197 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -241,6 +241,25 @@
#include <util/system/hostname.h>
+#include <aws/core/Aws.h>
+
+namespace {
+
+struct TAwsApiGuard {
+ TAwsApiGuard() {
+ Aws::InitAPI(Options);
+ }
+
+ ~TAwsApiGuard() {
+ Aws::ShutdownAPI(Options);
+ }
+
+private:
+ Aws::SDKOptions Options;
+};
+
+}
+
namespace NKikimr {
namespace NKikimrServicesInitializers {
@@ -2810,5 +2829,16 @@ void TGraphServiceInitializer::InitializeServices(NActors::TActorSystemSetup* se
TActorSetupCmd(NGraph::CreateGraphService(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId));
}
+TAwsApiInitializer::TAwsApiInitializer(IGlobalObjectStorage& globalObjects)
+ : GlobalObjects(globalObjects)
+{
+}
+
+void TAwsApiInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ Y_UNUSED(setup);
+ Y_UNUSED(appData);
+ GlobalObjects.AddGlobalObject(std::make_shared<TAwsApiGuard>());
+}
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index c8aceabde9..b5d5fc783b 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -619,5 +619,14 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
+class TAwsApiInitializer : public IServiceInitializer {
+ IGlobalObjectStorage& GlobalObjects;
+
+public:
+ TAwsApiInitializer(IGlobalObjectStorage& globalObjects);
+
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 2aa1e70fcf..6f4f670e08 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1653,6 +1653,8 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TGraphServiceInitializer(runConfig));
}
+ sil->AddServiceInitializer(new TAwsApiInitializer(*this));
+
return sil;
}
diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make
index 6f1f6c0da7..1db5a12181 100644
--- a/ydb/core/driver_lib/run/ya.make
+++ b/ydb/core/driver_lib/run/ya.make
@@ -20,6 +20,7 @@ SRCS(
)
PEERDIR(
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
contrib/libs/protobuf
ydb/library/actors/core
ydb/library/actors/dnsresolver
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 68540e95e3..29da71e9a8 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -17,6 +17,9 @@
#include <util/system/hostname.h>
#include <library/cpp/deprecated/atomic/atomic.h>
+#include <library/cpp/testing/hook/hook.h>
+
+#include <aws/core/Aws.h>
namespace NKikimr {
@@ -32,6 +35,16 @@ enum class EInitialEviction {
namespace {
+Aws::SDKOptions Options;
+
+Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+}
+
+Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+}
+
static const std::vector<NArrow::NTest::TTestColumn> testYdbSchema = TTestSchema::YdbSchema();
static const std::vector<NArrow::NTest::TTestColumn> testYdbPk = TTestSchema::YdbPkSchema();
diff --git a/ydb/core/tx/columnshard/ut_schema/ya.make b/ydb/core/tx/columnshard/ut_schema/ya.make
index 35d906ee20..d67c0d2ad5 100644
--- a/ydb/core/tx/columnshard/ut_schema/ya.make
+++ b/ydb/core/tx/columnshard/ut_schema/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
ydb/core/testlib/default
ydb/core/tx/columnshard/hooks/abstract
ydb/core/tx/columnshard/hooks/testing
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index ba7227a7a7..655ee80172 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/tablet/resource_broker.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/wrappers/s3_storage.h>
+#include <ydb/core/wrappers/s3_storage_config.h>
#include <ydb/core/io_formats/ydb_dump/csv_ydb_dump.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
diff --git a/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
index fd73669032..b23d0dfffd 100644
--- a/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
+++ b/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
@@ -7,9 +7,27 @@
#include <util/string/cast.h>
#include <util/string/printf.h>
+#include <library/cpp/testing/hook/hook.h>
+
+#include <aws/core/Aws.h>
+
using namespace NSchemeShardUT_Private;
using namespace NKikimr::NWrappers::NTestHelpers;
+namespace {
+
+Aws::SDKOptions Options;
+
+Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+}
+
+Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+}
+
+}
+
Y_UNIT_TEST_SUITE(TBackupTests) {
using TFillFn = std::function<void(TTestBasicRuntime&)>;
diff --git a/ydb/core/tx/schemeshard/ut_backup/ya.make b/ydb/core/tx/schemeshard/ut_backup/ya.make
index d9ee6dd814..aac9bc5f93 100644
--- a/ydb/core/tx/schemeshard/ut_backup/ya.make
+++ b/ydb/core/tx/schemeshard/ut_backup/ya.make
@@ -20,6 +20,7 @@ IF (NOT OS_WINDOWS)
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
ydb/core/testlib/default
ydb/core/tx
ydb/core/tx/schemeshard/ut_helpers
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
index 365dbf5d72..c50d931159 100644
--- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
@@ -12,6 +12,10 @@
#include <util/string/printf.h>
#include <util/system/env.h>
+#include <library/cpp/testing/hook/hook.h>
+
+#include <aws/core/Aws.h>
+
using namespace NSchemeShardUT_Private;
using namespace NKikimr::NWrappers::NTestHelpers;
@@ -19,6 +23,16 @@ using TTablesWithAttrs = TVector<std::pair<TString, TMap<TString, TString>>>;
namespace {
+ Aws::SDKOptions Options;
+
+ Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+ }
+
+ Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+ }
+
void Run(TTestBasicRuntime& runtime, TTestEnv& env, const std::variant<TVector<TString>, TTablesWithAttrs>& tablesVar, const TString& request,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS,
const TString& dbName = "/MyRoot", bool serverless = false, const TString& userSID = "") {
@@ -1809,7 +1823,7 @@ partitioning_settings {
return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record
.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup;
};
-
+
THolder<IEventHandle> delayed;
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (delayFunc(ev)) {
@@ -1976,7 +1990,7 @@ partitioning_settings {
min_partitions_count: 10
)"));
}
-
+
Y_UNIT_TEST(UserSID) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
diff --git a/ydb/core/tx/schemeshard/ut_export/ya.make b/ydb/core/tx/schemeshard/ut_export/ya.make
index 4d5bf91e26..c62dc9ea8e 100644
--- a/ydb/core/tx/schemeshard/ut_export/ya.make
+++ b/ydb/core/tx/schemeshard/ut_export/ya.make
@@ -20,6 +20,7 @@ IF (NOT OS_WINDOWS)
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
ydb/core/testlib/default
ydb/core/tx
ydb/core/tx/schemeshard/ut_helpers
diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp
index 5e1e42d17c..97e34a6d3f 100644
--- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp
+++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp
@@ -4,10 +4,28 @@
#include <util/string/printf.h>
+#include <library/cpp/testing/hook/hook.h>
+
+#include <aws/core/Aws.h>
+
using namespace NSchemeShardUT_Private;
using namespace NSchemeShardUT_Private::NExportReboots;
using namespace NKikimr::NWrappers::NTestHelpers;
+namespace {
+
+Aws::SDKOptions Options;
+
+Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+}
+
+Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+}
+
+}
+
Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
using TUnderlying = std::function<void(const TVector<TString>&, const TString&, TTestWithReboots&)>;
diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make
index bc7ca966e0..caf4fb7de3 100644
--- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make
+++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make
@@ -19,6 +19,7 @@ PEERDIR(
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
ydb/core/testlib/default
ydb/core/tx
ydb/core/tx/schemeshard/ut_helpers
diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
index 1d4992404e..c3f3e4dc05 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
@@ -20,8 +20,10 @@
#include <ydb/public/api/protos/ydb_import.pb.h>
+#include <aws/core/Aws.h>
#include <contrib/libs/zstd/include/zstd.h>
#include <library/cpp/string_utils/quote/quote.h>
+#include <library/cpp/testing/hook/hook.h>
#include <util/datetime/base.h>
#include <util/generic/size_literals.h>
@@ -37,6 +39,16 @@ using namespace NKikimr::NWrappers::NTestHelpers;
namespace {
+ Aws::SDKOptions Options;
+
+ Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+ }
+
+ Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+ }
+
const TString EmptyYsonStr = R"([[[[];%false]]])";
TString GenerateScheme(const NKikimrSchemeOp::TPathDescription& pathDesc) {
@@ -316,7 +328,6 @@ namespace {
runtime.SetObserverFunc(prevObserver);
}
-
} // anonymous
Y_UNIT_TEST_SUITE(TRestoreTests) {
diff --git a/ydb/core/tx/schemeshard/ut_restore/ya.make b/ydb/core/tx/schemeshard/ut_restore/ya.make
index 7044d4283b..d514b36b49 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ya.make
+++ b/ydb/core/tx/schemeshard/ut_restore/ya.make
@@ -14,6 +14,7 @@ ELSE()
ENDIF()
PEERDIR(
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
contrib/libs/double-conversion
library/cpp/string_utils/quote
ydb/core/kqp/ut/common
diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h
index d678f23761..5af392c4f6 100644
--- a/ydb/core/wrappers/s3_storage.h
+++ b/ydb/core/wrappers/s3_storage.h
@@ -3,7 +3,6 @@
#ifndef KIKIMR_DISABLE_S3_OPS
#include "abstract.h"
-#include "s3_storage_config.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/wrappers/events/common.h>
@@ -32,7 +31,7 @@
namespace NKikimr::NWrappers::NExternalStorage {
-class TS3ExternalStorage: public IExternalStorageOperator, TS3User {
+class TS3ExternalStorage: public IExternalStorageOperator {
private:
THolder<Aws::S3::S3Client> Client;
const Aws::Client::ClientConfiguration Config;
diff --git a/ydb/core/wrappers/s3_storage_config.cpp b/ydb/core/wrappers/s3_storage_config.cpp
index f0bc70e15f..9785eb7ed5 100644
--- a/ydb/core/wrappers/s3_storage_config.cpp
+++ b/ydb/core/wrappers/s3_storage_config.cpp
@@ -1,81 +1,15 @@
#include "s3_storage.h"
#include "s3_storage_config.h"
-#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/internal/AWSHttpResourceClient.h>
-#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/utils/stream/PreallocatedStreamBuf.h>
-#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/utils/stream/ResponseStream.h>
-#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/Aws.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h>
-#include <contrib/libs/curl/include/curl/curl.h>
-#include <ydb/library/actors/core/actorsystem.h>
-#include <ydb/library/actors/core/log.h>
#include <util/string/cast.h>
#ifndef KIKIMR_DISABLE_S3_OPS
namespace NKikimr::NWrappers::NExternalStorage {
-using namespace Aws;
-using namespace Aws::Auth;
-using namespace Aws::Client;
-using namespace Aws::S3;
-using namespace Aws::S3::Model;
-using namespace Aws::Utils::Stream;
-
namespace {
-struct TCurlInitializer {
- TCurlInitializer() {
- curl_global_init(CURL_GLOBAL_ALL);
- }
-
- ~TCurlInitializer() {
- curl_global_cleanup();
- }
-};
-
-struct TApiInitializer {
- TApiInitializer() {
- Options.httpOptions.initAndCleanupCurl = false;
- InitAPI(Options);
-
- Internal::CleanupEC2MetadataClient(); // speeds up config construction
- }
-
- ~TApiInitializer() {
- ShutdownAPI(Options);
- }
-
-private:
- SDKOptions Options;
-};
-
-class TApiOwner {
-public:
- void Ref() {
- auto guard = Guard(Mutex);
- if (!RefCount++) {
- if (!CurlInitializer) {
- CurlInitializer.Reset(new TCurlInitializer);
- }
- ApiInitializer.Reset(new TApiInitializer);
- }
- }
-
- void UnRef() {
- auto guard = Guard(Mutex);
- if (!--RefCount) {
- ApiInitializer.Destroy();
- }
- }
-
-private:
- ui64 RefCount = 0;
- TMutex Mutex;
- THolder<TCurlInitializer> CurlInitializer;
- THolder<TApiInitializer> ApiInitializer;
-};
-
namespace NPrivate {
template <class TSettings>
@@ -93,10 +27,10 @@ Aws::Client::ClientConfiguration ConfigFromSettings(const TSettings& settings) {
switch (settings.scheme()) {
case TSettings::HTTP:
- config.scheme = Http::Scheme::HTTP;
+ config.scheme = Aws::Http::Scheme::HTTP;
break;
case TSettings::HTTPS:
- config.scheme = Http::Scheme::HTTPS;
+ config.scheme = Aws::Http::Scheme::HTTPS;
break;
default:
Y_ABORT("Unknown scheme");
@@ -114,22 +48,6 @@ Aws::Auth::AWSCredentials CredentialsFromSettings(const TSettings& settings) {
} // anonymous
-TS3User::TS3User(const TS3User& /*baseObject*/) {
- Singleton<TApiOwner>()->Ref();
-}
-
-TS3User::TS3User(TS3User& /*baseObject*/) {
- Singleton<TApiOwner>()->Ref();
-}
-
-TS3User::TS3User() {
- Singleton<TApiOwner>()->Ref();
-}
-
-TS3User::~TS3User() {
- Singleton<TApiOwner>()->UnRef();
-}
-
class TS3ThreadsPoolByEndpoint {
private:
diff --git a/ydb/core/wrappers/s3_storage_config.h b/ydb/core/wrappers/s3_storage_config.h
index 544ccc74af..b276bd79b3 100644
--- a/ydb/core/wrappers/s3_storage_config.h
+++ b/ydb/core/wrappers/s3_storage_config.h
@@ -17,14 +17,7 @@
namespace NKikimr::NWrappers::NExternalStorage {
-struct TS3User {
- TS3User();
- TS3User(const TS3User& baseObject);
- TS3User(TS3User& baseObject);
- ~TS3User();
-};
-
-class TS3ExternalStorageConfig: public IExternalStorageConfig, TS3User {
+class TS3ExternalStorageConfig: public IExternalStorageConfig {
private:
YDB_READONLY_DEF(TString, Bucket);
Aws::Client::ClientConfiguration Config;
diff --git a/ydb/core/wrappers/s3_wrapper_ut.cpp b/ydb/core/wrappers/s3_wrapper_ut.cpp
index 4e310add87..b8b0e185cd 100644
--- a/ydb/core/wrappers/s3_wrapper_ut.cpp
+++ b/ydb/core/wrappers/s3_wrapper_ut.cpp
@@ -8,16 +8,33 @@
#include <ydb/library/actors/core/log.h>
#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/testing/hook/hook.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/printf.h>
+#include <aws/core/Aws.h>
+
using namespace NActors;
using namespace NKikimr;
using namespace NKikimr::NWrappers;
using namespace Aws::S3::Model;
-class TS3MockTest: public NUnitTest::TTestBase, private NExternalStorage::TS3User {
+namespace {
+
+Aws::SDKOptions Options;
+
+Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+}
+
+Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+}
+
+}
+
+class TS3MockTest: public NUnitTest::TTestBase {
using TS3Mock = NWrappers::NTestHelpers::TS3Mock;
static auto MakeClientConfig(ui16 port) {
@@ -48,8 +65,8 @@ public:
}
void TearDown() override {
- S3Mock.Reset();
Runtime.Reset();
+ S3Mock.Reset();
}
ui16 GetPort() const {
diff --git a/ydb/core/wrappers/ut/ya.make b/ydb/core/wrappers/ut/ya.make
index 61e02602c8..212e65cfc5 100644
--- a/ydb/core/wrappers/ut/ya.make
+++ b/ydb/core/wrappers/ut/ya.make
@@ -9,6 +9,7 @@ IF (NOT OS_WINDOWS)
ydb/library/actors/core
library/cpp/digest/md5
library/cpp/testing/unittest
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
ydb/core/protos
ydb/core/testlib/basics/default
ydb/library/yql/minikql/comp_nodes/llvm14
diff --git a/ydb/services/ydb/backup_ut/ya.make b/ydb/services/ydb/backup_ut/ya.make
new file mode 100644
index 0000000000..39732a69bd
--- /dev/null
+++ b/ydb/services/ydb/backup_ut/ya.make
@@ -0,0 +1,32 @@
+UNITTEST_FOR(ydb/services/ydb)
+
+FORK_SUBTESTS()
+
+IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ SIZE(MEDIUM)
+ENDIF()
+
+SRCS(
+ ydb_backup_ut.cpp
+)
+
+PEERDIR(
+ ydb/core/testlib/default
+ ydb/core/wrappers/ut_helpers
+ ydb/public/lib/ydb_cli/dump
+ ydb/public/sdk/cpp/client/ydb_export
+ ydb/public/sdk/cpp/client/ydb_import
+ ydb/public/sdk/cpp/client/ydb_operation
+ ydb/public/sdk/cpp/client/ydb_result
+ ydb/public/sdk/cpp/client/ydb_table
+ ydb/public/sdk/cpp/client/ydb_value
+ ydb/library/backup
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
new file mode 100644
index 0000000000..90e0f55a8e
--- /dev/null
+++ b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
@@ -0,0 +1,482 @@
+#include "ydb_common_ut.h"
+
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+
+#include <ydb/public/lib/ydb_cli/dump/dump.h>
+#include <ydb/public/sdk/cpp/client/ydb_export/export.h>
+#include <ydb/public/sdk/cpp/client/ydb_import/import.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+
+#include <ydb/library/backup/backup.h>
+
+#include <library/cpp/testing/hook/hook.h>
+
+#include <aws/core/Aws.h>
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+namespace {
+
+void ExecuteDataDefinitionQuery(TSession& session, const TString& script) {
+ const auto result = session.ExecuteSchemeQuery(script).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), "script:\n" << script << "\nissues:\n" << result.GetIssues().ToString());
+}
+
+TDataQueryResult ExecuteDataModificationQuery(TSession& session,
+ const TString& script,
+ const TExecDataQuerySettings& settings = {}
+) {
+ const auto result = session.ExecuteDataQuery(
+ script,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ settings
+ ).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), "script:\n" << script << "\nissues:\n" << result.GetIssues().ToString());
+
+ return result;
+}
+
+TValue GetSingleResult(const TDataQueryResult& rawResults) {
+ auto resultSetParser = rawResults.GetResultSetParser(0);
+ UNIT_ASSERT(resultSetParser.TryNextRow());
+ return resultSetParser.GetValue(0);
+}
+
+ui64 GetUint64(const TValue& value) {
+ return TValueParser(value).GetUint64();
+}
+
+auto CreateMinPartitionsChecker(ui64 expectedMinPartitions) {
+ return [=](const TTableDescription& tableDescription) {
+ return tableDescription.GetPartitioningSettings().GetMinPartitionsCount() == expectedMinPartitions;
+ };
+}
+
+void CheckTableDescription(TSession& session, const TString& path, auto&& checker) {
+ auto describeResult = session.DescribeTable(path).ExtractValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+ auto tableDescription = describeResult.GetTableDescription();
+ Ydb::Table::CreateTableRequest descriptionProto;
+ // The purpose of translating to CreateTableRequest is solely to produce a clearer error message.
+ tableDescription.SerializeTo(descriptionProto);
+ UNIT_ASSERT_C(
+ checker(tableDescription),
+ descriptionProto.DebugString()
+ );
+}
+
+}
+
+Y_UNIT_TEST_SUITE(BackupRestore) {
+
+ void Restore(NDump::TClient& client, const TFsPath& sourceFile, const TString& dbPath) {
+ auto result = client.Restore(sourceFile, dbPath);
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ Y_UNIT_TEST(Basic) {
+ TKikimrWithGrpcAndRootSchema server;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
+ TTableClient tableClient(driver);
+ auto session = tableClient.GetSession().ExtractValueSync().GetSession();
+
+ constexpr const char* table = "/Root/table";
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Utf8,
+ PRIMARY KEY (Key)
+ );
+ )",
+ table
+ ));
+ ExecuteDataModificationQuery(session, Sprintf(R"(
+ UPSERT INTO `%s` (
+ Key,
+ Value
+ )
+ VALUES
+ (1, "one"),
+ (2, "two"),
+ (3, "three"),
+ (4, "four"),
+ (5, "five");
+ )",
+ table
+ ));
+
+ TTempDir tempDir;
+ const auto& pathToBackup = tempDir.Path();
+ // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
+ NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
+
+ NDump::TClient backupClient(driver);
+
+ // restore deleted rows in an existing table
+ ExecuteDataModificationQuery(session, Sprintf(R"(
+ DELETE FROM `%s` WHERE Key > 3;
+ )", table
+ ));
+ Restore(backupClient, pathToBackup, "/Root");
+ {
+ auto result = ExecuteDataModificationQuery(session, Sprintf(R"(
+ SELECT COUNT(*) FROM `%s`;
+ )", table
+ ));
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(GetSingleResult(result)), 5ull);
+ }
+
+ // restore deleted table
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+ Restore(backupClient, pathToBackup, "/Root");
+ {
+ auto result = ExecuteDataModificationQuery(session, Sprintf(R"(
+ SELECT COUNT(*) FROM `%s`;
+ )", table
+ ));
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(GetSingleResult(result)), 5ull);
+ }
+ }
+
+ Y_UNIT_TEST(RestoreTablePartitioningSettings) {
+ TKikimrWithGrpcAndRootSchema server;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
+ TTableClient tableClient(driver);
+ auto session = tableClient.GetSession().ExtractValueSync().GetSession();
+
+ constexpr const char* table = "/Root/table";
+ constexpr int minPartitions = 10;
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Utf8,
+ PRIMARY KEY (Key)
+ )
+ WITH (
+ AUTO_PARTITIONING_BY_LOAD = ENABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
+ );
+ )",
+ table, minPartitions
+ ));
+
+ CheckTableDescription(session, table, CreateMinPartitionsChecker(minPartitions));
+
+ TTempDir tempDir;
+ const auto& pathToBackup = tempDir.Path();
+ // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
+ NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
+
+ NDump::TClient backupClient(driver);
+
+ // restore deleted table
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+ Restore(backupClient, pathToBackup, "/Root");
+ CheckTableDescription(session, table, CreateMinPartitionsChecker(minPartitions));
+ }
+
+ Y_UNIT_TEST(RestoreIndexTablePartitioningSettings) {
+ TKikimrWithGrpcAndRootSchema server;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
+ TTableClient tableClient(driver);
+ auto session = tableClient.GetSession().ExtractValueSync().GetSession();
+
+ constexpr const char* table = "/Root/table";
+ constexpr const char* index = "byValue";
+ const TString indexTablePath = JoinFsPaths(table, index, "indexImplTable");
+ constexpr int minPartitions = 10;
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Uint32,
+ PRIMARY KEY (Key),
+ INDEX %s GLOBAL ON (Value)
+ );
+ )",
+ table, index
+ ));
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ ALTER TABLE `%s` ALTER INDEX %s SET (
+ AUTO_PARTITIONING_BY_LOAD = ENABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
+ );
+ )", table, index, minPartitions
+ ));
+
+ CheckTableDescription(session, indexTablePath, CreateMinPartitionsChecker(minPartitions));
+
+ TTempDir tempDir;
+ const auto& pathToBackup = tempDir.Path();
+ // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
+ NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
+
+ NDump::TClient backupClient(driver);
+
+ // restore deleted table
+ ExecuteDataDefinitionQuery(session, Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+ Restore(backupClient, pathToBackup, "/Root");
+ CheckTableDescription(session, indexTablePath, CreateMinPartitionsChecker(minPartitions));
+ }
+
+}
+
+Y_UNIT_TEST_SUITE(BackupRestoreS3) {
+
+ Aws::SDKOptions Options;
+
+ Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Options);
+ }
+
+ Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Options);
+ }
+
+ using NKikimr::NWrappers::NTestHelpers::TS3Mock;
+
+ class TS3TestEnv {
+ TKikimrWithGrpcAndRootSchema server;
+ TDriver driver;
+ TTableClient tableClient;
+ TSession session;
+ ui16 s3Port;
+ TS3Mock s3Mock;
+ // required for exports to function
+ TDataShardExportFactory dataShardExportFactory;
+
+ public:
+ TS3TestEnv()
+ : driver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())))
+ , tableClient(driver)
+ , session(tableClient.CreateSession().ExtractValueSync().GetSession())
+ , s3Port(server.GetPortManager().GetPort())
+ , s3Mock({}, TS3Mock::TSettings(s3Port))
+ {
+ UNIT_ASSERT_C(s3Mock.Start(), s3Mock.GetError());
+
+ auto& runtime = *server.GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::EPriority::PRI_DEBUG);
+ runtime.GetAppData().DataShardExportFactory = &dataShardExportFactory;
+ }
+
+ TKikimrWithGrpcAndRootSchema& GetServer() {
+ return server;
+ }
+
+ const TDriver& GetDriver() const {
+ return driver;
+ }
+
+ TSession& GetSession() {
+ return session;
+ }
+
+ ui16 GetS3Port() const {
+ return s3Port;
+ }
+ };
+
+ template <typename TOperation>
+ bool WaitForOperation(NOperation::TOperationClient& client, NOperationId::TOperationId id,
+ int retries = 10, TDuration sleepDuration = TDuration::MilliSeconds(100)
+ ) {
+ for (int retry = 0; retry <= retries; ++retry) {
+ auto result = client.Get<TOperation>(id).ExtractValueSync();
+ if (result.Ready()) {
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.Status().GetStatus(), EStatus::SUCCESS,
+ result.Status().GetIssues().ToString()
+ );
+ return true;
+ }
+ Sleep(sleepDuration *= 2);
+ }
+ return false;
+ }
+
+ void ExportToS3(NExport::TExportClient& exportClient, ui16 s3Port, NOperation::TOperationClient& operationClient,
+ const TString& source, const TString& destination
+ ) {
+ // The exact values for Bucket, AccessKey and SecretKey do not matter if the S3 backend is TS3Mock.
+ // Any non-empty strings should do.
+ const auto exportSettings = NExport::TExportToS3Settings()
+ .Endpoint(Sprintf("localhost:%d", s3Port))
+ .Scheme(ES3Scheme::HTTP)
+ .Bucket("test_bucket")
+ .AccessKey("test_key")
+ .SecretKey("test_secret")
+ .AppendItem(NExport::TExportToS3Settings::TItem{.Src = source, .Dst = destination});
+
+ auto response = exportClient.ExportToS3(exportSettings).ExtractValueSync();
+ UNIT_ASSERT_C(WaitForOperation<NExport::TExportToS3Response>(operationClient, response.Id()),
+ Sprintf("The export from %s to %s did not complete within the allocated time.",
+ source.c_str(), destination.c_str()
+ )
+ );
+ }
+
+ void ImportFromS3(NImport::TImportClient& importClient, ui16 s3Port, NOperation::TOperationClient& operationClient,
+ const TString& source, const TString& destination
+ ) {
+ // The exact values for Bucket, AccessKey and SecretKey do not matter if the S3 backend is TS3Mock.
+ // Any non-empty strings should do.
+ const auto importSettings = NImport::TImportFromS3Settings()
+ .Endpoint(Sprintf("localhost:%d", s3Port))
+ .Scheme(ES3Scheme::HTTP)
+ .Bucket("test_bucket")
+ .AccessKey("test_key")
+ .SecretKey("test_secret")
+ .AppendItem(NImport::TImportFromS3Settings::TItem{.Src = source, .Dst = destination});
+
+ auto response = importClient.ImportFromS3(importSettings).ExtractValueSync();
+ UNIT_ASSERT_C(WaitForOperation<NImport::TImportFromS3Response>(operationClient, response.Id()),
+ Sprintf("The import from %s to %s did not complete within the allocated time.",
+ source.c_str(), destination.c_str()
+ )
+ );
+ }
+
+ Y_UNIT_TEST(Basic) {
+ TS3TestEnv testEnv;
+
+ constexpr const char* table = "/Root/table";
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Utf8,
+ PRIMARY KEY (Key)
+ );
+ )",
+ table
+ ));
+ ExecuteDataModificationQuery(testEnv.GetSession(), Sprintf(R"(
+ UPSERT INTO `%s` (
+ Key,
+ Value
+ )
+ VALUES
+ (1, "one"),
+ (2, "two"),
+ (3, "three"),
+ (4, "four"),
+ (5, "five");
+ )",
+ table
+ ));
+
+ NExport::TExportClient exportClient(testEnv.GetDriver());
+ NImport::TImportClient importClient(testEnv.GetDriver());
+ NOperation::TOperationClient operationClient(testEnv.GetDriver());
+
+ ExportToS3(exportClient, testEnv.GetS3Port(), operationClient, table, "table");
+
+ // The table needs to be dropped before importing from S3 can proceed successfully.
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+
+ ImportFromS3(importClient, testEnv.GetS3Port(), operationClient, "table", table);
+ {
+ auto result = ExecuteDataModificationQuery(testEnv.GetSession(), Sprintf(R"(
+ SELECT COUNT(*) FROM `%s`;
+ )", table
+ ));
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(GetSingleResult(result)), 5ull);
+ }
+ }
+
+ Y_UNIT_TEST(RestoreTablePartitioningSettings) {
+ TS3TestEnv testEnv;
+
+ constexpr const char* table = "/Root/table";
+ constexpr int minPartitions = 10;
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Utf8,
+ PRIMARY KEY (Key)
+ )
+ WITH (
+ AUTO_PARTITIONING_BY_LOAD = ENABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
+ );
+ )",
+ table, minPartitions
+ ));
+
+ CheckTableDescription(testEnv.GetSession(), table, CreateMinPartitionsChecker(minPartitions));
+
+ NExport::TExportClient exportClient(testEnv.GetDriver());
+ NImport::TImportClient importClient(testEnv.GetDriver());
+ NOperation::TOperationClient operationClient(testEnv.GetDriver());
+
+ ExportToS3(exportClient, testEnv.GetS3Port(), operationClient, table, "table");
+
+ // The table needs to be dropped before importing from S3 can proceed successfully.
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+
+ ImportFromS3(importClient, testEnv.GetS3Port(), operationClient, "table", table);
+ CheckTableDescription(testEnv.GetSession(), table, CreateMinPartitionsChecker(minPartitions));
+ }
+
+ Y_UNIT_TEST(RestoreIndexTablePartitioningSettings) {
+ TS3TestEnv testEnv;
+
+ constexpr const char* table = "/Root/table";
+ constexpr const char* index = "byValue";
+ const TString indexTablePath = JoinFsPaths(table, index, "indexImplTable");
+ constexpr int minPartitions = 10;
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ CREATE TABLE `%s` (
+ Key Uint32,
+ Value Uint32,
+ PRIMARY KEY (Key),
+ INDEX %s GLOBAL ON (Value)
+ );
+ )",
+ table, index
+ ));
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ ALTER TABLE `%s` ALTER INDEX %s SET (
+ AUTO_PARTITIONING_BY_LOAD = ENABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
+ );
+ )", table, index, minPartitions
+ ));
+
+ CheckTableDescription(testEnv.GetSession(), indexTablePath, CreateMinPartitionsChecker(minPartitions));
+
+ NExport::TExportClient exportClient(testEnv.GetDriver());
+ NImport::TImportClient importClient(testEnv.GetDriver());
+ NOperation::TOperationClient operationClient(testEnv.GetDriver());
+
+ ExportToS3(exportClient, testEnv.GetS3Port(), operationClient, table, "table");
+
+ // The table needs to be dropped before importing from S3 can proceed successfully.
+ ExecuteDataDefinitionQuery(testEnv.GetSession(), Sprintf(R"(
+ DROP TABLE `%s`;
+ )", table
+ ));
+
+ ImportFromS3(importClient, testEnv.GetS3Port(), operationClient, "table", table);
+ CheckTableDescription(testEnv.GetSession(), indexTablePath, CreateMinPartitionsChecker(minPartitions));
+ }
+
+}
diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make
index 2d60535a32..09601dd06b 100644
--- a/ydb/services/ydb/ut/ya.make
+++ b/ydb/services/ydb/ut/ya.make
@@ -42,8 +42,6 @@ PEERDIR(
ydb/core/grpc_services/base
ydb/core/testlib
ydb/core/security
- ydb/core/wrappers/ut_helpers
- ydb/library/backup
ydb/library/yql/minikql/dom
ydb/library/yql/minikql/jsonpath
ydb/library/testlib/service_mocks/ldap_mock
@@ -52,7 +50,6 @@ PEERDIR(
ydb/public/lib/yson_value
ydb/public/lib/ut_helpers
ydb/public/lib/ydb_cli/commands
- ydb/public/lib/ydb_cli/dump
ydb/public/sdk/cpp/client/draft
ydb/public/sdk/cpp/client/ydb_coordination
ydb/public/sdk/cpp/client/ydb_export
diff --git a/ydb/services/ydb/ya.make b/ydb/services/ydb/ya.make
index d42bbf1928..3467679449 100644
--- a/ydb/services/ydb/ya.make
+++ b/ydb/services/ydb/ya.make
@@ -37,6 +37,7 @@ PEERDIR(
END()
RECURSE_FOR_TESTS(
+ backup_ut
sdk_sessions_ut
sdk_sessions_pool_ut
table_split_ut
diff --git a/ydb/services/ydb/ydb_import_ut.cpp b/ydb/services/ydb/ydb_import_ut.cpp
index a423246c30..961cd9679c 100644
--- a/ydb/services/ydb/ydb_import_ut.cpp
+++ b/ydb/services/ydb/ydb_import_ut.cpp
@@ -3,10 +3,8 @@
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_import/import.h>
-#include <ydb/public/lib/ydb_cli/dump/dump.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
-#include <ydb/library/backup/backup.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -130,215 +128,3 @@ Y_UNIT_TEST_SUITE(YdbImport) {
}
}
-
-Y_UNIT_TEST_SUITE(BackupRestore) {
-
- using namespace NYdb::NTable;
-
- void ExecuteDataDefinitionQuery(TSession& session, const TString& script) {
- const auto result = session.ExecuteSchemeQuery(script).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), "script:\n" << script << "\nissues:\n" << result.GetIssues().ToString());
- }
-
- TDataQueryResult ExecuteDataModificationQuery(TSession& session,
- const TString& script,
- const TExecDataQuerySettings& settings = {}
- ) {
- const auto result = session.ExecuteDataQuery(
- script,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- settings
- ).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), "script:\n" << script << "\nissues:\n" << result.GetIssues().ToString());
-
- return result;
- }
-
- TValue GetSingleResult(const TDataQueryResult& rawResults) {
- auto resultSetParser = rawResults.GetResultSetParser(0);
- UNIT_ASSERT(resultSetParser.TryNextRow());
- return resultSetParser.GetValue(0);
- }
-
- ui64 GetUint64(const TValue& value) {
- return TValueParser(value).GetUint64();
- }
-
- void Restore(NDump::TClient& client, const TFsPath& sourceFile, const TString& dbPath) {
- auto result = client.Restore(sourceFile, dbPath);
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- auto CreateMinPartitionsChecker(ui64 expectedMinPartitions) {
- return [=](const TTableDescription& tableDescription) {
- return tableDescription.GetPartitioningSettings().GetMinPartitionsCount() == expectedMinPartitions;
- };
- }
-
- void CheckTableDescription(TSession& session, const TString& path, auto&& checker) {
- auto describeResult = session.DescribeTable(path).ExtractValueSync();
- UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
- auto tableDescription = describeResult.GetTableDescription();
- Ydb::Table::CreateTableRequest descriptionProto;
- // The purpose of translating to CreateTableRequest is solely to produce a clearer error message.
- tableDescription.SerializeTo(descriptionProto);
- UNIT_ASSERT_C(
- checker(tableDescription),
- descriptionProto.DebugString()
- );
- }
-
- Y_UNIT_TEST(Basic) {
- TKikimrWithGrpcAndRootSchema server;
- auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
- TTableClient tableClient(driver);
- auto session = tableClient.GetSession().ExtractValueSync().GetSession();
-
- constexpr const char* table = "/Root/table";
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- CREATE TABLE `%s` (
- Key Uint32,
- Value Utf8,
- PRIMARY KEY (Key)
- );
- )",
- table
- ));
- ExecuteDataModificationQuery(session, Sprintf(R"(
- UPSERT INTO `%s` (
- Key,
- Value
- )
- VALUES
- (1, "one"),
- (2, "two"),
- (3, "three"),
- (4, "four"),
- (5, "five");
- )",
- table
- ));
-
- TTempDir tempDir;
- const auto& pathToBackup = tempDir.Path();
- // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
- NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
-
- NDump::TClient backupClient(driver);
-
- // restore deleted rows in an existing table
- ExecuteDataModificationQuery(session, Sprintf(R"(
- DELETE FROM `%s` WHERE Key > 3;
- )", table
- ));
- Restore(backupClient, pathToBackup, "/Root");
- {
- auto result = ExecuteDataModificationQuery(session, Sprintf(R"(
- SELECT COUNT(*) FROM `%s`;
- )", table
- ));
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(GetSingleResult(result)), 5ull);
- }
-
- // restore deleted table
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- DROP TABLE `%s`;
- )", table
- ));
- Restore(backupClient, pathToBackup, "/Root");
- {
- auto result = ExecuteDataModificationQuery(session, Sprintf(R"(
- SELECT COUNT(*) FROM `%s`;
- )", table
- ));
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(GetSingleResult(result)), 5ull);
- }
- }
-
- Y_UNIT_TEST(RestoreTablePartitioningSettings) {
- TKikimrWithGrpcAndRootSchema server;
- auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
- TTableClient tableClient(driver);
- auto session = tableClient.GetSession().ExtractValueSync().GetSession();
-
- constexpr const char* table = "/Root/table";
- constexpr int minPartitions = 10;
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- CREATE TABLE `%s` (
- Key Uint32,
- Value Utf8,
- PRIMARY KEY (Key)
- )
- WITH (
- AUTO_PARTITIONING_BY_LOAD = ENABLED,
- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
- );
- )",
- table, minPartitions
- ));
-
- CheckTableDescription(session, table, CreateMinPartitionsChecker(minPartitions));
-
- TTempDir tempDir;
- const auto& pathToBackup = tempDir.Path();
- // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
- NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
-
- NDump::TClient backupClient(driver);
-
- // restore deleted table
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- DROP TABLE `%s`;
- )", table
- ));
- Restore(backupClient, pathToBackup, "/Root");
- CheckTableDescription(session, table, CreateMinPartitionsChecker(minPartitions));
- }
-
- Y_UNIT_TEST(RestoreIndexTablePartitioningSettings) {
- TKikimrWithGrpcAndRootSchema server;
- auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%d", server.GetPort())));
- TTableClient tableClient(driver);
- auto session = tableClient.GetSession().ExtractValueSync().GetSession();
-
- constexpr const char* table = "/Root/table";
- constexpr const char* index = "byValue";
- const TString indexTablePath = JoinFsPaths(table, index, "indexImplTable");
- constexpr int minPartitions = 10;
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- CREATE TABLE `%s` (
- Key Uint32,
- Value Uint32,
- PRIMARY KEY (Key),
- INDEX %s GLOBAL ON (Value)
- );
- )",
- table, index
- ));
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- ALTER TABLE `%s` ALTER INDEX %s SET (
- AUTO_PARTITIONING_BY_LOAD = ENABLED,
- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d
- );
- )", table, index, minPartitions
- ));
-
- CheckTableDescription(session, indexTablePath, CreateMinPartitionsChecker(minPartitions));
-
- TTempDir tempDir;
- const auto& pathToBackup = tempDir.Path();
- // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder
- NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);
-
- NDump::TClient backupClient(driver);
-
- // restore deleted table
- ExecuteDataDefinitionQuery(session, Sprintf(R"(
- DROP TABLE `%s`;
- )", table
- ));
- Restore(backupClient, pathToBackup, "/Root");
- CheckTableDescription(session, indexTablePath, CreateMinPartitionsChecker(minPartitions));
- }
-
-}