diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-02-09 19:04:25 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 1db1a68d1cac3536b6c8e72ccbb25a265292e4d6 (patch) | |
tree | ab42e83dd48f9b3281e5af2bac7924fef2b2a88b | |
parent | 4d5b1933af9d396c508614c10264ff4b69394356 (diff) | |
download | ydb-1db1a68d1cac3536b6c8e72ccbb25a265292e4d6.tar.gz |
KIKIMR-13802: get rid of TNotReadyTabletException in NewEngine
ref:7592bc8e94ebae6378964a5d0021d3f8cbf675a2
7 files changed, 70 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 57da5e7613..ffa56fbe67 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -25,6 +25,10 @@ public: bool IsReadyToExecute(TOperation::TPtr op) const override; EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override; void Complete(TOperation::TPtr op, const TActorContext& ctx) override; + +private: + EExecutionStatus OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, TTransactionContext& txc, + const TActorContext& ctx); }; TBuildKqpDataTxOutRSUnit::TBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipeline& pipeline) @@ -89,8 +93,13 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac << " set memory limit " << (txc.GetMemoryLimit() - dataTx->GetTxSize())); dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion); + if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) { - KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner); + auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner); + + if (result == NYql::NDq::ERunStatus::PendingInput && dataTx->GetKqpComputeCtx().IsTabletNotReady()) { + return OnTabletNotReady(*tx, *dataTx, txc, ctx); + } } KqpFillOutReadSets(op->OutReadSets(), kqpTx, tasksRunner, DataShard.SysLocksTable(), tabletId); @@ -107,15 +116,9 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac tx->ReleaseTxData(txc, ctx); return EExecutionStatus::Restart; - } catch (const TNotReadyTabletException& e) { - LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << *op << " execution"); - - DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); - - dataTx->GetKqpComputeCtx().PinPages(dataTx->TxInfo().Keys); - - tx->ReleaseTxData(txc, ctx); - return EExecutionStatus::Restart; + } catch (const TNotReadyTabletException&) { + LOG_C("Unexpected TNotReadyTabletException exception at build out rs"); + return OnTabletNotReady(*tx, *dataTx, txc, ctx); } catch (const yexception& e) { LOG_C("Exception while preparing out-readsets for KQP transaction " << *op << " at " << DataShard.TabletID() << ": " << e.what()); @@ -134,6 +137,21 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac void TBuildKqpDataTxOutRSUnit::Complete(TOperation::TPtr, const TActorContext&) {} +EExecutionStatus TBuildKqpDataTxOutRSUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, + TTransactionContext& txc, const TActorContext& ctx) +{ + LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << tx << " execution"); + + dataTx.GetKqpComputeCtx().ResetTabletNotReady(); + DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); + + ui64 pageFaultCount = tx.IncrementPageFaultCount(); + dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount); + + tx.ReleaseTxData(txc, ctx); + return EExecutionStatus::Restart; +} + THolder<TExecutionUnit> CreateBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipeline& pipeline) { return THolder(new TBuildKqpDataTxOutRSUnit(dataShard, pipeline)); } diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index edca688275..a262c98de5 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -417,6 +417,10 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const { auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true); + if (runStatus == NYql::NDq::ERunStatus::PendingInput && computeCtx.IsTabletNotReady()) { + return nullptr; + } + MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished); auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 58bdf35451..170e555b2b 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -55,6 +55,10 @@ public: TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; } TEngineHostCounters& GetDatashardCounters() { return DatashardCounters; } + void SetTabletNotReady() { TabletNotReady = true; }; + void ResetTabletNotReady() { TabletNotReady = false; } + bool IsTabletNotReady() const { return TabletNotReady; } + public: NTable::TDatabase* Database = nullptr; @@ -65,6 +69,7 @@ private: TInstant Now; ui64 LockTxId = 0; bool PersistentChannels = false; + bool TabletNotReady = false; TRowVersion ReadVersion = TRowVersion::Min(); THashMap<std::pair<ui64, ui64>, TActorId> OutputChannels; }; diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp index 418fb06087..24582c2664 100644 --- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp @@ -135,7 +135,8 @@ public: switch (ready) { case EReady::Page: - throw TNotReadyTabletException(); + ComputeCtx.SetTabletNotReady(); + return TUnboxedValue::MakeYield(); case EReady::Gone: continue; case EReady::Data: @@ -284,7 +285,8 @@ public: } if (Iterator->Last() == NTable::EReady::Page) { - throw TNotReadyTabletException(); + ComputeCtx.SetTabletNotReady(); + return TUnboxedValue::MakeYield(); } Iterator = nullptr; diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index 7ecacdbcc7..9fbd8ef0ad 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -306,7 +306,8 @@ protected: TaskTableStats.InvisibleRowSkips += invisibleRowSkips; if (Iterator->Last() == NTable::EReady::Page) { - throw TNotReadyTabletException(); + ComputeCtx.SetTabletNotReady(); + return EFetchResult::Yield; } return EFetchResult::Finish; diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index dbaf25ee89..98ab09de45 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -29,6 +29,8 @@ public: private: void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx); + EExecutionStatus OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, TTransactionContext& txc, + const TActorContext& ctx); }; TExecuteKqpDataTxUnit::TExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) @@ -130,8 +132,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); - op->Result() = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(), + auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(), op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx); + + if (!result && computeCtx.IsTabletNotReady()) { + return OnTabletNotReady(*tx, *dataTx, txc, ctx); + } + + Y_VERIFY(result); + op->Result().Swap(result); op->SetKqpAttachedRSFlag(); KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); @@ -173,16 +182,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio tx->ReleaseTxData(txc, ctx); return EExecutionStatus::Restart; - } catch (const TNotReadyTabletException& e) { - LOG_T("Tablet " << tabletId << " is not ready for " << *op << " execution"); - - DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); - - ui64 pageFaultCount = tx->IncrementPageFaultCount(); - dataTx->GetKqpComputeCtx().PinPages(dataTx->TxInfo().Keys, pageFaultCount); - - tx->ReleaseTxData(txc, ctx); - return EExecutionStatus::Restart; + } catch (const TNotReadyTabletException&) { + return OnTabletNotReady(*tx, *dataTx, txc, ctx); } catch (const yexception& e) { LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what()); if (op->IsReadOnly() || op->IsImmediate()) { @@ -220,6 +221,21 @@ void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorCo } } +EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, + TTransactionContext& txc, const TActorContext& ctx) +{ + LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << tx << " execution"); + + dataTx.GetKqpComputeCtx().ResetTabletNotReady(); + DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); + + ui64 pageFaultCount = tx.IncrementPageFaultCount(); + dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount); + + tx.ReleaseTxData(txc, ctx); + return EExecutionStatus::Restart; +} + void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {} THolder<TExecutionUnit> CreateExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) { diff --git a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp index 3b17112a93..05ab71fc36 100644 --- a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp +++ b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp @@ -57,12 +57,6 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa try { KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpTransaction(), DataShard.TabletID()); - } catch (const TNotReadyTabletException&) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Tablet " << DataShard.TabletID() << " is not ready for " << *op - << " execution"); - - return EExecutionStatus::Restart; } catch (const yexception& e) { LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while preparing in-readsets for KQP transaction " << *op << " at " << DataShard.TabletID() << ": " << CurrentExceptionMessage()); |