diff options
author | dakovalkov <dakovalkov@yandex-team.com> | 2023-11-28 11:21:04 +0300 |
---|---|---|
committer | dakovalkov <dakovalkov@yandex-team.com> | 2023-11-28 12:17:28 +0300 |
commit | 38b378a33c9814cc1cf654beceb92bf6fb0c7c2c (patch) | |
tree | 1297be68dd39310e07d0b32ce99e6b4894778937 | |
parent | e47e2d7f764c4433fc3ccb00c5875361175988f7 (diff) | |
download | ydb-38b378a33c9814cc1cf654beceb92bf6fb0c7c2c.tar.gz |
Fix aws-cpp-sdk-s3 usage in ydb
This PR fixes incorrect aws-cpp-sdk usages in ydb founded during the aws-cpp-sdk library update in arcadia.
Explanation:
- Aws::ShutdownAPI should be called before exiting the program to avoid memory leaks (detected by leak sanitizer).
- All asynchronous operations should be awaited before destroying S3Client. DisableRequestProcessing does not wait for completion of such operations. Heap use after free detected by address sanitizer.
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 21 | ||||
-rw-r--r-- | ydb/core/wrappers/s3_storage.cpp | 2 | ||||
-rw-r--r-- | ydb/core/wrappers/s3_storage.h | 26 |
3 files changed, 34 insertions, 15 deletions
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 3263c1353a..1e9bf14f04 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -19,6 +19,8 @@ #include <fmt/format.h> +#include <library/cpp/testing/hook/hook.h> + namespace NKikimr { namespace NKqp { @@ -41,19 +43,7 @@ const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["va const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])"; -bool InitAwsApi() { - Aws::InitAPI(Aws::SDKOptions()); - return true; -} - -bool EnsureAwsApiInited() { - static const bool inited = InitAwsApi(); - return inited; -} - Aws::S3::S3Client MakeS3Client() { - EnsureAwsApiInited(); - Aws::Client::ClientConfiguration s3ClientConfig; s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT"); s3ClientConfig.scheme = Aws::Http::Scheme::HTTP; @@ -170,6 +160,13 @@ TString GetBucketLocation(const TStringBuf bucket) { return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/'; } +Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) { + Aws::InitAPI(Aws::SDKOptions()); +} + +Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) { + Aws::ShutdownAPI(Aws::SDKOptions()); +} Y_UNIT_TEST_SUITE(KqpFederatedQuery) { Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) { diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp index ca7dbc7c4d..9f892da3b6 100644 --- a/ydb/core/wrappers/s3_storage.cpp +++ b/ydb/core/wrappers/s3_storage.cpp @@ -270,6 +270,8 @@ public: TS3ExternalStorage::~TS3ExternalStorage() { if (Client) { Client->DisableRequestProcessing(); + std::unique_lock guard(RunningQueriesMutex); + RunningQueriesNotifier.wait(guard, [&] { return RunningQueriesCount == 0; }); } } diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h index a1728a6384..b3e4e9c9b4 100644 --- a/ydb/core/wrappers/s3_storage.h +++ b/ydb/core/wrappers/s3_storage.h @@ -23,9 +23,13 @@ #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartCopyRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h> #include <library/cpp/actors/core/log.h> +#include <util/generic/scope.h> #include <util/string/builder.h> #include <util/string/printf.h> +#include <condition_variable> +#include <mutex> + namespace NKikimr::NWrappers::NExternalStorage { class TS3ExternalStorage: public IExternalStorageOperator, TS3User { @@ -37,6 +41,10 @@ private: const Aws::S3::Model::StorageClass StorageClass = Aws::S3::Model::StorageClass::STANDARD; bool Verbose = true; + mutable std::mutex RunningQueriesMutex; + mutable std::condition_variable RunningQueriesNotifier; + mutable int RunningQueriesCount = 0; + template <typename TRequest, typename TOutcome> using THandler = std::function<void(const Aws::S3::S3Client*, const TRequest&, const TOutcome&, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)>; @@ -49,15 +57,24 @@ private: ev->Get()->MutableRequest().WithBucket(Bucket); auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender, ev->Get()->GetRequestContext(), StorageClass, ReplyAdapter); - bool verbose = Verbose; - auto callback = [verbose]( + auto callback = [this]( const Aws::S3::S3Client*, const typename TEvRequest::TRequest& request, const typename TEvResponse::TOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { const auto* ctx = static_cast<const TCtx*>(context.get()); - if (verbose) { + Y_DEFER { + std::unique_lock guard(RunningQueriesMutex); + --RunningQueriesCount; + bool needNotify = (RunningQueriesCount == 0); + guard.unlock(); + if (needNotify) { + RunningQueriesNotifier.notify_all(); + } + }; + + if (Verbose) { LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" << ": uuid# " << ctx->GetUUID() << ", response# " << outcome); @@ -79,6 +96,9 @@ private: << ", request# " << ev->Get()->GetRequest()); } func(Client.Get(), ctx->PrepareRequest(ev), callback, ctx); + + std::unique_lock guard(RunningQueriesMutex); + ++RunningQueriesCount; } public: |