diff options
author | hor911 <hor911@ydb.tech> | 2022-12-29 21:45:01 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-12-29 21:45:01 +0300 |
commit | f1382397c90c19e0615ada6d291febc0fc6dcf86 (patch) | |
tree | 474d00376069a40d6be4c1058f816fc03a9e919c | |
parent | d4a93e59b962be866262372a9f7a6f78fc11efe1 (diff) | |
download | ydb-f1382397c90c19e0615ada6d291febc0fc6dcf86.tar.gz |
Back pressure to HTTP gateway
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 119 |
1 files changed, 108 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 532667ab41..a133b9f338 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -111,6 +111,10 @@ namespace { constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10); +struct TS3ReadAbort : public yexception { + using yexception::yexception; +}; + struct TS3ReadError : public yexception { using yexception::yexception; }; @@ -130,6 +134,8 @@ struct TEvPrivate { EvNextRecordBatch, EvBlockProcessed, EvFileFinished, + EvPause, + EvContinue, EvEnd }; @@ -215,6 +221,12 @@ struct TEvPrivate { TEvBlockProcessed() {} }; + struct TEvPause : public NActors::TEventLocal<TEvPause, EvPause> { + }; + + struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> { + }; + }; using namespace NKikimr::NMiniKQL; @@ -616,7 +628,39 @@ public: if (InputFinished) return false; - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>(); + if (Paused || DeferredEvents.empty()) { + auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted + , TEvPrivate::TEvDataPart + , TEvPrivate::TEvReadFinished + , TEvPrivate::TEvPause + , TEvPrivate::TEvContinue + , NActors::TEvents::TEvPoison>(); + + switch (const auto etype = ev->GetTypeRewrite()) { + case TEvPrivate::TEvPause::EventType: + Paused = true; + break; + case TEvPrivate::TEvContinue::EventType: + Paused = false; + break; + case NActors::TEvents::TEvPoison::EventType: + RetryStuff->Cancel(); + throw TS3ReadAbort(); + default: + DeferredEvents.push(std::move(ev)); + break; + } + } + + if (Paused || DeferredEvents.empty()) { + value.clear(); + return true; + } + + THolder<IEventHandle> ev; + ev.Swap(DeferredEvents.front()); + DeferredEvents.pop(); + switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadStarted::EventType: ErrorText.clear(); @@ -679,13 +723,22 @@ private: return; while (true) { - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted + , TEvPrivate::TEvDataPart + , TEvPrivate::TEvReadFinished + , TEvPrivate::TEvPause + , TEvPrivate::TEvContinue + , NActors::TEvents::TEvPoison>(); + const auto etype = ev->GetTypeRewrite(); switch (etype) { case TEvPrivate::TEvReadFinished::EventType: Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString()); break; + case NActors::TEvents::TEvPoison::EventType: + RetryStuff->Cancel(); + throw TS3ReadAbort(); default: continue; } @@ -786,6 +839,8 @@ private: Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes, ExpectedDataSize, ActualDataSize)); + } catch (const TS3ReadAbort&) { + LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path); } catch (const TDtorException&) { return RetryStuff->Cancel(); } catch (const std::exception& err) { @@ -913,6 +968,8 @@ private: ui64 IngressBytes = 0; ui64 ExpectedDataSize; ui64 ActualDataSize = 0; + bool Paused = false; + std::queue<THolder<IEventHandle>> DeferredEvents; }; class TS3ReadCoroActor : public TActorCoro { @@ -999,6 +1056,7 @@ public: QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueBlockCount = Counters->GetCounter("QueueBlockCount"); BufferDataSize = Counters->GetCounter("BufferDataSize"); + DownloadPaused = Counters->GetCounter("DownloadPaused"); } if (TaskCounters) { ExpectedDataSize = TaskCounters->GetCounter("ExpectedDataSize"); @@ -1020,10 +1078,12 @@ public: // no path is pending return false; } + /* if (BufferTotalDataSize > static_cast<i64>(ReadActorFactoryCfg.DataInflight)) { // too large data inflight return false; } + */ if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) { // too large download inflight return false; @@ -1046,7 +1106,7 @@ public: BufferDataSize->Add(expectedDataSize); } auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, expectedDataSize); - RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release()); + CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release())); } static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR"; @@ -1144,16 +1204,18 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); + BufferTotalDataSize -= s; + QueueTotalDataSize -= s; if (Counters) { - BufferTotalDataSize -= s; BufferDataSize->Sub(s); - QueueTotalDataSize -= s; QueueDataSize->Sub(s); QueueBlockCount->Dec(); } TryRegisterCoro(); } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free)); + MaybeContinue(); + finished = Blocks.empty() && !Count; if (finished) { ContainerCache.Clear(); @@ -1170,12 +1232,17 @@ private: BufferDataSize->Sub(BufferTotalDataSize); QueueDataSize->Sub(QueueTotalDataSize); QueueBlockCount->Sub(Blocks.size()); + if (Paused) { + DownloadPaused->Dec(); + } } BufferTotalDataSize = 0; QueueTotalDataSize = 0; - for (auto pair: RetryStuffForFile) { - pair.second->Cancel(); + + for (const auto actorId : CoroActors) { + Send(actorId, new NActors::TEvents::TEvPoison()); } + ContainerCache.Clear(); TActorBootstrapped<TS3StreamReadActor>::PassAway(); } @@ -1188,6 +1255,30 @@ private: return headers; } + void MaybePause() { + if (!Paused && QueueTotalDataSize >= ReadActorFactoryCfg.DataInflight) { + for (const auto actorId : CoroActors) { + Send(actorId, new TEvPrivate::TEvPause()); + } + Paused = true; + if (Counters) { + DownloadPaused->Inc(); + } + } + } + + void MaybeContinue() { + if (Paused && QueueTotalDataSize < ReadActorFactoryCfg.DataInflight) { + for (const auto actorId : CoroActors) { + Send(actorId, new TEvPrivate::TEvContinue()); + } + Paused = false; + if (Counters) { + DownloadPaused->Dec(); + } + } + } + STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry); hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock); @@ -1202,12 +1293,13 @@ private: void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { YQL_ENSURE(!ReadSpec->Arrow); IngressBytes = next->Get()->IngressBytes; + auto size = next->Get()->Block.bytes(); + QueueTotalDataSize += size; if (Counters) { QueueBlockCount->Inc(); - auto size = next->Get()->Block.bytes(); QueueDataSize->Add(size); - QueueTotalDataSize += size; } + MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); ReportMemoryUsage(); @@ -1216,18 +1308,20 @@ private: void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { YQL_ENSURE(ReadSpec->Arrow); IngressBytes = next->Get()->IngressBytes; + auto size = GetSizeOfBatch(*next->Get()->Batch); + QueueTotalDataSize += size; if (Counters) { QueueBlockCount->Inc(); - auto size = GetSizeOfBatch(*next->Get()->Batch); QueueDataSize->Add(size); - QueueTotalDataSize += size; } + MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); ReportMemoryUsage(); } void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) { + CoroActors.erase(ev->Sender); IngressBytes = ev->Get()->IngressBytes; RetryStuffForFile.erase(ev->Get()->PathIndex); Y_VERIFY(Count); @@ -1291,10 +1385,13 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr ExpectedDataSize; ::NMonitoring::TDynamicCounters::TCounterPtr ActualDataSize; ::NMonitoring::TDynamicCounters::TCounterPtr BufferDataSize; + ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused; ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounterPtr TaskCounters; double Ratio = 0.0; ui64 DownloadInflight = 0; + std::set<NActors::TActorId> CoroActors; + bool Paused = false; }; using namespace NKikimr::NMiniKQL; |