aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrlolthe1st <mrlolthe1st@yandex-team.com>2023-02-17 14:22:43 +0300
committermrlolthe1st <mrlolthe1st@yandex-team.com>2023-02-17 14:22:43 +0300
commited241323ffc1d09cf1999a40708af44b1f48326b (patch)
tree4258da9280500e27c6b22207c7c98b6f352501cc
parent221c521309e51dc220108e6682fe5e9744094783 (diff)
downloadydb-ed241323ffc1d09cf1999a40708af44b1f48326b.tar.gz
fix unexcepted events
initial
-rw-r--r--ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp60
2 files changed, 27 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
index 5ba0b2960c..081a26be28 100644
--- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
@@ -87,7 +87,7 @@ public:
std::bind(&OnResult, promise, std::placeholders::_1),
{},
RetryPolicy);
- return arrow::Buffer::FromString(promise.GetFuture().GetValueSync());
+ return arrow::Buffer::FromString(std::move(promise.GetFuture().GetValueSync()));
} catch (const std::exception& e) {
return arrow::Status::UnknownError(e.what());
}
@@ -104,8 +104,7 @@ public:
private:
static void OnResult(NThreading::TPromise<TString> promise, IHTTPGateway::TResult&& result) {
try {
- auto res = std::get<IHTTPGateway::TContent>(result).Extract();
- promise.SetValue(res);
+ promise.SetValue(std::move(std::get<IHTTPGateway::TContent>(result).Extract()));
} catch (const std::exception& e) {
promise.SetException(std::current_exception());
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index d29c26cd8d..4d2704c8d5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -865,6 +865,27 @@ private:
}
}
+ template<typename EvType>
+ void WaitEvent() {
+ auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ TVector<THolder<IEventBase>> otherEvents;
+ while (!event->CastAsLocal<EvType>()) {
+ if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
+ throw TS3ReadAbort();
+ }
+
+ if (!event->CastAsLocal<TEvPrivate::TEvPause>() && !event->CastAsLocal<TEvPrivate::TEvContinue>()) {
+ otherEvents.push_back(event->ReleaseBase());
+ }
+
+ event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ }
+
+ for (auto& e: otherEvents) {
+ Send(SelfActorId, e.Release());
+ }
+ }
+
void Run() final try {
LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path);
@@ -911,26 +932,11 @@ private:
auto onResolve = [actorSystem, actorId = this->SelfActorId] {
actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved()));
};
- auto waitForResolve = [&] {
- auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
- TVector<THolder<IEventBase>> otherEvents;
- while (!event->CastAsLocal<TEvPrivate::TEvFutureResolved>()) {
- if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
- throw TS3ReadAbort();
- }
- otherEvents.push_back(event->ReleaseBase());
- event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
- }
-
- for (auto &e: otherEvents) {
- Send(SelfActorId, e.Release());
- }
- };
auto future = ArrowReader->GetSchema(fileDesc);
future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
onResolve();
});
- waitForResolve();
+ WaitEvent<TEvPrivate::TEvFutureResolved>();
auto result = future.GetValue();
std::shared_ptr<arrow::Schema> schema = result.Schema;
std::vector<int> columnIndices;
@@ -965,7 +971,7 @@ private:
std::move(columnIndices),
std::move(columnConverters),
onResolve,
- waitForResolve);
+ [&] { WaitEvent<TEvPrivate::TEvFutureResolved>(); });
ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
} else {
std::unique_ptr<NDB::ReadBuffer> buffer;
@@ -1039,22 +1045,6 @@ private:
auto selfActorId = SelfActorId;
size_t cntBlocksInFly = 0;
if (isLocal) {
- auto waitProcessed = [&] {
- auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
- TVector<THolder<IEventBase>> otherEvents;
- while (!event->CastAsLocal<TEvPrivate::TEvBlockProcessed>()) {
- if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
- throw TS3ReadAbort();
- }
- otherEvents.push_back(event->ReleaseBase());
- event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
- }
-
- for (auto& e: otherEvents) {
- Send(SelfActorId, e.Release());
- }
- };
-
for (;;) {
T batch;
@@ -1062,7 +1052,7 @@ private:
break;
}
if (++cntBlocksInFly > MaxBlocksInFly) {
- waitProcessed();
+ WaitEvent<TEvPrivate::TEvBlockProcessed>();
--cntBlocksInFly;
}
Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() {
@@ -1070,7 +1060,7 @@ private:
}, GetIngressDelta()));
}
while (cntBlocksInFly--) {
- waitProcessed();
+ WaitEvent<TEvPrivate::TEvBlockProcessed>();
}
} else {
for (;;) {