diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 09:07:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 09:27:45 +0300 |
commit | 0c7c463bde024e085f92b83fb9128526ab3c1acb (patch) | |
tree | 12c39f3321f2db4a550f22aca9303e5750182a53 | |
parent | 1aec7b5b218bdae93aee82b2c28c70955c9ce9a0 (diff) | |
download | ydb-0c7c463bde024e085f92b83fb9128526ab3c1acb.tar.gz |
KIKIMR-19213: provide events for stop reading
-rw-r--r-- | ydb/core/tx/columnshard/blobs_reader/actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blobs_reader/actor.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blobs_reader/task.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blobs_reader/task.h | 28 |
4 files changed, 48 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp index ec1f0626e46..2b387baeb9c 100644 --- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp @@ -5,6 +5,8 @@ namespace NKikimr::NOlap::NBlobOperations::NRead { TAtomicCounter TActor::WaitingBlobsCount = 0; void TActor::Handle(TEvStartReadTask::TPtr& ev) { + const auto& externalTaskId = ev->Get()->GetTask()->GetExternalTaskId(); + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("external_task_id", externalTaskId); THashSet<TBlobRange> rangesInProgress; for (auto&& agent : ev->Get()->GetTask()->GetAgents()) { for (auto&& b : agent->GetRangesForRead()) { @@ -14,7 +16,7 @@ void TActor::Handle(TEvStartReadTask::TPtr& ev) { ACFL_DEBUG("event", "TEvReadTask")("enqueued_blob_id", r); rangesInProgress.emplace(r); } else { - ACFL_TRACE("event", "TEvReadTask")("blob_id", r); + ACFL_DEBUG("event", "TEvReadTask")("blob_id", r); it = BlobTasks.emplace(r, std::vector<std::shared_ptr<ITask>>()).first; WaitingBlobsCount.Inc(); } @@ -52,4 +54,12 @@ TActor::TActor(ui64 tabletId, const TActorId& parent) } +TActor::~TActor() { + for (auto&& i : BlobTasks) { + for (auto&& t : i.second) { + t->Abort(); + } + } +} + } diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.h b/ydb/core/tx/columnshard/blobs_reader/actor.h index c03cc1e49b0..aad5d74b185 100644 --- a/ydb/core/tx/columnshard/blobs_reader/actor.h +++ b/ydb/core/tx/columnshard/blobs_reader/actor.h @@ -30,6 +30,7 @@ public: STFUNC(StateWait) { TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent)); switch (ev->GetTypeRewrite()) { + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); hFunc(TEvStartReadTask, Handle); hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); default: @@ -37,6 +38,7 @@ public: } } + ~TActor(); }; } diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp index 38c7ebf627e..4b79bac1dd8 100644 --- a/ydb/core/tx/columnshard/blobs_reader/task.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp @@ -9,11 +9,12 @@ const std::vector<std::shared_ptr<IBlobsReadingAction>>& ITask::GetAgents() cons } bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) { - if (TaskFinishedWithError) { - ACFL_WARN("event", "SkipError")("message", status.GetErrorMessage())("status", status.GetStatus()); + if (TaskFinishedWithError || AbortFlag) { + ACFL_WARN("event", "SkipError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId) + ("abort", AbortFlag)("finished_with_error", TaskFinishedWithError); return false; } else { - ACFL_ERROR("event", "NewError")("message", status.GetErrorMessage())("status", status.GetStatus()); + ACFL_ERROR("event", "NewError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId); } { auto it = BlobsWaiting.find(range); @@ -34,11 +35,11 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) { } void ITask::AddData(const TBlobRange& range, const TString& data) { - if (TaskFinishedWithError) { - ACFL_WARN("event", "SkipDataAfterError"); + if (TaskFinishedWithError || AbortFlag) { + ACFL_WARN("event", "SkipDataAfterError")("external_task_id", ExternalTaskId)("abort", AbortFlag)("finished_with_error", TaskFinishedWithError); return; } else { - ACFL_TRACE("event", "NewData")("range", range.ToString()); + ACFL_TRACE("event", "NewData")("range", range.ToString())("external_task_id", ExternalTaskId); } Y_VERIFY(BlobsFetchingStarted); { @@ -54,16 +55,23 @@ void ITask::AddData(const TBlobRange& range, const TString& data) { } void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) { + ACFL_TRACE("task_id", ExternalTaskId)("event", "start"); Y_VERIFY(!BlobsFetchingStarted); BlobsFetchingStarted = true; + ui64 size = 0; + ui64 count = 0; for (auto&& agent : Agents) { for (auto&& b : agent->GetRangesForRead()) { for (auto&& r : b.second) { BlobsWaiting.emplace(r, agent); + size += r.Size; + ++count; } } agent->Start(rangesInProgress); } + WaitBlobsCount = count; + WaitBlobsSize = size; if (BlobsWaiting.empty()) { OnDataReady(); } @@ -73,9 +81,10 @@ namespace { TAtomicCounter TaskIdentifierBuilder = 0; } -ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions) +ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId) : Agents(actions) , TaskIdentifier(TaskIdentifierBuilder.Inc()) + , ExternalTaskId(externalTaskId) { AFL_VERIFY(Agents.size()); for (auto&& i : Agents) { @@ -95,7 +104,7 @@ TString ITask::DebugString() const { } void ITask::OnDataReady() { - ACFL_DEBUG("event", "OnDataReady")("task", DebugString()); + ACFL_DEBUG("event", "OnDataReady")("task", DebugString())("external_task_id", ExternalTaskId); Y_VERIFY(!DataIsReadyFlag); DataIsReadyFlag = true; DoOnDataReady(); @@ -107,7 +116,7 @@ bool ITask::OnError(const TBlobRange& range) { } ITask::~ITask() { - Y_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError); + AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag); } } diff --git a/ydb/core/tx/columnshard/blobs_reader/task.h b/ydb/core/tx/columnshard/blobs_reader/task.h index fb2a1a5bfcc..213058d8d23 100644 --- a/ydb/core/tx/columnshard/blobs_reader/task.h +++ b/ydb/core/tx/columnshard/blobs_reader/task.h @@ -19,6 +19,10 @@ private: bool TaskFinishedWithError = false; bool DataIsReadyFlag = false; const ui64 TaskIdentifier = 0; + const TString ExternalTaskId; + bool AbortFlag = false; + std::optional<ui64> WaitBlobsSize; + std::optional<ui64> WaitBlobsCount; protected: bool IsFetchingStarted() const { return BlobsFetchingStarted; @@ -42,26 +46,28 @@ protected: return ""; } public: + void Abort() { + AbortFlag = true; + } + ui64 GetTaskIdentifier() const { return TaskIdentifier; } + const TString& GetExternalTaskId() const { + return ExternalTaskId; + } + TString DebugString() const; ui64 GetExpectedBlobsSize() const { - ui64 result = 0; - for (auto&& i : BlobsWaiting) { - result += i.second->GetExpectedBlobsSize(); - } - return result; + Y_VERIFY(WaitBlobsSize); + return *WaitBlobsSize; } ui64 GetExpectedBlobsCount() const { - ui64 result = 0; - for (auto&& i : BlobsWaiting) { - result += i.second->GetExpectedBlobsCount(); - } - return result; + Y_VERIFY(WaitBlobsCount); + return *WaitBlobsCount; } THashSet<TBlobRange> GetExpectedRanges() const { @@ -76,7 +82,7 @@ public: virtual ~ITask(); - ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions); + ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId = ""); void StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress); |