aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-09-29 09:07:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-09-29 09:27:45 +0300
commit0c7c463bde024e085f92b83fb9128526ab3c1acb (patch)
tree12c39f3321f2db4a550f22aca9303e5750182a53
parent1aec7b5b218bdae93aee82b2c28c70955c9ce9a0 (diff)
downloadydb-0c7c463bde024e085f92b83fb9128526ab3c1acb.tar.gz
KIKIMR-19213: provide events for stop reading
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/actor.cpp12
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/actor.h2
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/task.cpp27
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/task.h28
4 files changed, 48 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
index ec1f0626e46..2b387baeb9c 100644
--- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
@@ -5,6 +5,8 @@ namespace NKikimr::NOlap::NBlobOperations::NRead {
TAtomicCounter TActor::WaitingBlobsCount = 0;
void TActor::Handle(TEvStartReadTask::TPtr& ev) {
+ const auto& externalTaskId = ev->Get()->GetTask()->GetExternalTaskId();
+ NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("external_task_id", externalTaskId);
THashSet<TBlobRange> rangesInProgress;
for (auto&& agent : ev->Get()->GetTask()->GetAgents()) {
for (auto&& b : agent->GetRangesForRead()) {
@@ -14,7 +16,7 @@ void TActor::Handle(TEvStartReadTask::TPtr& ev) {
ACFL_DEBUG("event", "TEvReadTask")("enqueued_blob_id", r);
rangesInProgress.emplace(r);
} else {
- ACFL_TRACE("event", "TEvReadTask")("blob_id", r);
+ ACFL_DEBUG("event", "TEvReadTask")("blob_id", r);
it = BlobTasks.emplace(r, std::vector<std::shared_ptr<ITask>>()).first;
WaitingBlobsCount.Inc();
}
@@ -52,4 +54,12 @@ TActor::TActor(ui64 tabletId, const TActorId& parent)
}
+TActor::~TActor() {
+ for (auto&& i : BlobTasks) {
+ for (auto&& t : i.second) {
+ t->Abort();
+ }
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.h b/ydb/core/tx/columnshard/blobs_reader/actor.h
index c03cc1e49b0..aad5d74b185 100644
--- a/ydb/core/tx/columnshard/blobs_reader/actor.h
+++ b/ydb/core/tx/columnshard/blobs_reader/actor.h
@@ -30,6 +30,7 @@ public:
STFUNC(StateWait) {
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent));
switch (ev->GetTypeRewrite()) {
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
hFunc(TEvStartReadTask, Handle);
hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
default:
@@ -37,6 +38,7 @@ public:
}
}
+ ~TActor();
};
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp
index 38c7ebf627e..4b79bac1dd8 100644
--- a/ydb/core/tx/columnshard/blobs_reader/task.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp
@@ -9,11 +9,12 @@ const std::vector<std::shared_ptr<IBlobsReadingAction>>& ITask::GetAgents() cons
}
bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) {
- if (TaskFinishedWithError) {
- ACFL_WARN("event", "SkipError")("message", status.GetErrorMessage())("status", status.GetStatus());
+ if (TaskFinishedWithError || AbortFlag) {
+ ACFL_WARN("event", "SkipError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId)
+ ("abort", AbortFlag)("finished_with_error", TaskFinishedWithError);
return false;
} else {
- ACFL_ERROR("event", "NewError")("message", status.GetErrorMessage())("status", status.GetStatus());
+ ACFL_ERROR("event", "NewError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId);
}
{
auto it = BlobsWaiting.find(range);
@@ -34,11 +35,11 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) {
}
void ITask::AddData(const TBlobRange& range, const TString& data) {
- if (TaskFinishedWithError) {
- ACFL_WARN("event", "SkipDataAfterError");
+ if (TaskFinishedWithError || AbortFlag) {
+ ACFL_WARN("event", "SkipDataAfterError")("external_task_id", ExternalTaskId)("abort", AbortFlag)("finished_with_error", TaskFinishedWithError);
return;
} else {
- ACFL_TRACE("event", "NewData")("range", range.ToString());
+ ACFL_TRACE("event", "NewData")("range", range.ToString())("external_task_id", ExternalTaskId);
}
Y_VERIFY(BlobsFetchingStarted);
{
@@ -54,16 +55,23 @@ void ITask::AddData(const TBlobRange& range, const TString& data) {
}
void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) {
+ ACFL_TRACE("task_id", ExternalTaskId)("event", "start");
Y_VERIFY(!BlobsFetchingStarted);
BlobsFetchingStarted = true;
+ ui64 size = 0;
+ ui64 count = 0;
for (auto&& agent : Agents) {
for (auto&& b : agent->GetRangesForRead()) {
for (auto&& r : b.second) {
BlobsWaiting.emplace(r, agent);
+ size += r.Size;
+ ++count;
}
}
agent->Start(rangesInProgress);
}
+ WaitBlobsCount = count;
+ WaitBlobsSize = size;
if (BlobsWaiting.empty()) {
OnDataReady();
}
@@ -73,9 +81,10 @@ namespace {
TAtomicCounter TaskIdentifierBuilder = 0;
}
-ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions)
+ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId)
: Agents(actions)
, TaskIdentifier(TaskIdentifierBuilder.Inc())
+ , ExternalTaskId(externalTaskId)
{
AFL_VERIFY(Agents.size());
for (auto&& i : Agents) {
@@ -95,7 +104,7 @@ TString ITask::DebugString() const {
}
void ITask::OnDataReady() {
- ACFL_DEBUG("event", "OnDataReady")("task", DebugString());
+ ACFL_DEBUG("event", "OnDataReady")("task", DebugString())("external_task_id", ExternalTaskId);
Y_VERIFY(!DataIsReadyFlag);
DataIsReadyFlag = true;
DoOnDataReady();
@@ -107,7 +116,7 @@ bool ITask::OnError(const TBlobRange& range) {
}
ITask::~ITask() {
- Y_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError);
+ AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag);
}
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/task.h b/ydb/core/tx/columnshard/blobs_reader/task.h
index fb2a1a5bfcc..213058d8d23 100644
--- a/ydb/core/tx/columnshard/blobs_reader/task.h
+++ b/ydb/core/tx/columnshard/blobs_reader/task.h
@@ -19,6 +19,10 @@ private:
bool TaskFinishedWithError = false;
bool DataIsReadyFlag = false;
const ui64 TaskIdentifier = 0;
+ const TString ExternalTaskId;
+ bool AbortFlag = false;
+ std::optional<ui64> WaitBlobsSize;
+ std::optional<ui64> WaitBlobsCount;
protected:
bool IsFetchingStarted() const {
return BlobsFetchingStarted;
@@ -42,26 +46,28 @@ protected:
return "";
}
public:
+ void Abort() {
+ AbortFlag = true;
+ }
+
ui64 GetTaskIdentifier() const {
return TaskIdentifier;
}
+ const TString& GetExternalTaskId() const {
+ return ExternalTaskId;
+ }
+
TString DebugString() const;
ui64 GetExpectedBlobsSize() const {
- ui64 result = 0;
- for (auto&& i : BlobsWaiting) {
- result += i.second->GetExpectedBlobsSize();
- }
- return result;
+ Y_VERIFY(WaitBlobsSize);
+ return *WaitBlobsSize;
}
ui64 GetExpectedBlobsCount() const {
- ui64 result = 0;
- for (auto&& i : BlobsWaiting) {
- result += i.second->GetExpectedBlobsCount();
- }
- return result;
+ Y_VERIFY(WaitBlobsCount);
+ return *WaitBlobsCount;
}
THashSet<TBlobRange> GetExpectedRanges() const {
@@ -76,7 +82,7 @@ public:
virtual ~ITask();
- ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions);
+ ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId = "");
void StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress);