diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 17:28:45 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 17:28:45 +0300 |
commit | c31acefb6d6c3f3f9fc27420eb5199f2ad7e79d5 (patch) | |
tree | cf929f64df23de949297c7505e52fa12e7b16b24 | |
parent | 771fe73ed7cde6b42249d963d9c7933ed36a4c66 (diff) | |
download | ydb-c31acefb6d6c3f3f9fc27420eb5199f2ad7e79d5.tar.gz |
don't send errors from Execute() and wait commit
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 85 |
1 files changed, 28 insertions, 57 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 15c2ff8c93e..f33e816d9c4 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -730,6 +730,12 @@ public: } Y_VERIFY(it->second); auto& state = *it->second; + + if (Result->Record.HasStatus()) { + // error happened on check phase + return EExecutionStatus::DelayComplete; + } + Y_VERIFY(state.State == TReadIteratorState::EState::Executing); ++ExecuteCount; @@ -791,9 +797,8 @@ public: auto tableId = state.PathId.LocalPathId; auto it = Self->TableInfos.find(tableId); if (it == Self->TableInfos.end()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << tableId); return EExecutionStatus::DelayComplete; @@ -818,9 +823,8 @@ public: } if (!snapshotFound) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Table id " << tableId << " lost snapshot at " << state.ReadVersion << " shard " << Self->TabletID() @@ -830,9 +834,8 @@ public: } if (state.SchemaVersion != userTableInfo->GetTableSchemaVersion()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Schema changed, current " << userTableInfo->GetTableSchemaVersion() << ", requested table schemaversion " << state.SchemaVersion); @@ -864,9 +867,8 @@ public: case EEnsureCurrentLock::Abort: // Lock cannot be created and we must abort - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::ABORTED, TStringBuilder() << "Transaction was already committed or aborted"); return EExecutionStatus::DelayComplete; @@ -942,7 +944,6 @@ public: auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { // iterator has been aborted - Abort(EExecutionUnitKind::CompletedOperations); return; } Y_VERIFY(it->second); @@ -980,9 +981,8 @@ public: if (state.PathId.OwnerId != Self->TabletID()) { // owner is schemeshard, read user table if (state.PathId.OwnerId != Self->GetPathOwnerId()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Requesting ownerId: " << state.PathId.OwnerId << ", tableId: " << state.PathId.LocalPathId @@ -993,9 +993,8 @@ public: const auto tableId = state.PathId.LocalPathId; auto it = Self->TableInfos.find(tableId); if (it == Self->TableInfos.end()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << tableId); return; @@ -1005,9 +1004,8 @@ public: TableInfo = TShortTableInfo(userTableInfo); if (userTableInfo->IsBackup) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::BAD_REQUEST, "Can't read from a backup table"); return; @@ -1037,9 +1035,8 @@ public: } if (!snapshotFound) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Table id " << tableId << " has no snapshot at " << state.ReadVersion << " shard " << Self->TabletID() @@ -1051,9 +1048,8 @@ public: state.SchemaVersion = userTableInfo->GetTableSchemaVersion(); if (record.GetTableId().HasSchemaVersion()) { if (state.SchemaVersion != record.GetTableId().GetSchemaVersion()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Wrong schemaversion " << record.GetTableId().GetSchemaVersion() << " requested, table schemaversion " << state.SchemaVersion); @@ -1067,9 +1063,8 @@ public: auto schema = txc.DB.GetRowScheme(state.PathId.LocalPathId); if (!schema) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); @@ -1112,9 +1107,8 @@ public: for (auto col: record.GetColumns()) { auto it = TableInfo.Columns.find(col); if (it == TableInfo.Columns.end()) { - SendErrorAndAbort( - ctx, - state, + SetStatusError( + Result->Record, Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Unknown column: " << col); return; @@ -1231,29 +1225,6 @@ public: } private: - void SendErrorAndAbort( - const TActorContext& ctx, - TReadIteratorState& state, - Ydb::StatusIds::StatusCode code, - const TString& msg) - { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " aborted read iterator# " - << state.ReadId << " with msg " << msg); - - if (!Result) - Result.reset(new TEvDataShard::TEvReadResult()); - - SetStatusError(Result->Record, code, msg); - Result->Record.SetReadId(state.ReadId); - Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); - - // note that we don't need to execute any other unit - Abort(EExecutionUnitKind::CompletedOperations); - - TReadIteratorId readId(Sender, state.ReadId); - Self->DeleteReadIterator(readId); - } - // return semantics is like in Execute() bool Read(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) { const auto& tableId = state.PathId.LocalPathId; |