diff options
author | udovichenko-r <[email protected]> | 2025-04-16 15:19:31 +0300 |
---|---|---|
committer | udovichenko-r <[email protected]> | 2025-04-16 15:38:35 +0300 |
commit | 0192d5ea8ae0e946f3c04fe10dde30601e65ec99 (patch) | |
tree | e821dad2b315bc9ecf0330a5e55161a2fc5ff55d | |
parent | f2d5d88031564a430ea49c32ad0dc35f22008c72 (diff) |
Add Prepare stage
commit_hash:800d6cd5cb053472ea8c5aab0f2c68e6be75b0d0
-rw-r--r-- | yt/yql/providers/yt/gateway/native/yql_yt_native.cpp | 8 |
1 files changed, 8 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 71fdc92c5cc..22e20173d6e 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -3624,6 +3624,7 @@ private: return execCtx->Session_->Queue_->Async([execCtx]() { return execCtx->LookupQueryCacheAsync().Apply([execCtx] (const auto& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); auto entry = execCtx->GetEntry(); bool cacheHit = f.GetValue(); TVector<TRichYPath> outYPaths = PrepareDestinations(execCtx->OutTables_, execCtx, entry, !cacheHit); @@ -3690,6 +3691,7 @@ private: return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQueryExpr, execCtx]() { return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQueryExpr, execCtx] (const auto& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); auto entry = execCtx->GetEntry(); bool cacheHit = f.GetValue(); TVector<TRichYPath> outYPaths = PrepareDestinations(execCtx->OutTables_, execCtx, entry, !cacheHit); @@ -3762,6 +3764,7 @@ private: inputType, extraUsage, inputQueryExpr, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); TTransactionCache::TEntry::TPtr entry; TVector<TRichYPath> outYPaths; if (testRun) { @@ -3990,6 +3993,7 @@ private: (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); TTransactionCache::TEntry::TPtr entry; TVector<TRichYPath> outYPaths; if (testRun) { @@ -4208,6 +4212,7 @@ private: (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); TTransactionCache::TEntry::TPtr entry; TVector<TRichYPath> outYPaths; @@ -4458,6 +4463,7 @@ private: (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); TTransactionCache::TEntry::TPtr entry; TVector<TRichYPath> outYPaths; if (testRun) { @@ -4791,6 +4797,7 @@ private: TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync(); return ret.Apply([lambda, extraUsage, tmpTable, execCtx, testRun] (const auto& f) mutable { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + execCtx->SetNodeExecProgress("Preparing"); TTransactionCache::TEntry::TPtr entry; TVector<TRichYPath> outYPaths; if (testRun) { @@ -5417,6 +5424,7 @@ private: bool localRun = execCtx->Config_->HasExecuteUdfLocallyIfPossible() ? execCtx->Config_->GetExecuteUdfLocallyIfPossible() : false; { + execCtx->SetNodeExecProgress("Preparing"); TUserJobSpec userJobSpec; TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), execCtx->FunctionRegistry_->SupportsSizedAllocators()); |