aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-01-11 18:24:31 +0300
committerchertus <azuikov@ydb.tech>2023-01-11 18:24:31 +0300
commitf9813f35812edd39d54b76a695b36963254178b2 (patch)
tree36a99a6081a45a22209ecab2f4a1a822cdcbf0ea
parent180528d0bbbab5dc9e9c9e785a34040b5983ba44 (diff)
downloadydb-f9813f35812edd39d54b76a695b36963254178b2.tar.gz
abort scan in case of SSA execution error
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp87
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h4
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h2
4 files changed, 73 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index e4c57a6012..0d7f305880 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -54,7 +54,7 @@ public:
public:
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
- const TString& table, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList,
+ ui64 tabletId, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList,
NKikimrTxDataShard::EScanDataFormat dataFormat)
: ColumnShardActorId(columnShardActorId)
, ScanComputeActorId(scanComputeActorId)
@@ -64,7 +64,7 @@ public:
, ScanGen(scanGen)
, RequestCookie(requestCookie)
, DataFormat(dataFormat)
- , TablePath(table)
+ , TabletId(tabletId)
, ReadMetadataRanges(std::move(readMetadataList))
, ReadMetadataIndex(0)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
@@ -123,7 +123,7 @@ private:
void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got ScanDataAck"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " freeSpace: " << ev->Get()->FreeSpace << " prevFreeSpace: " << PeerFreeSpace);
--InFlightScanDataMessages;
@@ -147,12 +147,13 @@ private:
const auto& blobRange = event.BlobRange;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
+ TString strStatus = NKikimrProto::EReplyStatus_Name(event.Status);
LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got TEvReadBlobRangeResult error"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " blob: " << ev->Get()->BlobRange
- << " status: " << NKikimrProto::EReplyStatus_Name(event.Status));
- SendError(event.Status);
+ << " status: " << strStatus);
+ SendScanError(strStatus);
return Finish();
}
@@ -161,15 +162,16 @@ private:
InFlightReadBytes -= blobRange.Size;
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got TEvReadBlobRangeResult"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " blob: " << ev->Get()->BlobRange
<< " prevFreeSpace: " << PeerFreeSpace);
- ScanIterator->AddData(blobRange, event.Data);
-
- ContinueProcessing();
+ if (ScanIterator) {
+ ScanIterator->AddData(blobRange, event.Data);
+ ContinueProcessing();
+ }
}
// Returns true if it was able to produce new batch
@@ -179,18 +181,29 @@ private:
if (ScanIterator->Finished()) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " producing result: scan iterator is finished"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
return false;
}
auto result = ScanIterator->GetBatch();
+ if (!result.ErrorString.empty()) {
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " producing result: got error '" << result.ErrorString
+ << "' txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));
+
+ ScanIterator.reset();
+ Finish();
+ return false;
+ }
+
if (ResultYqlSchema.empty() && DataFormat != NKikimrTxDataShard::EScanDataFormat::ARROW) {
ResultYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetResultYqlSchema();
}
if (!result.ResultBatch) {
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " producing result: no data is ready yet"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
return false;
}
@@ -199,7 +212,7 @@ private:
int numColumns = batch->num_columns();
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " producing result: got ready result"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " blob (" << numColumns << " columns, " << numRows << " rows)"
<< " format: " << NKikimrTxDataShard::EScanDataFormat_Name(DataFormat));
@@ -235,7 +248,7 @@ private:
if (!ScanIterator) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " iterator is not initialized"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
return;
}
@@ -244,7 +257,7 @@ private:
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " waiting for peer free space"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
return;
}
@@ -257,7 +270,7 @@ private:
}
// Switch to the next range if the current one is finished
- if (ScanIterator->Finished() && !InFlightReads) {
+ if (ScanIterator && ScanIterator->Finished() && !InFlightReads) {
NextReadMetadata();
}
@@ -289,7 +302,7 @@ private:
// The loop has finished without any progress!
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
}
void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
@@ -299,12 +312,12 @@ private:
auto prio = msg.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN;
LOG_LOG_S(*TlsActivationContext, prio, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got AbortExecution"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " code: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
<< " reason: " << reason);
AbortReason = std::move(reason);
- SendError(NKikimrProto::EReplyStatus::ERROR); // TODO: better status?
+ SendScanError();
Finish();
}
@@ -321,7 +334,7 @@ private:
LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " undelivered event: " << eventType
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " reason: " << ev->Get()->Reason
<< " description: " << AbortReason);
@@ -331,7 +344,7 @@ private:
void HandleScan(TEvents::TEvWakeup::TPtr&) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " guard execution timeout"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
TimeoutActorId = {};
Finish();
@@ -412,7 +425,7 @@ private:
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " send ScanData to " << ComputeActorId
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " table: " << TablePath
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault
<< " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : ""));
@@ -433,17 +446,30 @@ private:
return true;
}
- void SendError(NKikimrProto::EReplyStatus status) {
- auto ev = MakeHolder<TEvKqpCompute::TEvScanError>(ScanGen);
+ void SendScanError(TString reason = {}) {
+ TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
+ if (!reason.empty()) {
+ msg += ", reason: " + reason;
+ }
+ auto ev = MakeHolder<TEvKqpCompute::TEvScanError>(ScanGen);
ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR);
- auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, TStringBuilder()
- << "Table " << TablePath << " scan failed, reason: " << NKikimrProto::EReplyStatus_Name(status));
+ auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, msg);
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
Send(ComputeActorId, ev.Release());
}
+ void SendAbortExecution(TString reason = {}) {
+ auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
+ TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
+ if (!reason.empty()) {
+ msg += ", reason: " + reason;
+ }
+
+ Send(ComputeActorId, new TEvKqp::TEvAbortExecution(status, msg));
+ }
+
void Finish() {
if (TimeoutActorId) {
Send(TimeoutActorId, new TEvents::TEvPoison);
@@ -475,8 +501,7 @@ private:
const ui64 RequestCookie;
const i64 MaxReadAheadBytes = DEFAULT_READ_AHEAD_BYTES;
const NKikimrTxDataShard::EScanDataFormat DataFormat;
-
- const TString TablePath;
+ const ui64 TabletId;
TVector<NOlap::TReadMetadataBase::TConstPtr> ReadMetadataRanges;
ui32 ReadMetadataIndex;
@@ -798,7 +823,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes);
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor,
- scanId, txId, scanGen, requestCookie, table, timeout, std::move(ReadMetadataRanges), dataFormat));
+ scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat));
LOG_S_DEBUG("TTxScan starting " << scanActor
<< " txId: " << txId
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index cee4f525f7..f2ba4a97e1 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -67,7 +67,9 @@ public:
if (ReadMetadata->Program) {
auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
- Y_VERIFY_S(status.ok(), status.message());
+ if (!status.ok()) {
+ out.ErrorString = status.message();
+ }
}
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index f82cb2c26b..323bd54bb4 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -363,7 +363,9 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
bool requireResult = !HasIndexRead(); // not indexed or the last indexed read (even if it's emply)
auto out = MakeResult(ReadyToOut(), maxRowsInBatch);
if (requireResult && out.empty()) {
- out.push_back({NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema), nullptr});
+ out.push_back(TPartialReadResult{
+ .ResultBatch = NArrow::MakeEmptyBatch(ReadMetadata->ResultSchema)
+ });
}
return out;
}
@@ -520,14 +522,19 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->ResultSchema);
- out.emplace_back(TPartialReadResult{std::move(resultBatch), std::move(lastKey)});
+ out.emplace_back(TPartialReadResult{
+ .ResultBatch = std::move(resultBatch),
+ .LastReadKey = std::move(lastKey)
+ });
}
}
if (ReadMetadata->Program) {
- for (auto& batch : out) {
- auto status = ApplyProgram(batch.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
- Y_VERIFY_S(status.ok(), status.message());
+ for (auto& result : out) {
+ auto status = ApplyProgram(result.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
+ if (!status.ok()) {
+ result.ErrorString = status.message();
+ }
}
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 1197f38d13..54cc70ac9c 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -176,6 +176,8 @@ struct TPartialReadResult {
// This 1-row batch contains the last key that was read while producing the ResultBatch.
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
std::shared_ptr<arrow::RecordBatch> LastReadKey;
+
+ std::string ErrorString;
};
class TIndexedReadData {