diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-11-12 22:07:14 +0300 |
---|---|---|
committer | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-11-12 22:40:29 +0300 |
commit | 22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3 (patch) | |
tree | 92ebdc7307496e53f65016369f9177dc24efd35e /yql/essentials | |
parent | 3c382c6d983d093ae974db36bf2c91616c317bc8 (diff) | |
download | ydb-22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3.tar.gz |
Apply GH commits
Apply GH: Stop wide combiner state from growing unlimited (#10997)
Apply GH: Reconnect session has been supported (#9862)
Apply GH: Handle unexpected future exception (#11091)
commit_hash:c2597abeae3a153692a80cc13446423769765ae1
Diffstat (limited to 'yql/essentials')
-rw-r--r-- | yql/essentials/core/facade/yql_facade.cpp | 4 | ||||
-rw-r--r-- | yql/essentials/core/yql_execution.cpp | 4 | ||||
-rw-r--r-- | yql/essentials/core/yql_graph_transformer.cpp | 4 | ||||
-rw-r--r-- | yql/essentials/core/yql_graph_transformer.h | 11 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp | 37 | ||||
-rw-r--r-- | yql/essentials/providers/common/proto/gateways_config.proto | 3 |
6 files changed, 51 insertions, 12 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp index 59cfeab848..94309e2148 100644 --- a/yql/essentials/core/facade/yql_facade.cpp +++ b/yql/essentials/core/facade/yql_facade.cpp @@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution( (program->*method)(std::forward<Params2>(params)...); YQL_ENSURE(future.Initialized()); future.Wait(); - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); TProgram::TStatus status = future.GetValue(); while (status == TProgram::TStatus::Async) { auto continueFuture = program->ContinueAsync(); continueFuture.Wait(); - YQL_ENSURE(!continueFuture.HasException()); + HandleFutureException(continueFuture); status = continueFuture.GetValue(); } diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp index 2fe21e1535..5b34866487 100644 --- a/yql/essentials/core/yql_execution.cpp +++ b/yql/essentials/core/yql_execution.cpp @@ -525,12 +525,12 @@ public: if (DeterministicMode) { future.Subscribe([state](const NThreading::TFuture<void>& future) { - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); ProcessFutureResultQueue(state); }); } else { future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) { - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); TAutoPtr<TState::TItem> item = new TState::TItem; item->Node = node; item->DataProvider = dataProvider; diff --git a/yql/essentials/core/yql_graph_transformer.cpp b/yql/essentials/core/yql_graph_transformer.cpp index 105e11846a..5248ee1597 100644 --- a/yql/essentials/core/yql_graph_transformer.cpp +++ b/yql/essentials/core/yql_graph_transformer.cpp @@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo auto future = transformer.GetAsyncFuture(*root); future.Wait(); - YQL_ENSURE(!future.HasException()); + HandleFutureException(future); status = transformer.ApplyAsyncChanges(root, newRoot, ctx); if (newRoot) { @@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges); status.Subscribe( [asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void { - YQL_ENSURE(!status.HasException()); + HandleFutureException(status); asyncCallback(status.GetValue()); }); } diff --git a/yql/essentials/core/yql_graph_transformer.h b/yql/essentials/core/yql_graph_transformer.h index 30ddc49ad5..d216761589 100644 --- a/yql/essentials/core/yql_graph_transformer.h +++ b/yql/essentials/core/yql_graph_transformer.h @@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges); +template <typename T> +void HandleFutureException(const NThreading::TFuture<T>& future) { + if (future.HasException()) { + try { + future.TryRethrow(); + } catch (...) { + throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage(); + } + } +} + class TSyncTransformerBase : public TGraphTransformerBase { public: NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 7c2ac2dc18..75e5e0dd6f 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -240,8 +240,8 @@ private: return KeyWidth + StateWidth; } public: - TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal) - : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), States(hash, equal, CountRowsOnPage) { + TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false) + : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) { CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; Tongue = CurrentPage->data(); @@ -276,11 +276,28 @@ public: } Throat = States.GetKey(itInsert) + KeyWidth; if (isNew) { - States.CheckGrow(); + GrowStates(); } return isNew; } + void GrowStates() { + try { + States.CheckGrow(); + } catch (TMemoryLimitExceededException) { + YQL_LOG(INFO) << "State failed to grow"; + if (IsOutOfMemory || !AllowOutOfMemory) { + throw; + } else { + IsOutOfMemory = true; + } + } + } + + bool CheckIsOutOfMemory() const { + return IsOutOfMemory; + } + template<bool SkipYields> bool ReadMore() { if constexpr (SkipYields) { @@ -332,6 +349,8 @@ public: private: std::optional<TStorageIterator> ExtractIt; const ui32 KeyWidth, StateWidth; + const bool AllowOutOfMemory; + bool IsOutOfMemory = false; ui64 CurrentPosition = 0; TRow* CurrentPage = nullptr; TStorage Storage; @@ -387,7 +406,7 @@ public: const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx ) : TBase(memInfo) - , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal) + , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory) , UsedInputItemType(usedInputItemType) , KeyAndStateType(keyAndStateType) , KeyWidth(keyWidth) @@ -452,6 +471,9 @@ public: ETasteResult TasteIt() { if (GetMode() == EOperatingMode::InMemory) { bool isNew = InMemoryProcessingState.TasteIt(); + if (InMemoryProcessingState.CheckIsOutOfMemory()) { + StateWantsToSpill = true; + } Throat = InMemoryProcessingState.Throat; return isNew ? ETasteResult::Init : ETasteResult::Update; } @@ -650,7 +672,11 @@ private: } bool CheckMemoryAndSwitchToSpilling() { - if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) { + if (!(AllowSpilling && Ctx.SpillerFactory)) { + return false; + } + if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) { + StateWantsToSpill = false; LogMemoryUsage(); SwitchMode(EOperatingMode::SplittingState); @@ -841,6 +867,7 @@ public: NUdf::TUnboxedValuePod* Throat = nullptr; private: + bool StateWantsToSpill = false; bool IsEverythingExtracted = false; TState InMemoryProcessingState; diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto index 8a0defbb4f..35654cb467 100644 --- a/yql/essentials/providers/common/proto/gateways_config.proto +++ b/yql/essentials/providers/common/proto/gateways_config.proto @@ -326,7 +326,8 @@ message TPqClusterConfig { optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token optional string DatabaseId = 12; repeated TAttr Settings = 100; - optional bool SharedReading = 101; + optional bool SharedReading = 101; + optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m } message TPqGatewayConfig { |