diff options
author | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-02-17 14:22:43 +0300 |
---|---|---|
committer | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-02-17 14:22:43 +0300 |
commit | ed241323ffc1d09cf1999a40708af44b1f48326b (patch) | |
tree | 4258da9280500e27c6b22207c7c98b6f352501cc | |
parent | 221c521309e51dc220108e6682fe5e9744094783 (diff) | |
download | ydb-ed241323ffc1d09cf1999a40708af44b1f48326b.tar.gz |
fix unexcepted events
initial
-rw-r--r-- | ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 60 |
2 files changed, 27 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp index 5ba0b2960c..081a26be28 100644 --- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp @@ -87,7 +87,7 @@ public: std::bind(&OnResult, promise, std::placeholders::_1), {}, RetryPolicy); - return arrow::Buffer::FromString(promise.GetFuture().GetValueSync()); + return arrow::Buffer::FromString(std::move(promise.GetFuture().GetValueSync())); } catch (const std::exception& e) { return arrow::Status::UnknownError(e.what()); } @@ -104,8 +104,7 @@ public: private: static void OnResult(NThreading::TPromise<TString> promise, IHTTPGateway::TResult&& result) { try { - auto res = std::get<IHTTPGateway::TContent>(result).Extract(); - promise.SetValue(res); + promise.SetValue(std::move(std::get<IHTTPGateway::TContent>(result).Extract())); } catch (const std::exception& e) { promise.SetException(std::current_exception()); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index d29c26cd8d..4d2704c8d5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -865,6 +865,27 @@ private: } } + template<typename EvType> + void WaitEvent() { + auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + TVector<THolder<IEventBase>> otherEvents; + while (!event->CastAsLocal<EvType>()) { + if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { + throw TS3ReadAbort(); + } + + if (!event->CastAsLocal<TEvPrivate::TEvPause>() && !event->CastAsLocal<TEvPrivate::TEvContinue>()) { + otherEvents.push_back(event->ReleaseBase()); + } + + event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + } + + for (auto& e: otherEvents) { + Send(SelfActorId, e.Release()); + } + } + void Run() final try { LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path); @@ -911,26 +932,11 @@ private: auto onResolve = [actorSystem, actorId = this->SelfActorId] { actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved())); }; - auto waitForResolve = [&] { - auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); - TVector<THolder<IEventBase>> otherEvents; - while (!event->CastAsLocal<TEvPrivate::TEvFutureResolved>()) { - if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { - throw TS3ReadAbort(); - } - otherEvents.push_back(event->ReleaseBase()); - event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); - } - - for (auto &e: otherEvents) { - Send(SelfActorId, e.Release()); - } - }; auto future = ArrowReader->GetSchema(fileDesc); future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { onResolve(); }); - waitForResolve(); + WaitEvent<TEvPrivate::TEvFutureResolved>(); auto result = future.GetValue(); std::shared_ptr<arrow::Schema> schema = result.Schema; std::vector<int> columnIndices; @@ -965,7 +971,7 @@ private: std::move(columnIndices), std::move(columnConverters), onResolve, - waitForResolve); + [&] { WaitEvent<TEvPrivate::TEvFutureResolved>(); }); ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); } else { std::unique_ptr<NDB::ReadBuffer> buffer; @@ -1039,22 +1045,6 @@ private: auto selfActorId = SelfActorId; size_t cntBlocksInFly = 0; if (isLocal) { - auto waitProcessed = [&] { - auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); - TVector<THolder<IEventBase>> otherEvents; - while (!event->CastAsLocal<TEvPrivate::TEvBlockProcessed>()) { - if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { - throw TS3ReadAbort(); - } - otherEvents.push_back(event->ReleaseBase()); - event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); - } - - for (auto& e: otherEvents) { - Send(SelfActorId, e.Release()); - } - }; - for (;;) { T batch; @@ -1062,7 +1052,7 @@ private: break; } if (++cntBlocksInFly > MaxBlocksInFly) { - waitProcessed(); + WaitEvent<TEvPrivate::TEvBlockProcessed>(); --cntBlocksInFly; } Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() { @@ -1070,7 +1060,7 @@ private: }, GetIngressDelta())); } while (cntBlocksInFly--) { - waitProcessed(); + WaitEvent<TEvPrivate::TEvBlockProcessed>(); } } else { for (;;) { |