summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <[email protected]>2025-04-16 15:19:31 +0300
committerudovichenko-r <[email protected]>2025-04-16 15:38:35 +0300
commit0192d5ea8ae0e946f3c04fe10dde30601e65ec99 (patch)
treee821dad2b315bc9ecf0330a5e55161a2fc5ff55d
parentf2d5d88031564a430ea49c32ad0dc35f22008c72 (diff)
Add Prepare stage
commit_hash:800d6cd5cb053472ea8c5aab0f2c68e6be75b0d0
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp8
1 files changed, 8 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 71fdc92c5cc..22e20173d6e 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -3624,6 +3624,7 @@ private:
return execCtx->Session_->Queue_->Async([execCtx]() {
return execCtx->LookupQueryCacheAsync().Apply([execCtx] (const auto& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
auto entry = execCtx->GetEntry();
bool cacheHit = f.GetValue();
TVector<TRichYPath> outYPaths = PrepareDestinations(execCtx->OutTables_, execCtx, entry, !cacheHit);
@@ -3690,6 +3691,7 @@ private:
return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, inputQueryExpr, execCtx]() {
return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, inputQueryExpr, execCtx] (const auto& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
auto entry = execCtx->GetEntry();
bool cacheHit = f.GetValue();
TVector<TRichYPath> outYPaths = PrepareDestinations(execCtx->OutTables_, execCtx, entry, !cacheHit);
@@ -3762,6 +3764,7 @@ private:
inputType, extraUsage, inputQueryExpr, execCtx, testRun] (const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
TTransactionCache::TEntry::TPtr entry;
TVector<TRichYPath> outYPaths;
if (testRun) {
@@ -3990,6 +3993,7 @@ private:
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
TTransactionCache::TEntry::TPtr entry;
TVector<TRichYPath> outYPaths;
if (testRun) {
@@ -4208,6 +4212,7 @@ private:
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
TTransactionCache::TEntry::TPtr entry;
TVector<TRichYPath> outYPaths;
@@ -4458,6 +4463,7 @@ private:
(const auto& f) mutable
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
TTransactionCache::TEntry::TPtr entry;
TVector<TRichYPath> outYPaths;
if (testRun) {
@@ -4791,6 +4797,7 @@ private:
TFuture<bool> ret = testRun ? MakeFuture<bool>(false) : execCtx->LookupQueryCacheAsync();
return ret.Apply([lambda, extraUsage, tmpTable, execCtx, testRun] (const auto& f) mutable {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ execCtx->SetNodeExecProgress("Preparing");
TTransactionCache::TEntry::TPtr entry;
TVector<TRichYPath> outYPaths;
if (testRun) {
@@ -5417,6 +5424,7 @@ private:
bool localRun = execCtx->Config_->HasExecuteUdfLocallyIfPossible() ? execCtx->Config_->GetExecuteUdfLocallyIfPossible() : false;
{
+ execCtx->SetNodeExecProgress("Preparing");
TUserJobSpec userJobSpec;
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
execCtx->FunctionRegistry_->SupportsSizedAllocators());