diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-03-29 13:45:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-29 13:45:12 +0300 |
commit | 3966efa7aa75db3716207221ede6f628e8ad5d8a (patch) | |
tree | 7260aaf17c966c641c5a285361f26a6fb908d59c | |
parent | eae5f51b3dbd4a2e50bea92f8609a89857f22d45 (diff) | |
download | ydb-3966efa7aa75db3716207221ede6f628e8ad5d8a.tar.gz |
conclusions for unready storages and scanner methods (#3282)
17 files changed, 110 insertions, 131 deletions
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 7103e2f115..3469dc2898 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -45,9 +45,11 @@ public: Success /* "Success" */ = 0, ConveyorInternalError /* "ConveyorInternalError" */, ExternalAbort /* "ExternalAbort" */, - IteratorInternalError /* "IteratorInternalError" */, + IteratorInternalErrorScan /* "IteratorInternalErrorScan" */, + IteratorInternalErrorResult /* "IteratorInternalErrorResult" */, Deadline /* "Deadline" */, UndeliveredEvent /* "UndeliveredEvent" */, + CannotAddInFlight /* "CannotAddInFlight" */, COUNT }; diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h index c19f00e400..734f18bc5b 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/abstract.h @@ -19,11 +19,11 @@ public: return {}; } virtual bool Finished() const = 0; - virtual std::optional<TPartialReadResult> GetBatch() = 0; + virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() = 0; virtual void PrepareResults() { } - virtual bool ReadNextInterval() { return false; } + virtual TConclusion<bool> ReadNextInterval() { return false; } virtual TString DebugString(const bool verbose = false) const { Y_UNUSED(verbose); return "NO_DATA"; diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index d88266eb81..af1b5ab5bb 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -108,7 +108,7 @@ protected: virtual void DoAbort() = 0; virtual bool DoIsFinished() const = 0; virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0; - virtual bool DoReadNextInterval() = 0; + virtual TConclusion<bool> DoReadNextInterval() = 0; public: IDataReader(const std::shared_ptr<TReadContext>& context); virtual ~IDataReader() = default; @@ -156,7 +156,7 @@ public: sb << DoDebugString(verbose); return sb; } - bool ReadNextInterval() { + [[nodiscard]] TConclusion<bool> ReadNextInterval() { return DoReadNextInterval(); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index a3805f3c4b..49ab834d3f 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -45,7 +45,7 @@ void TColumnShardScan::PassAway() { TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, - ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList, + ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool) : StoragesManager(storagesManager) , ColumnShardActorId(columnShardActorId) @@ -57,15 +57,14 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc , RequestCookie(requestCookie) , DataFormat(dataFormat) , TabletId(tabletId) - , ReadMetadataRanges(std::move(readMetadataList)) - , ReadMetadataIndex(0) + , ReadMetadataRange(readMetadataRange) , Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)) , ScanCountersPool(scanCountersPool) , Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/)) , ComputeShardingPolicy(computeShardingPolicy) { - AFL_VERIFY(ReadMetadataRanges.size() == 1); - KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); + AFL_VERIFY(ReadMetadataRange); + KeyYqlSchema = ReadMetadataRange->GetKeyYqlSchema(); } void TColumnShardScan::Bootstrap(const TActorContext& ctx) { @@ -81,8 +80,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) { ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, - ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); + ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); + ScanIterator = ReadMetadataRange->StartScan(context); // propagate self actor id // TODO: FlagSubscribeOnSession ? Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery); @@ -92,12 +91,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) { ContinueProcessing(); } -bool TColumnShardScan::ReadNextBlob() { - while (ScanIterator->ReadNextInterval()) { - } - return true; -} - void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { --InFlightReads; auto g = Stats->MakeGuard("task_result"); @@ -198,21 +191,24 @@ bool TColumnShardScan::ProduceResults() noexcept { return false; } - auto resultOpt = ScanIterator->GetBatch(); - if (!resultOpt) { - ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); + auto resultConclusion = ScanIterator->GetBatch(); + if (resultConclusion.IsFail()) { + ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", resultConclusion.GetErrorMessage()); + SendAbortExecution(resultConclusion.GetErrorMessage()); + + ScanIterator.reset(); + Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorResult); return false; } - auto& result = *resultOpt; - if (!result.ErrorString.empty()) { - ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString); - SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size())); - ScanIterator.reset(); - Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalError); + std::optional<TPartialReadResult> resultOpt = resultConclusion.DetachResult(); + if (!resultOpt) { + ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString()); return false; } + auto& result = *resultOpt; + if (!result.GetRecordsCount()) { ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString()); return true; @@ -263,14 +259,27 @@ void TColumnShardScan::ContinueProcessing() { while (ScanIterator && ProduceResults()) { } - // Switch to the next range if the current one is finished - if (ScanIterator && ScanIterator->Finished() && ChunksLimiter.HasMore()) { - NextReadMetadata(); - } - if (ScanIterator) { - // Make read-ahead requests for the subsequent blobs - ReadNextBlob(); + // Switch to the next range if the current one is finished + if (ScanIterator->Finished() && ChunksLimiter.HasMore()) { + auto g = Stats->MakeGuard("Finish"); + MakeResult(); + SendResult(false, true); + ScanIterator.reset(); + Finish(NColumnShard::TScanCounters::EStatusFinish::Success); + } else { + while (true) { + TConclusion<bool> hasMoreData = ScanIterator->ReadNextInterval(); + if (hasMoreData.IsFail()) { + ACFL_ERROR("event", "ContinueProcessing")("error", hasMoreData.GetErrorMessage()); + ScanIterator.reset(); + SendScanError("iterator_error:" + hasMoreData.GetErrorMessage()); + return Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorScan); + } else if (!*hasMoreData) { + break; + } + } + } } AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId) ("debug", ScanIterator->DebugString()); @@ -286,21 +295,6 @@ void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) { } } -void TColumnShardScan::NextReadMetadata() { - auto g = Stats->MakeGuard("NextReadMetadata"); - if (++ReadMetadataIndex == ReadMetadataRanges.size()) { - // Send empty batch with "finished" flag - MakeResult(); - SendResult(false, true); - ScanIterator.reset(); - return Finish(NColumnShard::TScanCounters::EStatusFinish::Success); - } - - auto context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(), - ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); -} - void TColumnShardScan::AddRow(const TConstArrayRef<TCell>& row) { Result->Rows.emplace_back(TOwnedCellVec::Make(row)); ++Rows; @@ -379,11 +373,10 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) { return true; } -void TColumnShardScan::SendScanError(TString reason /*= {}*/) { +void TColumnShardScan::SendScanError(const TString& reason) { + AFL_VERIFY(reason); TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId; - if (!reason.empty()) { - msg += ", reason: " + reason; - } + msg += ", reason: " + reason; auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, TabletId); ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR); @@ -393,12 +386,11 @@ void TColumnShardScan::SendScanError(TString reason /*= {}*/) { Send(ScanComputeActorId, ev.Release()); } -void TColumnShardScan::SendAbortExecution(TString reason /*= {}*/) { +void TColumnShardScan::SendAbortExecution(const TString& reason) { + AFL_VERIFY(reason); auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED; TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId; - if (!reason.empty()) { - msg += ", reason: " + reason; - } + msg += ", reason: " + reason; Send(ScanComputeActorId, new NKqp::TEvKqp::TEvAbortExecution(status, msg)); } diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h index 234fb17823..1ae46d783e 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h @@ -33,7 +33,7 @@ public: TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, - ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList, + ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool); void Bootstrap(const TActorContext& ctx); @@ -55,8 +55,6 @@ private: } } - bool ReadNextBlob(); - void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev); void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev); @@ -75,8 +73,6 @@ private: private: void MakeResult(size_t reserveRows = 0); - void NextReadMetadata(); - void AddRow(const TConstArrayRef<TCell>& row) override; TOwnedCellVec ConvertLastKey(const std::shared_ptr<arrow::RecordBatch>& lastReadKey); @@ -101,9 +97,9 @@ private: bool SendResult(bool pageFault, bool lastBatch); - void SendScanError(TString reason = {}); + void SendScanError(const TString& reason); - void SendAbortExecution(TString reason = {}); + void SendAbortExecution(const TString& reason); void Finish(const NColumnShard::TScanCounters::EStatusFinish status); @@ -123,8 +119,7 @@ private: const NKikimrDataEvents::EDataFormat DataFormat; const ui64 TabletId; - std::vector<TReadMetadataBase::TConstPtr> ReadMetadataRanges; - ui32 ReadMetadataIndex; + TReadMetadataBase::TConstPtr ReadMetadataRange; std::unique_ptr<TScanIteratorBase> ScanIterator; std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema; diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h index 8b575d8a26..cfa9ebe87e 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.h +++ b/ydb/core/tx/columnshard/engines/reader/common/result.h @@ -54,8 +54,6 @@ public: return LastReadKey; } - std::string ErrorString; - explicit TPartialReadResult( const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards, const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp index e8b33132b0..c8de567ad8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp @@ -15,7 +15,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<TReadCo } } -std::optional<TPartialReadResult> TColumnShardScanIterator::GetBatch() { +TConclusion<std::optional<TPartialReadResult>> TColumnShardScanIterator::GetBatch() { FillReadyResults(); return ReadyResults.pop_front(); } @@ -24,7 +24,7 @@ void TColumnShardScanIterator::PrepareResults() { FillReadyResults(); } -bool TColumnShardScanIterator::ReadNextInterval() { +TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() { return IndexedData->ReadNextInterval(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h index beba3b2733..61c3c97007 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h @@ -83,10 +83,10 @@ public: return IndexedData->IsFinished() && ReadyResults.empty(); } - std::optional<TPartialReadResult> GetBatch() override; + TConclusion<std::optional<TPartialReadResult>> GetBatch() override; virtual void PrepareResults() override; - virtual bool ReadNextInterval() override; + virtual TConclusion<bool> ReadNextInterval() override; private: void FillReadyResults(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index 1f362c00cc..875acea032 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -69,7 +69,7 @@ std::vector<TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int6 return result; } -bool TPlainReadData::DoReadNextInterval() { +TConclusion<bool> TPlainReadData::DoReadNextInterval() { return Scanner->BuildNextInterval(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h index 07946d5195..0df41d3a9a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h @@ -28,7 +28,7 @@ protected: } virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override; - virtual bool DoReadNextInterval() override; + virtual TConclusion<bool> DoReadNextInterval() override; virtual void DoAbort() override { AbortedFlag = true; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 11f284f14f..51de7b6d84 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -76,7 +76,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s } } -bool TScanHead::BuildNextInterval() { +TConclusion<bool> TScanHead::BuildNextInterval() { while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) { auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); bool includeStart = firstBorderPointInfo.GetStartSources().size(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h index a2ca7f3861..5465ec0330 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h @@ -69,7 +69,7 @@ public: TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context); - bool BuildNextInterval(); + [[nodiscard]] TConclusion<bool> BuildNextInterval(); }; diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h index 63fcd76852..37a9855703 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h @@ -89,7 +89,7 @@ protected: std::deque<std::shared_ptr<TPortionInfo>> IndexPortions; - virtual std::optional<TPartialReadResult> GetBatch() override { + virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override { // Take next raw batch auto batch = FillStatsBatch(); @@ -99,13 +99,13 @@ protected: ApplyRangePredicates(batch); if (!batch->num_rows()) { - return {}; + return std::nullopt; } // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch)); if (!resultBatch->num_rows()) { - return {}; + return std::nullopt; } TPartialReadResult out(resultBatch, lastKey); diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp index 988a2b60a6..c7a3ff6b23 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp @@ -150,20 +150,18 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/ if (!record.RangesSize()) { auto range = scannerConstructor->BuildReadMetadata(Self, read); if (range) { - ReadMetadataRanges = {range.DetachResult()}; + ReadMetadataRange = range.DetachResult(); } else { ErrorDescription = range.GetErrorMessage(); } return true; } - ReadMetadataRanges.reserve(1); - auto ydbKey = scannerConstructor->GetPrimaryKeyScheme(Self); auto* indexInfo = (vIndex && isIndex) ? &vIndex->GetSchema(snapshot)->GetIndexInfo() : nullptr; for (auto& range : record.GetRanges()) { if (!FillPredicatesFromRange(read, range, ydbKey, Self->TabletID(), indexInfo, ErrorDescription)) { - ReadMetadataRanges.clear(); + ReadMetadataRange = nullptr; return true; } } @@ -171,13 +169,12 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/ auto newRange = scannerConstructor->BuildReadMetadata(Self, read); if (!newRange) { ErrorDescription = newRange.GetErrorMessage(); - ReadMetadataRanges.clear(); + ReadMetadataRange = nullptr; return true; } - ReadMetadataRanges.emplace_back(newRange.DetachResult()); + ReadMetadataRange = newRange.DetachResult(); } - Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1); - + AFL_VERIFY(ReadMetadataRange); return true; } @@ -198,6 +195,7 @@ struct TContainerPrinter { }; void TTxScan::Complete(const TActorContext& ctx) { + auto& request = Ev->Get()->Record; auto scanComputeActor = Ev->Sender; const auto& snapshot = request.GetSnapshot(); @@ -210,38 +208,43 @@ void TTxScan::Complete(const TActorContext& ctx) { if (scanGen > 1) { Self->IncCounter(NColumnShard::COUNTER_SCAN_RESTARTED); } + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build() + ("tx_id", txId)("scan_id", scanId)("gen", scanGen)("table", table)("snapshot", snapshot)("tablet", Self->TabletID())("timeout", timeout); - TStringStream detailedInfo; - if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { - detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request; - } - if (ReadMetadataRanges.empty()) { - LOG_S_DEBUG("TTxScan failed " - << " txId: " << txId - << " scanId: " << scanId - << " gen: " << scanGen - << " table: " << table - << " snapshot: " << snapshot - << " timeout: " << timeout - << detailedInfo.Str() - << " at tablet " << Self->TabletID()); + if (!ReadMetadataRange) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata"); auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID()); - ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder() - << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "unknown error"); + << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges"); NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); ctx.Send(scanComputeActor, ev.Release()); return; } + TStringBuilder detailedInfo; + if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { + detailedInfo << " read metadata: (" << *ReadMetadataRange << ")" << " req: " << request; + } const TVersionedIndex* index = nullptr; if (Self->HasIndex()) { index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex(); } - ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges, index); + const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index); + if (!requestCookie) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo); + auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID()); + + ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR); + auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() + << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage()); + NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); + Self->ScanCounters.OnScanDuration(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); + ctx.Send(scanComputeActor, ev.Release()); + return; + } auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions); @@ -253,17 +256,9 @@ void TTxScan::Complete(const TActorContext& ctx) { AFL_VERIFY(shardingPolicy.DeserializeFromProto(request.GetComputeShardingPolicy())); auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), - shardingPolicy, scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat, Self->ScanCounters)); - - LOG_S_DEBUG("TTxScan starting " << scanActor - << " txId: " << txId - << " scanId: " << scanId - << " gen: " << scanGen - << " table: " << table - << " snapshot: " << snapshot - << " timeout: " << timeout - << detailedInfo.Str() - << " at tablet " << Self->TabletID()); + shardingPolicy, scanId, txId, scanGen, *requestCookie, Self->TabletID(), timeout, ReadMetadataRange, dataFormat, Self->ScanCounters)); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActor)("trace_detailed", detailedInfo); } } diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h index 36ad75ae47..2d9eb9619a 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h @@ -22,7 +22,7 @@ public: private: TString ErrorDescription; TEvColumnShard::TEvScan::TPtr Ev; - std::vector<TReadMetadataPtr> ReadMetadataRanges; + TReadMetadataPtr ReadMetadataRange; }; }
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.cpp b/ydb/core/tx/columnshard/inflight_request_tracker.cpp index bd593dff80..98ca6d7ab6 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.cpp +++ b/ydb/core/tx/columnshard/inflight_request_tracker.cpp @@ -47,13 +47,13 @@ void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVer RequestsMeta.erase(cookie); } -void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) { +TConclusionStatus TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index) { RequestsMeta[cookie].push_back(readMetaBase); auto readMeta = std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase); if (!readMeta) { - return; + return TConclusionStatus::Success(); } auto selectInfo = readMeta->SelectInfo; @@ -69,7 +69,10 @@ void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NRead } for (auto&& i : portionBlobIds) { - auto storage = StoragesManager->GetOperatorVerified(i.first); + auto storage = StoragesManager->GetOperatorOptional(i.first); + if (!storage) { + return TConclusionStatus::Fail("blobs storage info not ready for '" + i.first + "'"); + } auto tracker = storage->GetBlobsTracker(); for (auto& blobId : i.second) { tracker->UseBlob(blobId); @@ -81,6 +84,7 @@ void TInFlightReadsTracker::AddToInFlightRequest(const ui64 cookie, NOlap::NRead for (const auto& committedBlob : readMeta->CommittedBlobs) { tracker->UseBlob(committedBlob.GetBlobRange().GetBlobId()); } + return TConclusionStatus::Success(); } } diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index 03fc5d7a2a..d530c11d7a 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -14,18 +14,11 @@ using NOlap::IBlobInUseTracker; class TInFlightReadsTracker { public: // Returns a unique cookie associated with this request - ui64 AddInFlightRequest(NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) { + [[nodiscard]] TConclusion<ui64> AddInFlightRequest(NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) { const ui64 cookie = NextCookie++; - AddToInFlightRequest(cookie, readMeta, index); - return cookie; - } - - // Returns a unique cookie associated with this request - template <class TReadMetadataList> - ui64 AddInFlightRequest(const TReadMetadataList& readMetaList, const NOlap::TVersionedIndex* index) { - const ui64 cookie = NextCookie++; - for (const auto& readMetaPtr : readMetaList) { - AddToInFlightRequest(cookie, readMetaPtr, index); + auto status = AddToInFlightRequest(cookie, readMeta, index); + if (!status) { + return status; } return cookie; } @@ -50,7 +43,7 @@ public: } private: - void AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index); + [[nodiscard]] TConclusionStatus AddToInFlightRequest(const ui64 cookie, NOlap::NReader::TReadMetadataBase::TConstPtr readMetaBase, const NOlap::TVersionedIndex* index); private: std::shared_ptr<NOlap::IStoragesManager> StoragesManager; |