aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2024-11-12 22:07:14 +0300
committerMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-11-12 22:40:29 +0300
commit22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3 (patch)
tree92ebdc7307496e53f65016369f9177dc24efd35e /yql/essentials
parent3c382c6d983d093ae974db36bf2c91616c317bc8 (diff)
downloadydb-22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3.tar.gz
Apply GH commits
Apply GH: Stop wide combiner state from growing unlimited (#10997) Apply GH: Reconnect session has been supported (#9862) Apply GH: Handle unexpected future exception (#11091) commit_hash:c2597abeae3a153692a80cc13446423769765ae1
Diffstat (limited to 'yql/essentials')
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp4
-rw-r--r--yql/essentials/core/yql_execution.cpp4
-rw-r--r--yql/essentials/core/yql_graph_transformer.cpp4
-rw-r--r--yql/essentials/core/yql_graph_transformer.h11
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp37
-rw-r--r--yql/essentials/providers/common/proto/gateways_config.proto3
6 files changed, 51 insertions, 12 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index 59cfeab848..94309e2148 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution(
(program->*method)(std::forward<Params2>(params)...);
YQL_ENSURE(future.Initialized());
future.Wait();
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
TProgram::TStatus status = future.GetValue();
while (status == TProgram::TStatus::Async) {
auto continueFuture = program->ContinueAsync();
continueFuture.Wait();
- YQL_ENSURE(!continueFuture.HasException());
+ HandleFutureException(continueFuture);
status = continueFuture.GetValue();
}
diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp
index 2fe21e1535..5b34866487 100644
--- a/yql/essentials/core/yql_execution.cpp
+++ b/yql/essentials/core/yql_execution.cpp
@@ -525,12 +525,12 @@ public:
if (DeterministicMode) {
future.Subscribe([state](const NThreading::TFuture<void>& future) {
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
ProcessFutureResultQueue(state);
});
} else {
future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
TAutoPtr<TState::TItem> item = new TState::TItem;
item->Node = node; item->DataProvider = dataProvider;
diff --git a/yql/essentials/core/yql_graph_transformer.cpp b/yql/essentials/core/yql_graph_transformer.cpp
index 105e11846a..5248ee1597 100644
--- a/yql/essentials/core/yql_graph_transformer.cpp
+++ b/yql/essentials/core/yql_graph_transformer.cpp
@@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo
auto future = transformer.GetAsyncFuture(*root);
future.Wait();
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
if (newRoot) {
@@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
status.Subscribe(
[asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
- YQL_ENSURE(!status.HasException());
+ HandleFutureException(status);
asyncCallback(status.GetValue());
});
}
diff --git a/yql/essentials/core/yql_graph_transformer.h b/yql/essentials/core/yql_graph_transformer.h
index 30ddc49ad5..d216761589 100644
--- a/yql/essentials/core/yql_graph_transformer.h
+++ b/yql/essentials/core/yql_graph_transformer.h
@@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
TExprContext& ctx, bool applyAsyncChanges);
+template <typename T>
+void HandleFutureException(const NThreading::TFuture<T>& future) {
+ if (future.HasException()) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
+ }
+ }
+}
+
class TSyncTransformerBase : public TGraphTransformerBase {
public:
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 7c2ac2dc18..75e5e0dd6f 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -240,8 +240,8 @@ private:
return KeyWidth + StateWidth;
}
public:
- TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal)
- : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), States(hash, equal, CountRowsOnPage) {
+ TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
+ : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
@@ -276,11 +276,28 @@ public:
}
Throat = States.GetKey(itInsert) + KeyWidth;
if (isNew) {
- States.CheckGrow();
+ GrowStates();
}
return isNew;
}
+ void GrowStates() {
+ try {
+ States.CheckGrow();
+ } catch (TMemoryLimitExceededException) {
+ YQL_LOG(INFO) << "State failed to grow";
+ if (IsOutOfMemory || !AllowOutOfMemory) {
+ throw;
+ } else {
+ IsOutOfMemory = true;
+ }
+ }
+ }
+
+ bool CheckIsOutOfMemory() const {
+ return IsOutOfMemory;
+ }
+
template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
@@ -332,6 +349,8 @@ public:
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
+ const bool AllowOutOfMemory;
+ bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
@@ -387,7 +406,7 @@ public:
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
)
: TBase(memInfo)
- , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
+ , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory)
, UsedInputItemType(usedInputItemType)
, KeyAndStateType(keyAndStateType)
, KeyWidth(keyWidth)
@@ -452,6 +471,9 @@ public:
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
+ if (InMemoryProcessingState.CheckIsOutOfMemory()) {
+ StateWantsToSpill = true;
+ }
Throat = InMemoryProcessingState.Throat;
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
@@ -650,7 +672,11 @@ private:
}
bool CheckMemoryAndSwitchToSpilling() {
- if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
+ if (!(AllowSpilling && Ctx.SpillerFactory)) {
+ return false;
+ }
+ if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
+ StateWantsToSpill = false;
LogMemoryUsage();
SwitchMode(EOperatingMode::SplittingState);
@@ -841,6 +867,7 @@ public:
NUdf::TUnboxedValuePod* Throat = nullptr;
private:
+ bool StateWantsToSpill = false;
bool IsEverythingExtracted = false;
TState InMemoryProcessingState;
diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto
index 8a0defbb4f..35654cb467 100644
--- a/yql/essentials/providers/common/proto/gateways_config.proto
+++ b/yql/essentials/providers/common/proto/gateways_config.proto
@@ -326,7 +326,8 @@ message TPqClusterConfig {
optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token
optional string DatabaseId = 12;
repeated TAttr Settings = 100;
- optional bool SharedReading = 101;
+ optional bool SharedReading = 101;
+ optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
}
message TPqGatewayConfig {