aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-08-25 12:54:55 +0300
committerhor911 <hor911@ydb.tech>2022-08-25 12:54:55 +0300
commitb987f753e1bc78435e0cae37405fdee39ea1a0e0 (patch)
treec488b0b46134940c6454aea41bbfca536ba195a7
parent33493b7a5a2f2292e59064c7e60445719e409a96 (diff)
downloadydb-b987f753e1bc78435e0cae37405fdee39ea1a0e0.tar.gz
Drain data event queue on read error
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp28
1 files changed, 18 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index a5adc878766..639db27b3a1 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -361,16 +361,24 @@ private:
if (InputFinished)
return;
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
- InputFinished = true;
- switch (const auto etype = ev->GetTypeRewrite()) {
- case TEvPrivate::TEvReadFinished::EventType:
- break;
- case TEvPrivate::TEvReadError::EventType:
- Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
- break;
- default:
- break;
+ while (true) {
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadFinished>();
+ const auto etype = ev->GetTypeRewrite();
+ if (etype == TEvPrivate::TEvDataPart::EventType) {
+ // just ignore all data parts event to drain event queue
+ continue;
+ }
+ switch (etype) {
+ case TEvPrivate::TEvReadFinished::EventType:
+ break;
+ case TEvPrivate::TEvReadError::EventType:
+ Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
+ break;
+ default:
+ break;
+ }
+ InputFinished = true;
+ return;
}
}