aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-24 14:02:43 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-24 14:02:43 +0300
commitf31fd59b254383df90ae3e828884f2dd64b6440f (patch)
treede1ce442f01d7df26776c2122c75cbb86feb2ac5
parentb4eaeb41928a66a2d10eed0c0c3da8afb59acddd (diff)
downloadydb-f31fd59b254383df90ae3e828884f2dd64b6440f.tar.gz
YQ-701 YQ-577 Add callback under mutex
Apply code from review https://a.yandex-team.ru/review/2344254 and fix copying issues in `Fail()` ref:4a93e54489c9c40c9726a28592b707a9b0bb9b7f
-rw-r--r--build/mapping.conf.json6
-rwxr-xr-xya4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp22
3 files changed, 25 insertions, 7 deletions
diff --git a/build/mapping.conf.json b/build/mapping.conf.json
index 8ddaa5d9a1..2bd8467202 100644
--- a/build/mapping.conf.json
+++ b/build/mapping.conf.json
@@ -2946,6 +2946,12 @@
"2812465732": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2812465732",
"2812467322": "https://storage.mds.yandex.net/get-devtools-opensource/233854/2812467322",
"2812467790": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2812467790",
+ "2817782042": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2817782042",
+ "2817782630": "https://storage.mds.yandex.net/get-devtools-opensource/250854/2817782630",
+ "2817783133": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2817783133",
+ "2817784371": "https://storage.mds.yandex.net/get-devtools-opensource/250854/2817784371",
+ "2817784890": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2817784890",
+ "2817785117": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2817785117",
"309054781": "https://storage.mds.yandex.net/get-devtools-opensource/250854/309054781",
"360916612": "https://storage.mds.yandex.net/get-devtools-opensource/233854/360916612",
"412716868": "https://storage.mds.yandex.net/get-devtools-opensource/233854/412716868",
diff --git a/ya b/ya
index 402c4455f0..fdd928be05 100755
--- a/ya
+++ b/ya
@@ -4,8 +4,8 @@ import sys
import platform
import json
-URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/233854/7389b72dfa2a5e402ccfcc2b6dcdbd5b"]
-MD5 = "7389b72dfa2a5e402ccfcc2b6dcdbd5b"
+URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/471749/11f899077dc929aab633241c83ff8616"]
+MD5 = "11f899077dc929aab633241c83ff8616"
RETRIES = 5
HASH_PREFIX = 10
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 7c3cf7c0b9..a89dd1b38f 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -62,13 +62,20 @@ public:
return Handle;
}
- void AddCallback(IHTTPGateway::TOnResult callback) {
+ // return true if callback successfully added to this work
+ bool AddCallback(IHTTPGateway::TOnResult callback) {
+ const std::unique_lock lock(SyncCallbacks);
+ if (Callbacks.empty())
+ return false;
Callbacks.emplace(std::move(callback));
+ return true;
}
- void Fail(TIssue error) {
+ void Fail(const TIssue& error) {
+ TIssues issues{error};
+ const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
- Callbacks.top()(TIssues{error});
+ Callbacks.top()(issues);
Callbacks.pop();
}
}
@@ -77,6 +84,7 @@ public:
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
+ const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer)));
@@ -107,6 +115,8 @@ private:
TString Buffer;
TStringInput Input;
TStringOutput Output;
+
+ std::mutex SyncCallbacks;
std::stack<IHTTPGateway::TOnResult> Callbacks;
};
@@ -294,7 +304,8 @@ private:
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)];
if (const auto& easy = entry.lock())
- return easy->AddCallback(std::move(callback));
+ if (easy->AddCallback(std::move(callback)))
+ return;
auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
entry = easy;
@@ -404,7 +415,8 @@ private:
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, headers, data, std::move(retryPolicy))];
if (const auto& easy = entry.first.lock())
- return easy->AddCallback(std::move(callback));
+ if (easy->AddCallback(std::move(callback)))
+ return;
auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
entry = std::make_pair(TEasyCurl::TWeakPtr(easy), std::thread(&THTTPEasyGateway::Perform, weak_from_this(), easy));
}