aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2022-08-26 20:54:05 +0300
committeruzhas <uzhas@ydb.tech>2022-08-26 20:54:05 +0300
commitc815af1186223677be77bf709c12468b01b29a3a (patch)
tree00a41bd7bcbbfb763c53c3ef02020803d8dc02f0
parentb0c9cc1d3129b79b0c09132622a795ab6a9319f3 (diff)
downloadydb-c815af1186223677be77bf709c12468b01b29a3a.tar.gz
fix hanging POST, support queue drain via 0 min size to unlock sink finishing, add tracing capability to gateway
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp135
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp6
2 files changed, 95 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index ae9ccc3e1ac..7644754cfc6 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -11,6 +11,49 @@
#include <stack>
#include <queue>
+namespace {
+int curlTrace(CURL *handle, curl_infotype type,
+ char *data, size_t size,
+ void *userp) {
+ Y_UNUSED(handle);
+ Y_UNUSED(userp);
+ Y_UNUSED(size);
+ Y_UNUSED(data);
+
+ TStringBuf buf(data, size);
+ switch (type) {
+ case CURLINFO_TEXT:
+ Cerr << "== Info: " << buf;
+ break;
+ case CURLINFO_HEADER_OUT:
+ Cerr << "=> Send header (" << size << " bytes):" << Endl << buf;
+ break;
+ case CURLINFO_HEADER_IN:
+ Cerr << "<= Recv header (" << size << " bytes):" << buf;
+ break;
+ default:
+ return 0;
+
+/*
+ case CURLINFO_DATA_OUT:
+ Cerr << "=> Send data (" << size << " bytes)" << Endl;
+ break;
+ case CURLINFO_SSL_DATA_OUT:
+ Cerr << "=> Send SSL data (" << size << " bytes)" << Endl;
+ break;
+ case CURLINFO_DATA_IN:
+ Cerr << "<= Recv data (" << size << " bytes)" << Endl;
+ break;
+ case CURLINFO_SSL_DATA_IN:
+ Cerr << "<= Recv SSL data (" << size << " bytes)" << Endl;
+ break;
+*/
+ }
+
+ return 0;
+}
+}
+
namespace NYql {
namespace {
@@ -31,7 +74,7 @@ public:
PUT
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig())
+ TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig())
: Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes)
{
switch (method) {
@@ -44,6 +87,14 @@ public:
curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
break;
}
+
+ // does nothing if CURLOPT_VERBOSE is not set to 1
+ curl_easy_setopt(Handle, CURLOPT_DEBUGFUNCTION, curlTrace);
+
+ // for local debug only
+ // will print tokens in HTTP headers
+ // curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L);
+
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
@@ -64,7 +115,11 @@ public:
curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
- if (withBody) {
+ if (method == EMethod::POST) {
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, bodySize);
+ }
+
+ if (bodySize) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
}
@@ -123,7 +178,7 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
{
Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
@@ -219,7 +274,7 @@ public:
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
static TPtr Make(
@@ -380,6 +435,7 @@ public:
~THTTPMultiGateway() {
curl_multi_wakeup(Handle);
+ IsStopped = true;
if (Thread.joinable()) {
Thread.join();
}
@@ -394,61 +450,50 @@ private:
void InitCurl() {
const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL);
- if (globalInitResult == CURLE_OK) {
- Handle = curl_multi_init();
- if (!Handle) {
- Cerr << "curl_multi_init error" << Endl;
- }
- } else {
- Cerr << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl;
+ if (globalInitResult != CURLE_OK) {
+ throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl;
+ }
+ Handle = curl_multi_init();
+ if (!Handle) {
+ throw yexception() << "curl_multi_init error";
}
}
void UninitCurl() {
- if (Handle) {
- const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle);
- if (multiCleanupResult != CURLM_OK) {
- Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
- }
+ Y_VERIFY(Handle);
+ const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle);
+ if (multiCleanupResult != CURLM_OK) {
+ Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
}
curl_global_cleanup();
}
- static void Perform(const TWeakPtr& weak, CURLM* handle) {
+ void Perform() {
OutputSize.store(0ULL);
- for (size_t handlers = 0U;;) {
- if (const auto& self = weak.lock()) {
- handlers = self->FillHandlers();
- self->PerformCycles->Inc();
- self->OutputMemory->Set(OutputSize);
- } else {
- break;
- }
+ for (size_t handlers = 0U; !IsStopped;) {
+ handlers = FillHandlers();
+ PerformCycles->Inc();
+ OutputMemory->Set(OutputSize);
int running = 0;
- if (const auto c = curl_multi_perform(handle, &running); CURLM_OK != c) {
- if (const auto& self = weak.lock()) {
- self->Fail(c);
- }
+ if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) {
+ Fail(c);
break;
}
if (running < int(handlers)) {
- if (const auto& self = weak.lock()) {
- for (int messages = int(handlers) - running; messages;) {
- if (const auto msg = curl_multi_info_read(handle, &messages)) {
- if(msg->msg == CURLMSG_DONE) {
- self->Done(msg->easy_handle, msg->data.result);
- }
+ for (int messages = int(handlers) - running; messages;) {
+ if (const auto msg = curl_multi_info_read(Handle, &messages)) {
+ if(msg->msg == CURLMSG_DONE) {
+ Done(msg->easy_handle, msg->data.result);
}
}
}
} else {
- if (const auto c = curl_multi_poll(handle, nullptr, 0, 256, nullptr); CURLM_OK != c) {
- if (const auto& self = weak.lock()) {
- self->Fail(c);
- }
+ const int timeoutMs = 300;
+ if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
+ Fail(c);
break;
}
}
@@ -547,6 +592,7 @@ private:
const TIssue error(curl_multi_strerror(result));
while (!works.empty()) {
+ curl_multi_remove_handle(Handle, works.top()->GetHandle());
works.top()->Fail(error);
works.pop();
}
@@ -639,6 +685,7 @@ private:
std::mutex Sync;
std::thread Thread;
+ std::atomic<bool> IsStopped = false;
size_t AllocatedSize = 0ULL;
static std::atomic_size_t OutputSize;
@@ -732,11 +779,11 @@ IHTTPGateway::Make(const THttpGatewayConfig* httpGatewaysCfg, ::NMonitoring::TDy
const auto gateway = std::make_shared<THTTPMultiGateway>(httpGatewaysCfg, std::move(counters));
THTTPMultiGateway::Singleton = gateway;
- if (gateway->GetHandle()) {
- gateway->Thread = std::thread(std::bind(&THTTPMultiGateway::Perform, THTTPMultiGateway::Singleton, gateway->GetHandle()));
- } else {
- ythrow yexception() << "Failed to initialize http gateway";
- }
+
+ gateway->Thread = std::thread([self = gateway.get()] () {
+ return self->Perform();
+ });
+
return gateway;
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 65bfd5c53b5..641a7c4f6b5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -96,7 +96,7 @@ public:
Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetS3RetryPolicy());
} else {
Become(&TS3FileWriteActor::InitialStateFunc);
- Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy());
+ Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy());
}
}
@@ -114,6 +114,9 @@ public:
void Finish() {
Parts->Seal();
+ if (!UploadId.empty())
+ StartUploadParts();
+
if (!InFlight && Parts->Empty())
CommitUploadedParts();
}
@@ -401,7 +404,6 @@ private:
// IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
-
for (const auto& p : FileWriteActors) {
for (const auto& fileWriter : p.second) {
fileWriter->PassAway();