aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-28 17:28:45 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-28 17:28:45 +0300
commitc31acefb6d6c3f3f9fc27420eb5199f2ad7e79d5 (patch)
treecf929f64df23de949297c7505e52fa12e7b16b24
parent771fe73ed7cde6b42249d963d9c7933ed36a4c66 (diff)
downloadydb-c31acefb6d6c3f3f9fc27420eb5199f2ad7e79d5.tar.gz
don't send errors from Execute() and wait commit
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp85
1 files changed, 28 insertions, 57 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 15c2ff8c93e..f33e816d9c4 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -730,6 +730,12 @@ public:
}
Y_VERIFY(it->second);
auto& state = *it->second;
+
+ if (Result->Record.HasStatus()) {
+ // error happened on check phase
+ return EExecutionStatus::DelayComplete;
+ }
+
Y_VERIFY(state.State == TReadIteratorState::EState::Executing);
++ExecuteCount;
@@ -791,9 +797,8 @@ public:
auto tableId = state.PathId.LocalPathId;
auto it = Self->TableInfos.find(tableId);
if (it == Self->TableInfos.end()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << tableId);
return EExecutionStatus::DelayComplete;
@@ -818,9 +823,8 @@ public:
}
if (!snapshotFound) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Table id " << tableId << " lost snapshot at "
<< state.ReadVersion << " shard " << Self->TabletID()
@@ -830,9 +834,8 @@ public:
}
if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion()
<< ", requested table schemaversion " << state.SchemaVersion);
@@ -864,9 +867,8 @@ public:
case EEnsureCurrentLock::Abort:
// Lock cannot be created and we must abort
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::ABORTED,
TStringBuilder() << "Transaction was already committed or aborted");
return EExecutionStatus::DelayComplete;
@@ -942,7 +944,6 @@ public:
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
// iterator has been aborted
- Abort(EExecutionUnitKind::CompletedOperations);
return;
}
Y_VERIFY(it->second);
@@ -980,9 +981,8 @@ public:
if (state.PathId.OwnerId != Self->TabletID()) {
// owner is schemeshard, read user table
if (state.PathId.OwnerId != Self->GetPathOwnerId()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::BAD_REQUEST,
TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId
<< ", tableId: " << state.PathId.LocalPathId
@@ -993,9 +993,8 @@ public:
const auto tableId = state.PathId.LocalPathId;
auto it = Self->TableInfos.find(tableId);
if (it == Self->TableInfos.end()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << tableId);
return;
@@ -1005,9 +1004,8 @@ public:
TableInfo = TShortTableInfo(userTableInfo);
if (userTableInfo->IsBackup) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::BAD_REQUEST,
"Can't read from a backup table");
return;
@@ -1037,9 +1035,8 @@ public:
}
if (!snapshotFound) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Table id " << tableId << " has no snapshot at "
<< state.ReadVersion << " shard " << Self->TabletID()
@@ -1051,9 +1048,8 @@ public:
state.SchemaVersion = userTableInfo->GetTableSchemaVersion();
if (record.GetTableId().HasSchemaVersion()) {
if (state.SchemaVersion != record.GetTableId().GetSchemaVersion()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Wrong schemaversion " << record.GetTableId().GetSchemaVersion()
<< " requested, table schemaversion " << state.SchemaVersion);
@@ -1067,9 +1063,8 @@ public:
auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId);
if (!schema) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
@@ -1112,9 +1107,8 @@ public:
for (auto col: record.GetColumns()) {
auto it = TableInfo.Columns.find(col);
if (it == TableInfo.Columns.end()) {
- SendErrorAndAbort(
- ctx,
- state,
+ SetStatusError(
+ Result->Record,
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Unknown column: " << col);
return;
@@ -1231,29 +1225,6 @@ public:
}
private:
- void SendErrorAndAbort(
- const TActorContext& ctx,
- TReadIteratorState& state,
- Ydb::StatusIds::StatusCode code,
- const TString& msg)
- {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " aborted read iterator# "
- << state.ReadId << " with msg " << msg);
-
- if (!Result)
- Result.reset(new TEvDataShard::TEvReadResult());
-
- SetStatusError(Result->Record, code, msg);
- Result->Record.SetReadId(state.ReadId);
- Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
-
- // note that we don't need to execute any other unit
- Abort(EExecutionUnitKind::CompletedOperations);
-
- TReadIteratorId readId(Sender, state.ReadId);
- Self->DeleteReadIterator(readId);
- }
-
// return semantics is like in Execute()
bool Read(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) {
const auto& tableId = state.PathId.LocalPathId;