aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-02-09 19:04:25 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 15:58:17 +0300
commit1db1a68d1cac3536b6c8e72ccbb25a265292e4d6 (patch)
treeab42e83dd48f9b3281e5af2bac7924fef2b2a88b
parent4d5b1933af9d396c508614c10264ff4b69394356 (diff)
downloadydb-1db1a68d1cac3536b6c8e72ccbb25a265292e4d6.tar.gz
KIKIMR-13802: get rid of TNotReadyTabletException in NewEngine
ref:7592bc8e94ebae6378964a5d0021d3f8cbf675a2
-rw-r--r--ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp38
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h5
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_read_table.cpp3
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp38
-rw-r--r--ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp6
7 files changed, 70 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index 57da5e7613..ffa56fbe67 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -25,6 +25,10 @@ public:
bool IsReadyToExecute(TOperation::TPtr op) const override;
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(TOperation::TPtr op, const TActorContext& ctx) override;
+
+private:
+ EExecutionStatus OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, TTransactionContext& txc,
+ const TActorContext& ctx);
};
TBuildKqpDataTxOutRSUnit::TBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipeline& pipeline)
@@ -89,8 +93,13 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
<< " set memory limit " << (txc.GetMemoryLimit() - dataTx->GetTxSize()));
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
+
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
- KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner);
+ auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner);
+
+ if (result == NYql::NDq::ERunStatus::PendingInput && dataTx->GetKqpComputeCtx().IsTabletNotReady()) {
+ return OnTabletNotReady(*tx, *dataTx, txc, ctx);
+ }
}
KqpFillOutReadSets(op->OutReadSets(), kqpTx, tasksRunner, DataShard.SysLocksTable(), tabletId);
@@ -107,15 +116,9 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
tx->ReleaseTxData(txc, ctx);
return EExecutionStatus::Restart;
- } catch (const TNotReadyTabletException& e) {
- LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << *op << " execution");
-
- DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
-
- dataTx->GetKqpComputeCtx().PinPages(dataTx->TxInfo().Keys);
-
- tx->ReleaseTxData(txc, ctx);
- return EExecutionStatus::Restart;
+ } catch (const TNotReadyTabletException&) {
+ LOG_C("Unexpected TNotReadyTabletException exception at build out rs");
+ return OnTabletNotReady(*tx, *dataTx, txc, ctx);
} catch (const yexception& e) {
LOG_C("Exception while preparing out-readsets for KQP transaction " << *op << " at " << DataShard.TabletID()
<< ": " << e.what());
@@ -134,6 +137,21 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
void TBuildKqpDataTxOutRSUnit::Complete(TOperation::TPtr, const TActorContext&) {}
+EExecutionStatus TBuildKqpDataTxOutRSUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx,
+ TTransactionContext& txc, const TActorContext& ctx)
+{
+ LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << tx << " execution");
+
+ dataTx.GetKqpComputeCtx().ResetTabletNotReady();
+ DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
+
+ ui64 pageFaultCount = tx.IncrementPageFaultCount();
+ dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount);
+
+ tx.ReleaseTxData(txc, ctx);
+ return EExecutionStatus::Restart;
+}
+
THolder<TExecutionUnit> CreateBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipeline& pipeline) {
return THolder(new TBuildKqpDataTxOutRSUnit(dataShard, pipeline));
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index edca688275..a262c98de5 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -417,6 +417,10 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
{
auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true);
+ if (runStatus == NYql::NDq::ERunStatus::PendingInput && computeCtx.IsTabletNotReady()) {
+ return nullptr;
+ }
+
MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished);
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA,
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index 58bdf35451..170e555b2b 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -55,6 +55,10 @@ public:
TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; }
TEngineHostCounters& GetDatashardCounters() { return DatashardCounters; }
+ void SetTabletNotReady() { TabletNotReady = true; };
+ void ResetTabletNotReady() { TabletNotReady = false; }
+ bool IsTabletNotReady() const { return TabletNotReady; }
+
public:
NTable::TDatabase* Database = nullptr;
@@ -65,6 +69,7 @@ private:
TInstant Now;
ui64 LockTxId = 0;
bool PersistentChannels = false;
+ bool TabletNotReady = false;
TRowVersion ReadVersion = TRowVersion::Min();
THashMap<std::pair<ui64, ui64>, TActorId> OutputChannels;
};
diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
index 418fb06087..24582c2664 100644
--- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
@@ -135,7 +135,8 @@ public:
switch (ready) {
case EReady::Page:
- throw TNotReadyTabletException();
+ ComputeCtx.SetTabletNotReady();
+ return TUnboxedValue::MakeYield();
case EReady::Gone:
continue;
case EReady::Data:
@@ -284,7 +285,8 @@ public:
}
if (Iterator->Last() == NTable::EReady::Page) {
- throw TNotReadyTabletException();
+ ComputeCtx.SetTabletNotReady();
+ return TUnboxedValue::MakeYield();
}
Iterator = nullptr;
diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
index 7ecacdbcc7..9fbd8ef0ad 100644
--- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
@@ -306,7 +306,8 @@ protected:
TaskTableStats.InvisibleRowSkips += invisibleRowSkips;
if (Iterator->Last() == NTable::EReady::Page) {
- throw TNotReadyTabletException();
+ ComputeCtx.SetTabletNotReady();
+ return EFetchResult::Yield;
}
return EFetchResult::Finish;
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index dbaf25ee89..98ab09de45 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -29,6 +29,8 @@ public:
private:
void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx);
+ EExecutionStatus OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx, TTransactionContext& txc,
+ const TActorContext& ctx);
};
TExecuteKqpDataTxUnit::TExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline)
@@ -130,8 +132,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
- op->Result() = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
+ auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx);
+
+ if (!result && computeCtx.IsTabletNotReady()) {
+ return OnTabletNotReady(*tx, *dataTx, txc, ctx);
+ }
+
+ Y_VERIFY(result);
+ op->Result().Swap(result);
op->SetKqpAttachedRSFlag();
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
@@ -173,16 +182,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
tx->ReleaseTxData(txc, ctx);
return EExecutionStatus::Restart;
- } catch (const TNotReadyTabletException& e) {
- LOG_T("Tablet " << tabletId << " is not ready for " << *op << " execution");
-
- DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
-
- ui64 pageFaultCount = tx->IncrementPageFaultCount();
- dataTx->GetKqpComputeCtx().PinPages(dataTx->TxInfo().Keys, pageFaultCount);
-
- tx->ReleaseTxData(txc, ctx);
- return EExecutionStatus::Restart;
+ } catch (const TNotReadyTabletException&) {
+ return OnTabletNotReady(*tx, *dataTx, txc, ctx);
} catch (const yexception& e) {
LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what());
if (op->IsReadOnly() || op->IsImmediate()) {
@@ -220,6 +221,21 @@ void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorCo
}
}
+EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, TValidatedDataTx& dataTx,
+ TTransactionContext& txc, const TActorContext& ctx)
+{
+ LOG_T("Tablet " << DataShard.TabletID() << " is not ready for " << tx << " execution");
+
+ dataTx.GetKqpComputeCtx().ResetTabletNotReady();
+ DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
+
+ ui64 pageFaultCount = tx.IncrementPageFaultCount();
+ dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount);
+
+ tx.ReleaseTxData(txc, ctx);
+ return EExecutionStatus::Restart;
+}
+
void TExecuteKqpDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {}
THolder<TExecutionUnit> CreateExecuteKqpDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) {
diff --git a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
index 3b17112a93..05ab71fc36 100644
--- a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
+++ b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
@@ -57,12 +57,6 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa
try {
KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpTransaction(), DataShard.TabletID());
- } catch (const TNotReadyTabletException&) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
- "Tablet " << DataShard.TabletID() << " is not ready for " << *op
- << " execution");
-
- return EExecutionStatus::Restart;
} catch (const yexception& e) {
LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while preparing in-readsets for KQP transaction "
<< *op << " at " << DataShard.TabletID() << ": " << CurrentExceptionMessage());