about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: hor911 <hor911@ydb.tech> 2022-12-29 21:45:01 +0300
committer: hor911 <hor911@ydb.tech> 2022-12-29 21:45:01 +0300
commit: f1382397c90c19e0615ada6d291febc0fc6dcf86 (patch)
tree: 474d00376069a40d6be4c1058f816fc03a9e919c
parent: d4a93e59b962be866262372a9f7a6f78fc11efe1 (diff)
download: ydb-f1382397c90c19e0615ada6d291febc0fc6dcf86.tar.gz
Back pressure to HTTP gateway
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 119
1 files changed, 108 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 532667ab41..a133b9f338 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -111,6 +111,10 @@ namespace {
constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10);
+struct TS3ReadAbort : public yexception {
+ using yexception::yexception;
+};
+
struct TS3ReadError : public yexception {
using yexception::yexception;
};
@@ -130,6 +134,8 @@ struct TEvPrivate {
EvNextRecordBatch,
EvBlockProcessed,
EvFileFinished,
+ EvPause,
+ EvContinue,
EvEnd
};
@@ -215,6 +221,12 @@ struct TEvPrivate {
TEvBlockProcessed() {}
};
+ struct TEvPause : public NActors::TEventLocal<TEvPause, EvPause> {
+ };
+
+ struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> {
+ };
+
};
using namespace NKikimr::NMiniKQL;
@@ -616,7 +628,39 @@ public:
if (InputFinished)
return false;
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
+ if (Paused || DeferredEvents.empty()) {
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted
+ , TEvPrivate::TEvDataPart
+ , TEvPrivate::TEvReadFinished
+ , TEvPrivate::TEvPause
+ , TEvPrivate::TEvContinue
+ , NActors::TEvents::TEvPoison>();
+
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ case TEvPrivate::TEvPause::EventType:
+ Paused = true;
+ break;
+ case TEvPrivate::TEvContinue::EventType:
+ Paused = false;
+ break;
+ case NActors::TEvents::TEvPoison::EventType:
+ RetryStuff->Cancel();
+ throw TS3ReadAbort();
+ default:
+ DeferredEvents.push(std::move(ev));
+ break;
+ }
+ }
+
+ if (Paused || DeferredEvents.empty()) {
+ value.clear();
+ return true;
+ }
+
+ THolder<IEventHandle> ev;
+ ev.Swap(DeferredEvents.front());
+ DeferredEvents.pop();
+
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadStarted::EventType:
ErrorText.clear();
@@ -679,13 +723,22 @@ private:
return;
while (true) {
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted
+ , TEvPrivate::TEvDataPart
+ , TEvPrivate::TEvReadFinished
+ , TEvPrivate::TEvPause
+ , TEvPrivate::TEvContinue
+ , NActors::TEvents::TEvPoison>();
+
const auto etype = ev->GetTypeRewrite();
switch (etype) {
case TEvPrivate::TEvReadFinished::EventType:
Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString());
break;
+ case NActors::TEvents::TEvPoison::EventType:
+ RetryStuff->Cancel();
+ throw TS3ReadAbort();
default:
continue;
}
@@ -786,6 +839,8 @@ private:
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes, ExpectedDataSize, ActualDataSize));
+ } catch (const TS3ReadAbort&) {
+ LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path);
} catch (const TDtorException&) {
return RetryStuff->Cancel();
} catch (const std::exception& err) {
@@ -913,6 +968,8 @@ private:
ui64 IngressBytes = 0;
ui64 ExpectedDataSize;
ui64 ActualDataSize = 0;
+ bool Paused = false;
+ std::queue<THolder<IEventHandle>> DeferredEvents;
};
class TS3ReadCoroActor : public TActorCoro {
@@ -999,6 +1056,7 @@ public:
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueBlockCount = Counters->GetCounter("QueueBlockCount");
BufferDataSize = Counters->GetCounter("BufferDataSize");
+ DownloadPaused = Counters->GetCounter("DownloadPaused");
}
if (TaskCounters) {
ExpectedDataSize = TaskCounters->GetCounter("ExpectedDataSize");
@@ -1020,10 +1078,12 @@ public:
// no path is pending
return false;
}
+ /*
if (BufferTotalDataSize > static_cast<i64>(ReadActorFactoryCfg.DataInflight)) {
// too large data inflight
return false;
}
+ */
if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) {
// too large download inflight
return false;
@@ -1046,7 +1106,7 @@ public:
BufferDataSize->Add(expectedDataSize);
}
auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, expectedDataSize);
- RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release());
+ CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release()));
}
static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";
@@ -1144,16 +1204,18 @@ private:
total += s;
output.emplace_back(std::move(value));
Blocks.pop_front();
+ BufferTotalDataSize -= s;
+ QueueTotalDataSize -= s;
if (Counters) {
- BufferTotalDataSize -= s;
BufferDataSize->Sub(s);
- QueueTotalDataSize -= s;
QueueDataSize->Sub(s);
QueueBlockCount->Dec();
}
TryRegisterCoro();
} while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
+ MaybeContinue();
+
finished = Blocks.empty() && !Count;
if (finished) {
ContainerCache.Clear();
@@ -1170,12 +1232,17 @@ private:
BufferDataSize->Sub(BufferTotalDataSize);
QueueDataSize->Sub(QueueTotalDataSize);
QueueBlockCount->Sub(Blocks.size());
+ if (Paused) {
+ DownloadPaused->Dec();
+ }
}
BufferTotalDataSize = 0;
QueueTotalDataSize = 0;
- for (auto pair: RetryStuffForFile) {
- pair.second->Cancel();
+
+ for (const auto actorId : CoroActors) {
+ Send(actorId, new NActors::TEvents::TEvPoison());
}
+
ContainerCache.Clear();
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
@@ -1188,6 +1255,30 @@ private:
return headers;
}
+ void MaybePause() {
+ if (!Paused && QueueTotalDataSize >= ReadActorFactoryCfg.DataInflight) {
+ for (const auto actorId : CoroActors) {
+ Send(actorId, new TEvPrivate::TEvPause());
+ }
+ Paused = true;
+ if (Counters) {
+ DownloadPaused->Inc();
+ }
+ }
+ }
+
+ void MaybeContinue() {
+ if (Paused && QueueTotalDataSize < ReadActorFactoryCfg.DataInflight) {
+ for (const auto actorId : CoroActors) {
+ Send(actorId, new TEvPrivate::TEvContinue());
+ }
+ Paused = false;
+ if (Counters) {
+ DownloadPaused->Dec();
+ }
+ }
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
@@ -1202,12 +1293,13 @@ private:
void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
YQL_ENSURE(!ReadSpec->Arrow);
IngressBytes = next->Get()->IngressBytes;
+ auto size = next->Get()->Block.bytes();
+ QueueTotalDataSize += size;
if (Counters) {
QueueBlockCount->Inc();
- auto size = next->Get()->Block.bytes();
QueueDataSize->Add(size);
- QueueTotalDataSize += size;
}
+ MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
ReportMemoryUsage();
@@ -1216,18 +1308,20 @@ private:
void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
YQL_ENSURE(ReadSpec->Arrow);
IngressBytes = next->Get()->IngressBytes;
+ auto size = GetSizeOfBatch(*next->Get()->Batch);
+ QueueTotalDataSize += size;
if (Counters) {
QueueBlockCount->Inc();
- auto size = GetSizeOfBatch(*next->Get()->Batch);
QueueDataSize->Add(size);
- QueueTotalDataSize += size;
}
+ MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
ReportMemoryUsage();
}
void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) {
+ CoroActors.erase(ev->Sender);
IngressBytes = ev->Get()->IngressBytes;
RetryStuffForFile.erase(ev->Get()->PathIndex);
Y_VERIFY(Count);
@@ -1291,10 +1385,13 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr ExpectedDataSize;
::NMonitoring::TDynamicCounters::TCounterPtr ActualDataSize;
::NMonitoring::TDynamicCounters::TCounterPtr BufferDataSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr TaskCounters;
double Ratio = 0.0;
ui64 DownloadInflight = 0;
+ std::set<NActors::TActorId> CoroActors;
+ bool Paused = false;
};
using namespace NKikimr::NMiniKQL;