aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-03-29 13:45:12 +0300
committerGitHub <noreply@github.com>2024-03-29 13:45:12 +0300
commit3966efa7aa75db3716207221ede6f628e8ad5d8a (patch)
tree7260aaf17c966c641c5a285361f26a6fb908d59c
parenteae5f51b3dbd4a2e50bea92f8609a89857f22d45 (diff)
downloadydb-3966efa7aa75db3716207221ede6f628e8ad5d8a.tar.gz
conclusions for unready storages and scanner methods (#3282)
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/abstract.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/read_context.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.cpp96
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.h13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common/result.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp67
-rw-r--r--ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h2
-rw-r--r--ydb/core/tx/columnshard/inflight_request_tracker.cpp10
-rw-r--r--ydb/core/tx/columnshard/inflight_request_tracker.h17
17 files changed, 110 insertions, 131 deletions
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 7103e2f115..3469dc2898 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -45,9 +45,11 @@ public:
Success /* "Success" */ = 0,
ConveyorInternalError /* "ConveyorInternalError" */,
ExternalAbort /* "ExternalAbort" */,
- IteratorInternalError /* "IteratorInternalError" */,
+ IteratorInternalErrorScan /* "IteratorInternalErrorScan" */,
+ IteratorInternalErrorResult /* "IteratorInternalErrorResult" */,
Deadline /* "Deadline" */,
UndeliveredEvent /* "UndeliveredEvent" */,
+ CannotAddInFlight /* "CannotAddInFlight" */,
COUNT
};
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
index c19f00e400..734f18bc5b 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
@@ -19,11 +19,11 @@ public:
return {};
}
virtual bool Finished() const = 0;
- virtual std::optional<TPartialReadResult> GetBatch() = 0;
+ virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() = 0;
virtual void PrepareResults() {
}
- virtual bool ReadNextInterval() { return false; }
+ virtual TConclusion<bool> ReadNextInterval() { return false; }
virtual TString DebugString(const bool verbose = false) const {
Y_UNUSED(verbose);
return "NO_DATA";
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
index d88266eb81..af1b5ab5bb 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
@@ -108,7 +108,7 @@ protected:
virtual void DoAbort() = 0;
virtual bool DoIsFinished() const = 0;
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
- virtual bool DoReadNextInterval() = 0;
+ virtual TConclusion<bool> DoReadNextInterval() = 0;
public:
IDataReader(const std::shared_ptr<TReadContext>& context);
virtual ~IDataReader() = default;
@@ -156,7 +156,7 @@ public:
sb << DoDebugString(verbose);
return sb;
}
- bool ReadNextInterval() {
+ [[nodiscard]] TConclusion<bool> ReadNextInterval() {
return DoReadNextInterval();
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
index a3805f3c4b..49ab834d3f 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
@@ -45,7 +45,7 @@ void TColumnShardScan::PassAway() {
TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager,
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
- ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
+ ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool)
: StoragesManager(storagesManager)
, ColumnShardActorId(columnShardActorId)
@@ -57,15 +57,14 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
, RequestCookie(requestCookie)
, DataFormat(dataFormat)
, TabletId(tabletId)
- , ReadMetadataRanges(std::move(readMetadataList))
- , ReadMetadataIndex(0)
+ , ReadMetadataRange(readMetadataRange)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
, ScanCountersPool(scanCountersPool)
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
, ComputeShardingPolicy(computeShardingPolicy)
{
- AFL_VERIFY(ReadMetadataRanges.size() == 1);
- KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
+ AFL_VERIFY(ReadMetadataRange);
+ KeyYqlSchema = ReadMetadataRange->GetKeyYqlSchema();
}
void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
@@ -81,8 +80,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool,
- ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
- ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
+ ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
+ ScanIterator = ReadMetadataRange->StartScan(context);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
@@ -92,12 +91,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ContinueProcessing();
}
-bool TColumnShardScan::ReadNextBlob() {
- while (ScanIterator->ReadNextInterval()) {
- }
- return true;
-}
-
void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
--InFlightReads;
auto g = Stats->MakeGuard("task_result");
@@ -198,21 +191,24 @@ bool TColumnShardScan::ProduceResults() noexcept {
return false;
}
- auto resultOpt = ScanIterator->GetBatch();
- if (!resultOpt) {
- ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
+ auto resultConclusion = ScanIterator->GetBatch();
+ if (resultConclusion.IsFail()) {
+ ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", resultConclusion.GetErrorMessage());
+ SendAbortExecution(resultConclusion.GetErrorMessage());
+
+ ScanIterator.reset();
+ Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorResult);
return false;
}
- auto& result = *resultOpt;
- if (!result.ErrorString.empty()) {
- ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString);
- SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));
- ScanIterator.reset();
- Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalError);
+ std::optional<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
+ if (!resultOpt) {
+ ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
}
+ auto& result = *resultOpt;
+
if (!result.GetRecordsCount()) {
ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString());
return true;
@@ -263,14 +259,27 @@ void TColumnShardScan::ContinueProcessing() {
while (ScanIterator && ProduceResults()) {
}
- // Switch to the next range if the current one is finished
- if (ScanIterator && ScanIterator->Finished() && ChunksLimiter.HasMore()) {
- NextReadMetadata();
- }
-
if (ScanIterator) {
- // Make read-ahead requests for the subsequent blobs
- ReadNextBlob();
+ // Switch to the next range if the current one is finished
+ if (ScanIterator->Finished() && ChunksLimiter.HasMore()) {
+ auto g = Stats->MakeGuard("Finish");
+ MakeResult();
+ SendResult(false, true);
+ ScanIterator.reset();
+ Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
+ } else {
+ while (true) {
+ TConclusion<bool> hasMoreData = ScanIterator->ReadNextInterval();
+ if (hasMoreData.IsFail()) {
+ ACFL_ERROR("event", "ContinueProcessing")("error", hasMoreData.GetErrorMessage());
+ ScanIterator.reset();
+ SendScanError("iterator_error:" + hasMoreData.GetErrorMessage());
+ return Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorScan);
+ } else if (!*hasMoreData) {
+ break;
+ }
+ }
+ }
}
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)
("debug", ScanIterator->DebugString());
@@ -286,21 +295,6 @@ void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
}
}
-void TColumnShardScan::NextReadMetadata() {
- auto g = Stats->MakeGuard("NextReadMetadata");
- if (++ReadMetadataIndex == ReadMetadataRanges.size()) {
- // Send empty batch with "finished" flag
- MakeResult();
- SendResult(false, true);
- ScanIterator.reset();
- return Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
- }
-
- auto context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
- ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
- ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
-}
-
void TColumnShardScan::AddRow(const TConstArrayRef<TCell>& row) {
Result->Rows.emplace_back(TOwnedCellVec::Make(row));
++Rows;
@@ -379,11 +373,10 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
return true;
}
-void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
+void TColumnShardScan::SendScanError(const TString& reason) {
+ AFL_VERIFY(reason);
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
- if (!reason.empty()) {
- msg += ", reason: " + reason;
- }
+ msg += ", reason: " + reason;
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, TabletId);
ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR);
@@ -393,12 +386,11 @@ void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
Send(ScanComputeActorId, ev.Release());
}
-void TColumnShardScan::SendAbortExecution(TString reason /*= {}*/) {
+void TColumnShardScan::SendAbortExecution(const TString& reason) {
+ AFL_VERIFY(reason);
auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
- if (!reason.empty()) {
- msg += ", reason: " + reason;
- }
+ msg += ", reason: " + reason;
Send(ScanComputeActorId, new NKqp::TEvKqp::TEvAbortExecution(status, msg));
}
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
index 234fb17823..1ae46d783e 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
@@ -33,7 +33,7 @@ public:
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
- ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
+ ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);
void Bootstrap(const TActorContext& ctx);
@@ -55,8 +55,6 @@ private:
}
}
- bool ReadNextBlob();
-
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);
void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
@@ -75,8 +73,6 @@ private:
private:
void MakeResult(size_t reserveRows = 0);
- void NextReadMetadata();
-
void AddRow(const TConstArrayRef<TCell>& row) override;
TOwnedCellVec ConvertLastKey(const std::shared_ptr<arrow::RecordBatch>& lastReadKey);
@@ -101,9 +97,9 @@ private:
bool SendResult(bool pageFault, bool lastBatch);
- void SendScanError(TString reason = {});
+ void SendScanError(const TString& reason);
- void SendAbortExecution(TString reason = {});
+ void SendAbortExecution(const TString& reason);
void Finish(const NColumnShard::TScanCounters::EStatusFinish status);
@@ -123,8 +119,7 @@ private:
const NKikimrDataEvents::EDataFormat DataFormat;
const ui64 TabletId;
- std::vector<TReadMetadataBase::TConstPtr> ReadMetadataRanges;
- ui32 ReadMetadataIndex;
+ TReadMetadataBase::TConstPtr ReadMetadataRange;
std::unique_ptr<TScanIteratorBase> ScanIterator;
std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema;
diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h
index 8b575d8a26..cfa9ebe87e 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/common/result.h
@@ -54,8 +54,6 @@ public:
return LastReadKey;
}
- std::string ErrorString;
-
explicit TPartialReadResult(
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp
index e8b33132b0..c8de567ad8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp
@@ -15,7 +15,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<TReadCo
}
}
-std::optional<TPartialReadResult> TColumnShardScanIterator::GetBatch() {
+TConclusion<std::optional<TPartialReadResult>> TColumnShardScanIterator::GetBatch() {
FillReadyResults();
return ReadyResults.pop_front();
}
@@ -24,7 +24,7 @@ void TColumnShardScanIterator::PrepareResults() {
FillReadyResults();
}
-bool TColumnShardScanIterator::ReadNextInterval() {
+TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
return IndexedData->ReadNextInterval();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h
index beba3b2733..61c3c97007 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h
@@ -83,10 +83,10 @@ public:
return IndexedData->IsFinished() && ReadyResults.empty();
}
- std::optional<TPartialReadResult> GetBatch() override;
+ TConclusion<std::optional<TPartialReadResult>> GetBatch() override;
virtual void PrepareResults() override;
- virtual bool ReadNextInterval() override;
+ virtual TConclusion<bool> ReadNextInterval() override;
private:
void FillReadyResults();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
index 1f362c00cc..875acea032 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
@@ -69,7 +69,7 @@ std::vector<TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int6
return result;
}
-bool TPlainReadData::DoReadNextInterval() {
+TConclusion<bool> TPlainReadData::DoReadNextInterval() {
return Scanner->BuildNextInterval();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
index 07946d5195..0df41d3a9a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h
@@ -28,7 +28,7 @@ protected:
}
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
- virtual bool DoReadNextInterval() override;
+ virtual TConclusion<bool> DoReadNextInterval() override;
virtual void DoAbort() override {
AbortedFlag = true;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
index 11f284f14f..51de7b6d84 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
@@ -76,7 +76,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
}
}
-bool TScanHead::BuildNextInterval() {
+TConclusion<bool> TScanHead::BuildNextInterval() {
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
bool includeStart = firstBorderPointInfo.GetStartSources().size();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
index a2ca7f3861..5465ec0330 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h
@@ -69,7 +69,7 @@ public:
TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
- bool BuildNextInterval();
+ [[nodiscard]] TConclusion<bool> BuildNextInterval();
};
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h
index 63fcd76852..37a9855703 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h
@@ -89,7 +89,7 @@ protected:
std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
- virtual std::optional<TPartialReadResult> GetBatch() override {
+ virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override {
// Take next raw batch
auto batch = FillStatsBatch();
@@ -99,13 +99,13 @@ protected:
ApplyRangePredicates(batch);
if (!batch->num_rows()) {
- return {};
+ return std::nullopt;
}
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch));
if (!resultBatch->num_rows()) {
- return {};
+ return std::nullopt;
}
TPartialReadResult out(resultBatch, lastKey);
diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
index 988a2b60a6..c7a3ff6b23 100644
--- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
@@ -150,20 +150,18 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/
if (!record.RangesSize()) {
auto range = scannerConstructor->BuildReadMetadata(Self, read);
if (range) {
- ReadMetadataRanges = {range.DetachResult()};
+ ReadMetadataRange = range.DetachResult();
} else {
ErrorDescription = range.GetErrorMessage();
}
return true;
}
- ReadMetadataRanges.reserve(1);
-
auto ydbKey = scannerConstructor->GetPrimaryKeyScheme(Self);
auto* indexInfo = (vIndex && isIndex) ? &vIndex->GetSchema(snapshot)->GetIndexInfo() : nullptr;
for (auto& range : record.GetRanges()) {
if (!FillPredicatesFromRange(read, range, ydbKey, Self->TabletID(), indexInfo, ErrorDescription)) {
- ReadMetadataRanges.clear();
+ ReadMetadataRange = nullptr;
return true;
}
}
@@ -171,13 +169,12 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
if (!newRange) {
ErrorDescription = newRange.GetErrorMessage();
- ReadMetadataRanges.clear();
+ ReadMetadataRange = nullptr;
return true;
}
- ReadMetadataRanges.emplace_back(newRange.DetachResult());
+ ReadMetadataRange = newRange.DetachResult();
}
- Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1);
-
+ AFL_VERIFY(ReadMetadataRange);
return true;
}
@@ -198,6 +195,7 @@ struct TContainerPrinter {
};
void TTxScan::Complete(const TActorContext& ctx) {
+
auto& request = Ev->Get()->Record;
auto scanComputeActor = Ev->Sender;
const auto& snapshot = request.GetSnapshot();
@@ -210,38 +208,43 @@ void TTxScan::Complete(const TActorContext& ctx) {
if (scanGen > 1) {
Self->IncCounter(NColumnShard::COUNTER_SCAN_RESTARTED);
}
+ const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()
+ ("tx_id", txId)("scan_id", scanId)("gen", scanGen)("table", table)("snapshot", snapshot)("tablet", Self->TabletID())("timeout", timeout);
- TStringStream detailedInfo;
- if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
- detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request;
- }
- if (ReadMetadataRanges.empty()) {
- LOG_S_DEBUG("TTxScan failed "
- << " txId: " << txId
- << " scanId: " << scanId
- << " gen: " << scanGen
- << " table: " << table
- << " snapshot: " << snapshot
- << " timeout: " << timeout
- << detailedInfo.Str()
- << " at tablet " << Self->TabletID());
+ if (!ReadMetadataRange) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata");
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID());
-
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder()
- << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "unknown error");
+ << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges");
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
ctx.Send(scanComputeActor, ev.Release());
return;
}
+ TStringBuilder detailedInfo;
+ if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
+ detailedInfo << " read metadata: (" << *ReadMetadataRange << ")" << " req: " << request;
+ }
const TVersionedIndex* index = nullptr;
if (Self->HasIndex()) {
index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
}
- ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges, index);
+ const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index);
+ if (!requestCookie) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo);
+ auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID());
+
+ ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR);
+ auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
+ << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage());
+ NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
+ Self->ScanCounters.OnScanDuration(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero());
+ ctx.Send(scanComputeActor, ev.Release());
+ return;
+ }
auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions);
@@ -253,17 +256,9 @@ void TTxScan::Complete(const TActorContext& ctx) {
AFL_VERIFY(shardingPolicy.DeserializeFromProto(request.GetComputeShardingPolicy()));
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
- shardingPolicy, scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat, Self->ScanCounters));
-
- LOG_S_DEBUG("TTxScan starting " << scanActor
- << " txId: " << txId
- << " scanId: " << scanId
- << " gen: " << scanGen
- << " table: " << table
- << " snapshot: " << snapshot
- << " timeout: " << timeout
- << detailedInfo.Str()
- << " at tablet " << Self->TabletID());
+ shardingPolicy, scanId, txId, scanGen, *requestCookie, Self->TabletID(), timeout, ReadMetadataRange, dataFormat, Self->ScanCounters));
+
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActor)("trace_detailed", detailedInfo);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h
index 36ad75ae47..2d9eb9619a 100644
--- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h
+++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h
@@ -22,7 +22,7 @@ public:
private:
TString ErrorDescription;
TEvColumnShard::TEvScan::TPtr Ev;
- std::vector<TReadMetadataPtr> ReadMetadataRanges;
+ TReadMetadataPtr ReadMetadataRange;
};
} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.cpp b/ydb/core/tx/columnshard/inflight_request_tracker.cpp
index bd593dff80..98ca6d7ab6 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.cpp
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.cpp
@@ -47,13 +47,13 @@ void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVer
RequestsMeta.erase(cookie);
}
-void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) {
+TConclusionStatus TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) {
RequestsMeta[cookie].push_back(readMetaBase);
auto readMeta = std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase);
if (!readMeta) {
- return;
+ return TConclusionStatus::Success();
}
auto selectInfo = readMeta->SelectInfo;
@@ -69,7 +69,10 @@ void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NRead
}
for (auto&& i : portionBlobIds) {
- auto storage = StoragesManager->GetOperatorVerified(i.first);
+ auto storage = StoragesManager->GetOperatorOptional(i.first);
+ if (!storage) {
+ return TConclusionStatus::Fail("blobs storage info not ready for '" + i.first + "'");
+ }
auto tracker = storage->GetBlobsTracker();
for (auto& blobId : i.second) {
tracker->UseBlob(blobId);
@@ -81,6 +84,7 @@ void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NRead
for (const auto& committedBlob : readMeta->CommittedBlobs) {
tracker->UseBlob(committedBlob.GetBlobRange().GetBlobId());
}
+ return TConclusionStatus::Success();
}
}
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index 03fc5d7a2a..d530c11d7a 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -14,18 +14,11 @@ using NOlap::IBlobInUseTracker;
class TInFlightReadsTracker {
public:
// Returns a unique cookie associated with this request
- ui64 AddInFlightRequest(NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) {
+ [[nodiscard]] TConclusion<ui64> AddInFlightRequest(NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) {
const ui64 cookie = NextCookie++;
- AddToInFlightRequest(cookie, readMeta, index);
- return cookie;
- }
-
- // Returns a unique cookie associated with this request
- template <class TReadMetadataList>
- ui64 AddInFlightRequest(const TReadMetadataList& readMetaList, const NOlap::TVersionedIndex* index) {
- const ui64 cookie = NextCookie++;
- for (const auto& readMetaPtr : readMetaList) {
- AddToInFlightRequest(cookie, readMetaPtr, index);
+ auto status = AddToInFlightRequest(cookie, readMeta, index);
+ if (!status) {
+ return status;
}
return cookie;
}
@@ -50,7 +43,7 @@ public:
}
private:
- void AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index);
+ [[nodiscard]] TConclusionStatus AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index);
private:
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;