diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-04-06 11:03:46 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-04-06 11:03:46 +0300 |
commit | 6c00a004990fdcd29f8a065d0f08c6a17d7999b2 (patch) | |
tree | 66c66e49e727da25c327db33e157ca37c67d1351 | |
parent | c3ca42136c25f3175755452eec3b788524f937c2 (diff) | |
download | ydb-6c00a004990fdcd29f8a065d0f08c6a17d7999b2.tar.gz |
[yql] Restore user data storage on fallback
YQL-15761
11 files changed, 69 insertions, 56 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index b535cd2d2a3..61d46e86615 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -731,7 +731,7 @@ private: for (auto& output : *task.MutableOutputs()) { if (output.HasSink() && output.GetSink().GetType() == "S3Sink") { NS3::TSink s3SinkSettings; - YQL_ENSURE(output.GetSink().GetSettings().UnpackTo(&s3SinkSettings)); + YQL_ENSURE(output.GetSink().GetSettings().UnpackTo(&s3SinkSettings)); if (s3SinkSettings.GetAtomicUploadCommit()) { auto prefix = s3SinkSettings.GetUrl() + s3SinkSettings.GetPath(); const auto& [it, isNew] = S3Prefixes.insert(prefix); @@ -1368,7 +1368,7 @@ private: TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); dqConfiguration->Dispatch(dqGraphParams.GetSettings()); dqConfiguration->FreezeDefaults(); - dqConfiguration->FallbackPolicy = "never"; + dqConfiguration->FallbackPolicy = EFallbackPolicy::Never; TEvaluationGraphInfo info; @@ -1413,7 +1413,7 @@ private: TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); dqConfiguration->Dispatch(dqGraphParams.GetSettings()); dqConfiguration->FreezeDefaults(); - dqConfiguration->FallbackPolicy = "never"; + dqConfiguration->FallbackPolicy = EFallbackPolicy::Never; ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 078d67c6525..6c5b2d18b1d 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -255,6 +255,7 @@ TProgram::TProgram( , UdfIndex_(udfIndex) , UdfIndexPackageSet_(udfIndexPackageSet) , FileStorage_(fileStorage) + , SavedUserDataTable_(userDataTable) , UserDataStorage_(MakeIntrusive<TUserDataStorage>(fileStorage, userDataTable, udfResolver, udfIndex)) , GatewaysConfig_(gatewaysConfig) , Filename_(filename) @@ -934,11 +935,8 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap && !TypeCtx_->ForceDq && SavedExprRoot_ && TypeCtx_->DqCaptured - && TypeCtx_->DqFallbackPolicy != "never") + && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never) { - ExprRoot_ = SavedExprRoot_; - SavedExprRoot_ = nullptr; - auto issues = ExprCtx_->IssueManager.GetIssues(); bool hasDqGatewayError = false; bool hasDqGatewayFallbackError = false; @@ -965,18 +963,6 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap } }; - std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) { - if (issue->Severity == TSeverityIds::S_ERROR - || issue->Severity == TSeverityIds::S_FATAL - || issue->Severity == TSeverityIds::S_WARNING) - { - issue->Severity = TSeverityIds::S_INFO; - } - for (const auto& subissue : issue->GetSubIssues()) { - toInfo(subissue); - } - }; - for (const auto& issue : issues) { checkIssue(issue); // check subissues @@ -985,14 +971,16 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap } } - ExprCtx_->IssueManager.Reset(); - - if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy.find("always") == TString::npos) { + if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) { // unrecoverable error - ExprCtx_->IssueManager.AddIssues(issues); return res; } + ExprRoot_ = SavedExprRoot_; + SavedExprRoot_ = nullptr; + UserDataStorage_->SetUserDataTable(std::move(SavedUserDataTable_)); + + ExprCtx_->IssueManager.Reset(); YQL_LOG(DEBUG) << "Fallback, Issues: " << issues.ToString(); ExprCtx_->Reset(); @@ -1007,6 +995,19 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap CleanupLastSession(); if (hasDqGatewayError || hasDqOptimizeError) { + + std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) { + if (issue->Severity == TSeverityIds::S_ERROR + || issue->Severity == TSeverityIds::S_FATAL + || issue->Severity == TSeverityIds::S_WARNING) + { + issue->Severity = TSeverityIds::S_INFO; + } + for (const auto& subissue : issue->GetSubIssues()) { + toInfo(subissue); + } + }; + TIssue info("DQ cannot execute the query"); info.Severity = TSeverityIds::S_INFO; @@ -1027,14 +1028,14 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap ExprCtx_->IssueManager.AddIssues({info}); } - FallbackCounter ++; + ++FallbackCounter_; // don't execute recapture again ExprCtx_->Step.Done(TExprStep::Recapture); AbortHidden_(); return AsyncTransformWithFallback(false); } - if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == "never" || TypeCtx_->ForceDq)) { - YQL_LOG(DEBUG) << "Fallback skipped due to per query policy"; + if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == EFallbackPolicy::Never || TypeCtx_->ForceDq)) { + YQL_LOG(INFO) << "Fallback skipped due to per query policy"; } return res; }); @@ -1202,11 +1203,11 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStrin writer.OnInt64Scalar(rusage.MajorPageFaults); writer.OnEndMap(); - if (FallbackCounter) { + if (FallbackCounter_) { writer.OnKeyedItem("Fallback"); writer.OnBeginMap(); writer.OnKeyedItem("count"); - writer.OnInt64Scalar(FallbackCounter); + writer.OnInt64Scalar(FallbackCounter_); writer.OnEndMap(); } diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index aead549e20c..cbbb758efd7 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -360,6 +360,7 @@ private: const TUdfIndex::TPtr UdfIndex_; const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_; const TFileStoragePtr FileStorage_; + TUserDataTable SavedUserDataTable_; const TUserDataStorage::TPtr UserDataStorage_; const TGatewaysConfig* GatewaysConfig_; TString Filename_; @@ -398,7 +399,7 @@ private: TString ExtractedQueryParametersMetadataYson_; const bool EnableRangeComputeFor_; const IArrowResolver::TPtr ArrowResolver_; - i64 FallbackCounter = 0; + i64 FallbackCounter_ = 0; const EHiddenMode HiddenMode_ = EHiddenMode::Disable; THiddenQueryAborter AbortHidden_ = [](){}; }; diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h index f1c633f424c..7e63c2780df 100644 --- a/ydb/library/yql/core/yql_type_annotation.h +++ b/ydb/library/yql/core/yql_type_annotation.h @@ -168,6 +168,12 @@ enum class EHiddenMode { Auto /* "auto" */ }; +enum class EFallbackPolicy { + Default /* "default" */, + Never /* "never" */, + Always /* "always" */ +}; + struct TUdfCachedInfo { const TTypeAnnotationNode* FunctionType = nullptr; const TTypeAnnotationNode* RunConfigType = nullptr; @@ -218,7 +224,7 @@ struct TTypeAnnotationContext: public TThrRefBase { bool DiscoveryMode = false; bool ForceDq = false; bool DqCaptured = false; // TODO: Add before/after recapture transformers - TString DqFallbackPolicy = ""; + EFallbackPolicy DqFallbackPolicy = EFallbackPolicy::Default; bool StrictTableProps = true; bool JsonQueryReturnsJsonDocument = false; bool YsonCastToString = true; diff --git a/ydb/library/yql/core/yql_user_data_storage.cpp b/ydb/library/yql/core/yql_user_data_storage.cpp index 1e98287876e..bbca125e5ab 100644 --- a/ydb/library/yql/core/yql_user_data_storage.cpp +++ b/ydb/library/yql/core/yql_user_data_storage.cpp @@ -40,8 +40,8 @@ namespace NYql { TUserDataStorage::TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex) : FileStorage_(std::move(fileStorage)) , UserData_(std::move(data)) - , UdfResolver(std::move(udfResolver)) - , UdfIndex(std::move(udfIndex)) + , UdfResolver_(std::move(udfResolver)) + , UdfIndex_(std::move(udfIndex)) { } @@ -53,6 +53,10 @@ void TUserDataStorage::SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessi UrlPreprocessing_ = std::move(urlPreprocessing); } +void TUserDataStorage::SetUserDataTable(TUserDataTable data) { + UserData_ = std::move(data); +} + void TUserDataStorage::AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block) { const auto key = ComposeUserDataKey(name); AddUserDataBlock(key, block); @@ -226,21 +230,21 @@ TUserDataBlock* TUserDataStorage::FreezeUdfNoThrow(const TUserDataKey& key, } block->CustomUdfPrefix = customUdfPrefix; - if (!ScannedUdfs.insert(key).second) { + if (!ScannedUdfs_.insert(key).second) { // already scanned return block; } - if (!UdfIndex) { + if (!UdfIndex_) { return block; } try { TString scope = "ScanUdfStrategy " + key.Alias(); YQL_PROFILE_SCOPE(DEBUG, scope.c_str()); - Y_ENSURE(UdfResolver); - Y_ENSURE(UdfIndex); - LoadRichMetadataToUdfIndex(*UdfResolver, *block, TUdfIndex::EOverrideMode::ReplaceWithNew, *UdfIndex); + Y_ENSURE(UdfResolver_); + Y_ENSURE(UdfIndex_); + LoadRichMetadataToUdfIndex(*UdfResolver_, *block, TUdfIndex::EOverrideMode::ReplaceWithNew, *UdfIndex_); } catch (const std::exception& e) { errorMessage = TStringBuilder() << "Failed to scan udf with key " << key << ", details: " << e.what(); return nullptr; diff --git a/ydb/library/yql/core/yql_user_data_storage.h b/ydb/library/yql/core/yql_user_data_storage.h index aed668e4c15..ef503e071c7 100644 --- a/ydb/library/yql/core/yql_user_data_storage.h +++ b/ydb/library/yql/core/yql_user_data_storage.h @@ -19,6 +19,7 @@ public: TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex); void SetTokenResolver(TTokenResolver tokenResolver); void SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing); + void SetUserDataTable(TUserDataTable data); void AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block); void AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block); @@ -71,12 +72,12 @@ private: private: THoldingFileStorage FileStorage_; TUserDataTable UserData_; - IUdfResolver::TPtr UdfResolver; - TUdfIndex::TPtr UdfIndex; + IUdfResolver::TPtr UdfResolver_; + TUdfIndex::TPtr UdfIndex_; TTokenResolver TokenResolver_; IUrlPreprocessing::TPtr UrlPreprocessing_; - THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs; + THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs_; std::function<void(const TUserDataBlock& block)> ScanUdfStrategy_; }; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 3832b4d9e5b..c89b57ce027 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -17,7 +17,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, MaxNetworkRetries); REGISTER_SETTING(*this, RetryBackoffMs); REGISTER_SETTING(*this, CollectCoreDumps); - REGISTER_SETTING(*this, FallbackPolicy); + REGISTER_SETTING(*this, FallbackPolicy).Parser([](const TString& v) { return FromString<EFallbackPolicy>(v); }); REGISTER_SETTING(*this, PullRequestTimeoutMs); REGISTER_SETTING(*this, PingTimeoutMs); REGISTER_SETTING(*this, UseSimpleYtReader); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index ca1cef9987b..8861b654572 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -52,7 +52,7 @@ struct TDqSettings { NCommon::TConfSetting<int, false> MaxNetworkRetries; NCommon::TConfSetting<ui64, false> RetryBackoffMs; NCommon::TConfSetting<bool, false> CollectCoreDumps; - NCommon::TConfSetting<TString, false> FallbackPolicy; + NCommon::TConfSetting<EFallbackPolicy, false> FallbackPolicy; NCommon::TConfSetting<ui64, false> PullRequestTimeoutMs; NCommon::TConfSetting<ui64, false> PingTimeoutMs; NCommon::TConfSetting<bool, false> UseSimpleYtReader; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index eec224331b5..1cf14115d03 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -767,7 +767,7 @@ private: enableLocalRun); if (lambdaResult.first.Level == TStatus::Error) { - if (State->Settings->FallbackPolicy.Get().GetOrElse("default") == "never" + if (State->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || State->TypeCtx->ForceDq) { return SyncError(); @@ -844,7 +844,7 @@ private: state->Statistics[state->MetricId++] = res.Statistics; if (res.Fallback) { - if (state->Settings->FallbackPolicy.Get().GetOrElse("default") == "never" || state->TypeCtx->ForceDq) { + if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) { auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; issues.AddIssues(res.Issues()); ctx.AssociativeIssues.emplace(input.Get(), std::move(issues)); @@ -1061,10 +1061,10 @@ private: auto stagesCount = executionPlanner->StagesCount(); if (!executionPlanner->CanFallback()) { - settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never"; + settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never; } - bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq); + bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq); if (stagesCount > maxTasksPerOperation && canFallback) { return SyncStatus(FallbackWithMessage( @@ -1242,7 +1242,7 @@ private: if (truncated && !state->TypeCtx->ForceDq && !enableFullResultWrite) { auto issue = TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "DQ cannot execute the query. Cause: " << "too big result " << trStr).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO); - bool error = settings->FallbackPolicy.Get().GetOrElse("default") == "never"; + bool error = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never; for (const auto& i : res.Issues()) { TIssuePtr subIssue = new TIssue(i); if (error && subIssue->Severity == TSeverityIds::S_WARNING) { @@ -1549,10 +1549,10 @@ private: auto stagesCount = executionPlanner->StagesCount(); if (!executionPlanner->CanFallback()) { - settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never"; + settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never; } - bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq); + bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq); if (stagesCount > maxTasksPerOperation && canFallback) { return FallbackWithMessage( @@ -1642,7 +1642,7 @@ private: executionPlanner.Destroy(); - bool neverFallback = settings->FallbackPolicy.Get().GetOrElse("default") == "never"; + bool neverFallback = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never; future.Subscribe([publicIds, state = State, startTime, execState = ExecState, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); YQL_ENSURE(!completedFuture.HasException()); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index bd39e74fbbd..dfcaf9ef8a8 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -81,12 +81,12 @@ public: NYql::TIssues issues; auto operation = resp.operation(); - + for (auto& message_ : *operation.Mutableissues()) { TDeque<std::remove_reference_t<decltype(message_)>*> queue; queue.push_front(&message_); while (!queue.empty()) { - auto& message = *queue.front(); + auto& message = *queue.front(); queue.pop_front(); message.Setmessage(NBacktrace::Symbolize(message.Getmessage(), modulesMapping)); for (auto &subMsg : *message.Mutableissues()) { @@ -175,10 +175,10 @@ public: ) { auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000)); auto promise = NewPromise<TResult>(); - auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default"); - auto alwaysFallback = fallbackPolicy == "always"; + const auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default); + const auto alwaysFallback = EFallbackPolicy::Always == fallbackPolicy; auto self = weak_from_this(); - auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { + auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { auto this_ = self.lock(); if (!this_) { YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index a63d4fbb167..e60af247c41 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -104,7 +104,7 @@ public: } } - State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse("default"); + State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default); IGraphTransformer::TStatus status = NDq::DqWrapRead(input, output, ctx, *State_->TypeCtx, *State_->Settings); if (input != output) { |