diff options
author | uzhas <uzhas@ydb.tech> | 2022-08-26 20:54:05 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-08-26 20:54:05 +0300 |
commit | c815af1186223677be77bf709c12468b01b29a3a (patch) | |
tree | 00a41bd7bcbbfb763c53c3ef02020803d8dc02f0 | |
parent | b0c9cc1d3129b79b0c09132622a795ab6a9319f3 (diff) | |
download | ydb-c815af1186223677be77bf709c12468b01b29a3a.tar.gz |
fix hanging POST, support queue drain via 0 min size to unlock sink finishing, add tracing capability to gateway
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 135 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 6 |
2 files changed, 95 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index ae9ccc3e1ac..7644754cfc6 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -11,6 +11,49 @@ #include <stack> #include <queue> +namespace { +int curlTrace(CURL *handle, curl_infotype type, + char *data, size_t size, + void *userp) { + Y_UNUSED(handle); + Y_UNUSED(userp); + Y_UNUSED(size); + Y_UNUSED(data); + + TStringBuf buf(data, size); + switch (type) { + case CURLINFO_TEXT: + Cerr << "== Info: " << buf; + break; + case CURLINFO_HEADER_OUT: + Cerr << "=> Send header (" << size << " bytes):" << Endl << buf; + break; + case CURLINFO_HEADER_IN: + Cerr << "<= Recv header (" << size << " bytes):" << buf; + break; + default: + return 0; + +/* + case CURLINFO_DATA_OUT: + Cerr << "=> Send data (" << size << " bytes)" << Endl; + break; + case CURLINFO_SSL_DATA_OUT: + Cerr << "=> Send SSL data (" << size << " bytes)" << Endl; + break; + case CURLINFO_DATA_IN: + Cerr << "<= Recv data (" << size << " bytes)" << Endl; + break; + case CURLINFO_SSL_DATA_IN: + Cerr << "<= Recv SSL data (" << size << " bytes)" << Endl; + break; +*/ + } + + return 0; +} +} + namespace NYql { namespace { @@ -31,7 +74,7 @@ public: PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig()) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig()) : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes) { switch (method) { @@ -44,6 +87,14 @@ public: curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); break; } + + // does nothing if CURLOPT_VERBOSE is not set to 1 + curl_easy_setopt(Handle, CURLOPT_DEBUGFUNCTION, curlTrace); + + // for local debug only + // will print tokens in HTTP headers + // curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway"); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); @@ -64,7 +115,11 @@ public: curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this)); - if (withBody) { + if (method == EMethod::POST) { + curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, bodySize); + } + + if (bodySize) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); } @@ -123,7 +178,7 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) { Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); @@ -219,7 +274,7 @@ public: IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} static TPtr Make( @@ -380,6 +435,7 @@ public: ~THTTPMultiGateway() { curl_multi_wakeup(Handle); + IsStopped = true; if (Thread.joinable()) { Thread.join(); } @@ -394,61 +450,50 @@ private: void InitCurl() { const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL); - if (globalInitResult == CURLE_OK) { - Handle = curl_multi_init(); - if (!Handle) { - Cerr << "curl_multi_init error" << Endl; - } - } else { - Cerr << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl; + if (globalInitResult != CURLE_OK) { + throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl; + } + Handle = curl_multi_init(); + if (!Handle) { + throw yexception() << "curl_multi_init error"; } } void UninitCurl() { - if (Handle) { - const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle); - if (multiCleanupResult != CURLM_OK) { - Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; - } + Y_VERIFY(Handle); + const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle); + if (multiCleanupResult != CURLM_OK) { + Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; } curl_global_cleanup(); } - static void Perform(const TWeakPtr& weak, CURLM* handle) { + void Perform() { OutputSize.store(0ULL); - for (size_t handlers = 0U;;) { - if (const auto& self = weak.lock()) { - handlers = self->FillHandlers(); - self->PerformCycles->Inc(); - self->OutputMemory->Set(OutputSize); - } else { - break; - } + for (size_t handlers = 0U; !IsStopped;) { + handlers = FillHandlers(); + PerformCycles->Inc(); + OutputMemory->Set(OutputSize); int running = 0; - if (const auto c = curl_multi_perform(handle, &running); CURLM_OK != c) { - if (const auto& self = weak.lock()) { - self->Fail(c); - } + if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) { + Fail(c); break; } if (running < int(handlers)) { - if (const auto& self = weak.lock()) { - for (int messages = int(handlers) - running; messages;) { - if (const auto msg = curl_multi_info_read(handle, &messages)) { - if(msg->msg == CURLMSG_DONE) { - self->Done(msg->easy_handle, msg->data.result); - } + for (int messages = int(handlers) - running; messages;) { + if (const auto msg = curl_multi_info_read(Handle, &messages)) { + if(msg->msg == CURLMSG_DONE) { + Done(msg->easy_handle, msg->data.result); } } } } else { - if (const auto c = curl_multi_poll(handle, nullptr, 0, 256, nullptr); CURLM_OK != c) { - if (const auto& self = weak.lock()) { - self->Fail(c); - } + const int timeoutMs = 300; + if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { + Fail(c); break; } } @@ -547,6 +592,7 @@ private: const TIssue error(curl_multi_strerror(result)); while (!works.empty()) { + curl_multi_remove_handle(Handle, works.top()->GetHandle()); works.top()->Fail(error); works.pop(); } @@ -639,6 +685,7 @@ private: std::mutex Sync; std::thread Thread; + std::atomic<bool> IsStopped = false; size_t AllocatedSize = 0ULL; static std::atomic_size_t OutputSize; @@ -732,11 +779,11 @@ IHTTPGateway::Make(const THttpGatewayConfig* httpGatewaysCfg, ::NMonitoring::TDy const auto gateway = std::make_shared<THTTPMultiGateway>(httpGatewaysCfg, std::move(counters)); THTTPMultiGateway::Singleton = gateway; - if (gateway->GetHandle()) { - gateway->Thread = std::thread(std::bind(&THTTPMultiGateway::Perform, THTTPMultiGateway::Singleton, gateway->GetHandle())); - } else { - ythrow yexception() << "Failed to initialize http gateway"; - } + + gateway->Thread = std::thread([self = gateway.get()] () { + return self->Perform(); + }); + return gateway; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 65bfd5c53b5..641a7c4f6b5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -96,7 +96,7 @@ public: Gateway->Upload(Url, MakeHeader(), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, std::placeholders::_1), true, GetS3RetryPolicy()); } else { Become(&TS3FileWriteActor::InitialStateFunc); - Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy()); + Gateway->Upload(Url + "?uploads", MakeHeader(), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false, GetS3RetryPolicy()); } } @@ -114,6 +114,9 @@ public: void Finish() { Parts->Seal(); + if (!UploadId.empty()) + StartUploadParts(); + if (!InFlight && Parts->Empty()) CommitUploadedParts(); } @@ -401,7 +404,6 @@ private: // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor - for (const auto& p : FileWriteActors) { for (const auto& fileWriter : p.second) { fileWriter->PassAway(); |