diff options
author | chertus <azuikov@ydb.tech> | 2023-01-11 18:24:31 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-01-11 18:24:31 +0300 |
commit | f9813f35812edd39d54b76a695b36963254178b2 (patch) | |
tree | 36a99a6081a45a22209ecab2f4a1a822cdcbf0ea | |
parent | 180528d0bbbab5dc9e9c9e785a34040b5983ba44 (diff) | |
download | ydb-f9813f35812edd39d54b76a695b36963254178b2.tar.gz |
abort scan in case of SSA execution error
4 files changed, 73 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index e4c57a6012..0d7f305880 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -54,7 +54,7 @@ public: public: TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, - const TString& table, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList, + ui64 tabletId, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList, NKikimrTxDataShard::EScanDataFormat dataFormat) : ColumnShardActorId(columnShardActorId) , ScanComputeActorId(scanComputeActorId) @@ -64,7 +64,7 @@ public: , ScanGen(scanGen) , RequestCookie(requestCookie) , DataFormat(dataFormat) - , TablePath(table) + , TabletId(tabletId) , ReadMetadataRanges(std::move(readMetadataList)) , ReadMetadataIndex(0) , Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)) @@ -123,7 +123,7 @@ private: void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got ScanDataAck" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " freeSpace: " << ev->Get()->FreeSpace << " prevFreeSpace: " << PeerFreeSpace); --InFlightScanDataMessages; @@ -147,12 +147,13 @@ private: const auto& blobRange = event.BlobRange; if (event.Status != NKikimrProto::EReplyStatus::OK) { + TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status); LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got TEvReadBlobRangeResult error" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " blob: " << ev->Get()->BlobRange - << " status: " << NKikimrProto::EReplyStatus_Name(event.Status)); - SendError(event.Status); + << " status: " << strStatus); + SendScanError(strStatus); return Finish(); } @@ -161,15 +162,16 @@ private: InFlightReadBytes -= blobRange.Size; - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got TEvReadBlobRangeResult" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " blob: " << ev->Get()->BlobRange << " prevFreeSpace: " << PeerFreeSpace); - ScanIterator->AddData(blobRange, event.Data); - - ContinueProcessing(); + if (ScanIterator) { + ScanIterator->AddData(blobRange, event.Data); + ContinueProcessing(); + } } // Returns true if it was able to produce new batch @@ -179,18 +181,29 @@ private: if (ScanIterator->Finished()) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " producing result: scan iterator is finished" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); return false; } auto result = ScanIterator->GetBatch(); + if (!result.ErrorString.empty()) { + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " producing result: got error '" << result.ErrorString + << "' txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size())); + + ScanIterator.reset(); + Finish(); + return false; + } + if (ResultYqlSchema.empty() && DataFormat != NKikimrTxDataShard::EScanDataFormat::ARROW) { ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema(); } if (!result.ResultBatch) { - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " producing result: no data is ready yet" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); return false; } @@ -199,7 +212,7 @@ private: int numColumns = batch->num_columns(); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " producing result: got ready result" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " blob (" << numColumns << " columns, " << numRows << " rows)" << " format: " << NKikimrTxDataShard::EScanDataFormat_Name(DataFormat)); @@ -235,7 +248,7 @@ private: if (!ScanIterator) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " iterator is not initialized" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); return; } @@ -244,7 +257,7 @@ private: LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " waiting for peer free space" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); return; } @@ -257,7 +270,7 @@ private: } // Switch to the next range if the current one is finished - if (ScanIterator->Finished() && !InFlightReads) { + if (ScanIterator && ScanIterator->Finished() && !InFlightReads) { NextReadMetadata(); } @@ -289,7 +302,7 @@ private: // The loop has finished without any progress! LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " is hanging" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); } void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) { @@ -299,12 +312,12 @@ private: auto prio = msg.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN; LOG_LOG_S(*TlsActivationContext, prio, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got AbortExecution" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " code: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " reason: " << reason); AbortReason = std::move(reason); - SendError(NKikimrProto::EReplyStatus::ERROR); // TODO: better status? + SendScanError(); Finish(); } @@ -321,7 +334,7 @@ private: LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " undelivered event: " << eventType - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " reason: " << ev->Get()->Reason << " description: " << AbortReason); @@ -331,7 +344,7 @@ private: void HandleScan(TEvents::TEvWakeup::TPtr&) { LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " guard execution timeout" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); TimeoutActorId = {}; Finish(); @@ -412,7 +425,7 @@ private: LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " send ScanData to " << ComputeActorId - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults << " finished: " << Result->Finished << " pageFault: " << Result->PageFault << " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : "")); @@ -433,17 +446,30 @@ private: return true; } - void SendError(NKikimrProto::EReplyStatus status) { - auto ev = MakeHolder<TEvKqpCompute::TEvScanError>(ScanGen); + void SendScanError(TString reason = {}) { + TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId; + if (!reason.empty()) { + msg += ", reason: " + reason; + } + auto ev = MakeHolder<TEvKqpCompute::TEvScanError>(ScanGen); ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR); - auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, TStringBuilder() - << "Table " << TablePath << " scan failed, reason: " << NKikimrProto::EReplyStatus_Name(status)); + auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, msg); NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); Send(ComputeActorId, ev.Release()); } + void SendAbortExecution(TString reason = {}) { + auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED; + TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId; + if (!reason.empty()) { + msg += ", reason: " + reason; + } + + Send(ComputeActorId, new TEvKqp::TEvAbortExecution(status, msg)); + } + void Finish() { if (TimeoutActorId) { Send(TimeoutActorId, new TEvents::TEvPoison); @@ -475,8 +501,7 @@ private: const ui64 RequestCookie; const i64 MaxReadAheadBytes = DEFAULT_READ_AHEAD_BYTES; const NKikimrTxDataShard::EScanDataFormat DataFormat; - - const TString TablePath; + const ui64 TabletId; TVector<NOlap::TReadMetadataBase::TConstPtr> ReadMetadataRanges; ui32 ReadMetadataIndex; @@ -798,7 +823,7 @@ void TTxScan::Complete(const TActorContext& ctx) { Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes); auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, - scanId, txId, scanGen, requestCookie, table, timeout, std::move(ReadMetadataRanges), dataFormat)); + scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat)); LOG_S_DEBUG("TTxScan starting " << scanActor << " txId: " << txId diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index cee4f525f7..f2ba4a97e1 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -67,7 +67,9 @@ public: if (ReadMetadata->Program) { auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext()); - Y_VERIFY_S(status.ok(), status.message()); + if (!status.ok()) { + out.ErrorString = status.message(); + } } return out; } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index f82cb2c26b..323bd54bb4 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -363,7 +363,9 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR bool requireResult = !HasIndexRead(); // not indexed or the last indexed read (even if it's emply) auto out = MakeResult(ReadyToOut(), maxRowsInBatch); if (requireResult && out.empty()) { - out.push_back({NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema), nullptr}); + out.push_back(TPartialReadResult{ + .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema) + }); } return out; } @@ -520,14 +522,19 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->ResultSchema); - out.emplace_back(TPartialReadResult{std::move(resultBatch), std::move(lastKey)}); + out.emplace_back(TPartialReadResult{ + .ResultBatch = std::move(resultBatch), + .LastReadKey = std::move(lastKey) + }); } } if (ReadMetadata->Program) { - for (auto& batch : out) { - auto status = ApplyProgram(batch.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext()); - Y_VERIFY_S(status.ok(), status.message()); + for (auto& result : out) { + auto status = ApplyProgram(result.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext()); + if (!status.ok()) { + result.ErrorString = status.message(); + } } } return out; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 1197f38d13..54cc70ac9c 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -176,6 +176,8 @@ struct TPartialReadResult { // This 1-row batch contains the last key that was read while producing the ResultBatch. // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit std::shared_ptr<arrow::RecordBatch> LastReadKey; + + std::string ErrorString; }; class TIndexedReadData { |