aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordakovalkov <dakovalkov@yandex-team.com>2023-11-28 11:21:04 +0300
committerdakovalkov <dakovalkov@yandex-team.com>2023-11-28 12:17:28 +0300
commit38b378a33c9814cc1cf654beceb92bf6fb0c7c2c (patch)
tree1297be68dd39310e07d0b32ce99e6b4894778937
parente47e2d7f764c4433fc3ccb00c5875361175988f7 (diff)
downloadydb-38b378a33c9814cc1cf654beceb92bf6fb0c7c2c.tar.gz
Fix aws-cpp-sdk-s3 usage in ydb
This PR fixes incorrect aws-cpp-sdk usages in ydb that were found during the aws-cpp-sdk library update in arcadia. Explanation: - Aws::ShutdownAPI should be called before exiting the program to avoid memory leaks (detected by leak sanitizer). - All asynchronous operations should be awaited before destroying S3Client. DisableRequestProcessing does not wait for completion of such operations, which led to a heap-use-after-free detected by address sanitizer.
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp21
-rw-r--r--ydb/core/wrappers/s3_storage.cpp2
-rw-r--r--ydb/core/wrappers/s3_storage.h26
3 files changed, 34 insertions, 15 deletions
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 3263c1353a..1e9bf14f04 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -19,6 +19,8 @@
#include <fmt/format.h>
+#include <library/cpp/testing/hook/hook.h>
+
namespace NKikimr {
namespace NKqp {
@@ -41,19 +43,7 @@ const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["va
const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";
-bool InitAwsApi() {
- Aws::InitAPI(Aws::SDKOptions());
- return true;
-}
-
-bool EnsureAwsApiInited() {
- static const bool inited = InitAwsApi();
- return inited;
-}
-
Aws::S3::S3Client MakeS3Client() {
- EnsureAwsApiInited();
-
Aws::Client::ClientConfiguration s3ClientConfig;
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
@@ -170,6 +160,13 @@ TString GetBucketLocation(const TStringBuf bucket) {
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
}
+Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
+ Aws::InitAPI(Aws::SDKOptions());
+}
+
+Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
+ Aws::ShutdownAPI(Aws::SDKOptions());
+}
Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp
index ca7dbc7c4d..9f892da3b6 100644
--- a/ydb/core/wrappers/s3_storage.cpp
+++ b/ydb/core/wrappers/s3_storage.cpp
@@ -270,6 +270,8 @@ public:
TS3ExternalStorage::~TS3ExternalStorage() {
if (Client) {
Client->DisableRequestProcessing();
+ std::unique_lock guard(RunningQueriesMutex);
+ RunningQueriesNotifier.wait(guard, [&] { return RunningQueriesCount == 0; });
}
}
diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h
index a1728a6384..b3e4e9c9b4 100644
--- a/ydb/core/wrappers/s3_storage.h
+++ b/ydb/core/wrappers/s3_storage.h
@@ -23,9 +23,13 @@
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartCopyRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h>
#include <library/cpp/actors/core/log.h>
+#include <util/generic/scope.h>
#include <util/string/builder.h>
#include <util/string/printf.h>
+#include <condition_variable>
+#include <mutex>
+
namespace NKikimr::NWrappers::NExternalStorage {
class TS3ExternalStorage: public IExternalStorageOperator, TS3User {
@@ -37,6 +41,10 @@ private:
const Aws::S3::Model::StorageClass StorageClass = Aws::S3::Model::StorageClass::STANDARD;
bool Verbose = true;
+ mutable std::mutex RunningQueriesMutex;
+ mutable std::condition_variable RunningQueriesNotifier;
+ mutable int RunningQueriesCount = 0;
+
template <typename TRequest, typename TOutcome>
using THandler = std::function<void(const Aws::S3::S3Client*, const TRequest&, const TOutcome&, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)>;
@@ -49,15 +57,24 @@ private:
ev->Get()->MutableRequest().WithBucket(Bucket);
auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender, ev->Get()->GetRequestContext(), StorageClass, ReplyAdapter);
- bool verbose = Verbose;
- auto callback = [verbose](
+ auto callback = [this](
const Aws::S3::S3Client*,
const typename TEvRequest::TRequest& request,
const typename TEvResponse::TOutcome& outcome,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
const auto* ctx = static_cast<const TCtx*>(context.get());
- if (verbose) {
+ Y_DEFER {
+ std::unique_lock guard(RunningQueriesMutex);
+ --RunningQueriesCount;
+ bool needNotify = (RunningQueriesCount == 0);
+ guard.unlock();
+ if (needNotify) {
+ RunningQueriesNotifier.notify_all();
+ }
+ };
+
+ if (Verbose) {
LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response"
<< ": uuid# " << ctx->GetUUID()
<< ", response# " << outcome);
@@ -79,6 +96,9 @@ private:
<< ", request# " << ev->Get()->GetRequest());
}
func(Client.Get(), ctx->PrepareRequest(ev), callback, ctx);
+
+ std::unique_lock guard(RunningQueriesMutex);
+ ++RunningQueriesCount;
}
public: