summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author va-kuznecov <[email protected]> 2022-08-01 17:46:41 +0300
committer va-kuznecov <[email protected]> 2022-08-01 17:46:41 +0300
commit 636f550d7e66cc88bfbf06b8218bed4d08a775f3 (patch)
tree aee09bef284fb2e228c85f021182d595539563f8
parent 23b1156d8df573e6efc5c30099b7411bca59e1ad (diff)
Fix data race in WideFilter, WideMap, WideCondense, WideChopper, WideCombine
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp 9
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp 64
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp 40
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp 30
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp 31
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp 68
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp 46
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node.cpp 1
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node.h 8
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp 1
10 files changed, 192 insertions, 106 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
index 62a7f10f247..78451056a88 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
@@ -121,7 +121,10 @@ class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrap
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
public:
TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size)
- : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), Count(count), Stubs(size, nullptr)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
+ , Flow(flow)
+ , Count(count)
+ , StubsIndex(mutables.IncrementWideFieldsIndex(size))
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -130,7 +133,7 @@ public:
}
if (auto count = state.Get<ui64>()) {
- do if (const auto result = Flow->FetchValues(ctx, Stubs.data()); EFetchResult::One != result) {
+ do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) {
state = NUdf::TUnboxedValuePod(count);
return result;
} while (--count);
@@ -221,7 +224,7 @@ private:
IComputationWideFlowNode* const Flow;
IComputationNode* const Count;
- mutable std::vector<NUdf::TUnboxedValue*> Stubs;
+ const ui32 StubsIndex;
};
class TSkipStreamWrapper : public TMutableComputationNode<TSkipStreamWrapper> {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
index 00ef832c574..d68fc71ac10 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
@@ -16,16 +16,24 @@ class TWideChain1MapWrapper : public TStatefulWideFlowCodegeneratorNode<TWideCha
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideChain1MapWrapper>;
public:
TWideChain1MapWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow,
- TComputationExternalNodePtrVector&& inputs,
- TComputationNodePtrVector&& initItems,
- TComputationExternalNodePtrVector&& outputs,
- TComputationNodePtrVector&& updateItems):
- TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow),
- Inputs(std::move(inputs)), InitItems(std::move(initItems)), Outputs(std::move(outputs)), UpdateItems(std::move(updateItems)),
- InputsOnInit(GetPasstroughtMap(Inputs, InitItems)), InputsOnUpdate(GetPasstroughtMap(Inputs, UpdateItems)),
- InitOnInputs(GetPasstroughtMap(InitItems, Inputs)), UpdateOnInputs(GetPasstroughtMap(UpdateItems, Inputs)),
- OutputsOnUpdate(GetPasstroughtMap(Outputs, UpdateItems)), UpdateOnOutputs(GetPasstroughtMap(UpdateItems, Outputs)),
- Fields(Inputs.size(), nullptr), TempState(Outputs.size())
+ TComputationExternalNodePtrVector&& inputs,
+ TComputationNodePtrVector&& initItems,
+ TComputationExternalNodePtrVector&& outputs,
+ TComputationNodePtrVector&& updateItems)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
+ , Flow(flow)
+ , Inputs(std::move(inputs))
+ , InitItems(std::move(initItems))
+ , Outputs(std::move(outputs))
+ , UpdateItems(std::move(updateItems))
+ , InputsOnInit(GetPasstroughtMap(Inputs, InitItems))
+ , InputsOnUpdate(GetPasstroughtMap(Inputs, UpdateItems))
+ , InitOnInputs(GetPasstroughtMap(InitItems, Inputs))
+ , UpdateOnInputs(GetPasstroughtMap(UpdateItems, Inputs))
+ , OutputsOnUpdate(GetPasstroughtMap(Outputs, UpdateItems))
+ , UpdateOnOutputs(GetPasstroughtMap(UpdateItems, Outputs))
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Inputs.size()))
+ , TempStateIndex(std::exchange(mutables.CurValueIndex, mutables.CurValueIndex + Outputs.size()))
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -135,24 +143,27 @@ public:
#endif
private:
EFetchResult CalculateFirst(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- for (auto i = 0U; i < Fields.size(); ++i) {
+ Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < Inputs.size(); ++i) {
if (Inputs[i]->GetDependencesCount() > 0U) {
- Fields[i] = &Inputs[i]->RefValue(ctx);
+ fields[i] = &Inputs[i]->RefValue(ctx);
continue;
} else if (const auto& map = InputsOnInit[i]) {
if (const auto& to = UpdateOnOutputs[*map]) {
- Fields[i] = &Outputs[*to]->RefValue(ctx);
+ fields[i] = &Outputs[*to]->RefValue(ctx);
continue;
} else if (const auto out = output[*map]) {
- Fields[i] = out;
+ fields[i] = out;
continue;
}
}
- Fields[i] = nullptr;
+ fields[i] = nullptr;
}
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
for (auto i = 0U; i < Outputs.size(); ++i) {
@@ -185,27 +196,30 @@ private:
}
EFetchResult CalculateOther(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- for (auto i = 0U; i < Fields.size(); ++i) {
+ Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < Inputs.size(); ++i) {
if (Inputs[i]->GetDependencesCount() > 0U) {
- Fields[i] = &Inputs[i]->RefValue(ctx);
+ fields[i] = &Inputs[i]->RefValue(ctx);
continue;
} else if (const auto& map = InputsOnUpdate[i]) {
if (const auto out = output[*map]) {
- Fields[i] = out;
+ fields[i] = out;
continue;
}
}
- Fields[i] = nullptr;
+ fields[i] = nullptr;
}
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
for (auto i = 0U; i < Outputs.size(); ++i) {
if (Outputs[i]->GetDependencesCount() > 0U || OutputsOnUpdate[i]) {
if (const auto& map = UpdateOnInputs[i]; !map || Inputs[*map]->GetDependencesCount() > 0U) {
- TempState[i] = UpdateItems[i]->GetValue(ctx);
+ ctx.MutableValues[TempStateIndex + i] = UpdateItems[i]->GetValue(ctx);
}
}
}
@@ -213,7 +227,7 @@ private:
for (auto i = 0U; i < Outputs.size(); ++i) {
if (Outputs[i]->GetDependencesCount() > 0U || OutputsOnUpdate[i]) {
if (const auto& map = UpdateOnInputs[i]; !map || Inputs[*map]->GetDependencesCount() > 0U) {
- Outputs[i]->SetValue(ctx, std::move(TempState[i]));
+ Outputs[i]->SetValue(ctx, std::move(ctx.MutableValues[TempStateIndex + i]));
}
}
}
@@ -257,8 +271,8 @@ private:
const TPasstroughtMap InputsOnInit, InputsOnUpdate, InitOnInputs, UpdateOnInputs, OutputsOnUpdate, UpdateOnOutputs;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
- mutable std::vector<NUdf::TUnboxedValue> TempState;
+ const ui32 WideFieldsIndex;
+ const ui32 TempStateIndex;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
index adb3a65dc47..183174128fd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
@@ -35,16 +35,19 @@ public:
, ItemsOnKeys(GetPasstroughtMap(ItemArgs, Keys))
, KeysOnItems(GetPasstroughtMap(Keys, ItemArgs))
, SwitchItem(IsPasstrought(Chop, ItemArgs))
- , Fields(ItemArgs.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(ItemArgs.size()))
{
Input->SetFetcher(std::bind(&TWideChopperWrapper::DoCalculateInput, this, std::bind(&TWideChopperWrapper::RefState, this, _1), _1, _2));
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
if (state.IsInvalid()) {
- for (auto i = 0U; i < Fields.size(); ++i)
- Fields[i] = &ItemArgs[i]->RefValue(ctx);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
+ fields[i] = &ItemArgs[i]->RefValue(ctx);
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
for (ui32 i = 0U; i < Keys.size(); ++i)
@@ -53,9 +56,9 @@ public:
state = NUdf::TUnboxedValuePod(ui64(EState::Next));
} else if (EState::Skip == EState(state.Get<ui64>())) {
do {
- for (auto i = 0U; i < Fields.size(); ++i)
- Fields[i] = &ItemArgs[i]->RefValue(ctx);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
+ fields[i] = &ItemArgs[i]->RefValue(ctx);
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
} while (!Chop->GetValue(ctx).Get<bool>());
@@ -73,9 +76,9 @@ public:
case EState::Work:
case EState::Next:
do {
- for (auto i = 0U; i < Fields.size(); ++i)
- Fields[i] = &ItemArgs[i]->RefValue(ctx);
- switch (const auto next = Flow->FetchValues(ctx, Fields.data())) {
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
+ fields[i] = &ItemArgs[i]->RefValue(ctx);
+ switch (const auto next = Flow->FetchValues(ctx, fields)) {
case EFetchResult::Yield:
state = NUdf::TUnboxedValuePod(ui64(EState::Skip));
case EFetchResult::Finish:
@@ -100,21 +103,24 @@ private:
EFetchResult DoCalculateInput(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (EState::Next == EState(state.Get<ui64>())) {
state = NUdf::TUnboxedValuePod(ui64(EState::Work));
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
if (const auto out = output[i])
*out = ItemArgs[i]->GetValue(ctx);
return EFetchResult::One;
}
- for (auto i = 0U; i < Fields.size(); ++i)
- Fields[i] = &ItemArgs[i]->RefValue(ctx);
+ Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
+ fields[i] = &ItemArgs[i]->RefValue(ctx);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < ItemArgs.size(); ++i)
if (const auto out = output[i])
- *out = *Fields[i];
+ *out = *fields[i];
if (Chop->GetValue(ctx).Get<bool>()) {
state = NUdf::TUnboxedValuePod(ui64(EState::Chop));
@@ -329,7 +335,7 @@ private:
const std::optional<size_t> SwitchItem;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index 778a594b958..c5346af0a0d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -216,7 +216,7 @@ public:
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
, MemLimit(memLimit)
- , Fields(Nodes.ItemNodes.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size()))
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -237,17 +237,20 @@ public:
const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL;
+ Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size());
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+
do {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i])
- Fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
+ fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
- ptr->InputStatus = Flow->FetchValues(ctx, Fields.data());
+ ptr->InputStatus = Flow->FetchValues(ctx, fields);
if (EFetchResult::One != ptr->InputStatus) {
break;
}
- Nodes.ExtractKey(ctx, Fields.data(), static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
+ Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
} while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage));
@@ -567,7 +570,7 @@ private:
const TKeyTypes KeyTypes;
const ui64 MemLimit;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
#ifndef MKQL_DISABLE_CODEGEN
TEqualsPtr Equals = nullptr;
@@ -611,7 +614,7 @@ public:
, Flow(flow)
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
- , Fields(Nodes.ItemNodes.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size()))
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -620,14 +623,17 @@ public:
}
if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) {
+ Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size());
+ auto **fields = ctx.WideFields.data() + WideFieldsIndex;
+
while (EFetchResult::Finish != ptr->InputStatus) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i])
- Fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
+ fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
- switch (ptr->InputStatus = Flow->FetchValues(ctx, Fields.data())) {
+ switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
- Nodes.ExtractKey(ctx, Fields.data(), static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
+ Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
continue;
case EFetchResult::Yield:
@@ -912,7 +918,7 @@ private:
const TCombinerNodes Nodes;
const TKeyTypes KeyTypes;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
#ifndef MKQL_DISABLE_CODEGEN
TEqualsPtr Equals = nullptr;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
index 3f3a2832025..e466075688d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
@@ -21,10 +21,16 @@ public:
TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& initState,
TComputationExternalNodePtrVector&& state, IComputationNode* outSwitch, TComputationNodePtrVector&& updateState)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow)
- , Items(std::move(items)), InitState(std::move(initState))
- , State(std::move(state)), Switch(outSwitch), UpdateState(std::move(updateState))
- , Fields(Items.size(), nullptr), TempState(State.size()), SwitchItem(IsPasstrought(Switch, Items))
- , ItemsOnInit(GetPasstroughtMap(Items, InitState)), ItemsOnUpdate(GetPasstroughtMap(Items, UpdateState))
+ , Items(std::move(items))
+ , InitState(std::move(initState))
+ , State(std::move(state))
+ , Switch(outSwitch)
+ , UpdateState(std::move(updateState))
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
+ , TempStateIndex(std::exchange(mutables.CurValueIndex, mutables.CurValueIndex + State.size()))
+ , SwitchItem(IsPasstrought(Switch, Items))
+ , ItemsOnInit(GetPasstroughtMap(Items, InitState))
+ , ItemsOnUpdate(GetPasstroughtMap(Items, UpdateState))
, UpdateOnItems(GetPasstroughtMap(UpdateState, Items))
{}
@@ -41,12 +47,15 @@ public:
State[i]->SetValue(ctx, InitState[i]->GetValue(ctx));
}
+ Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
while (true) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (Items[i]->GetDependencesCount() > 0U || ItemsOnInit[i] || ItemsOnUpdate[i] || SwitchItem && i == *SwitchItem)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (Flow->FetchValues(ctx, Fields.data())) {
+ switch (Flow->FetchValues(ctx, fields)) {
case EFetchResult::Yield:
return EFetchResult::Yield;
case EFetchResult::Finish:
@@ -74,9 +83,9 @@ public:
}
for (ui32 i = 0U; i < State.size(); ++i)
- TempState[i] = UpdateState[i]->GetValue(ctx);
+ ctx.MutableValues[TempStateIndex + i] = UpdateState[i]->GetValue(ctx);
for (ui32 i = 0U; i < State.size(); ++i)
- State[i]->SetValue(ctx, std::move(TempState[i]));
+ State[i]->SetValue(ctx, std::move(ctx.MutableValues[TempStateIndex + i]));
}
continue;
}
@@ -236,8 +245,8 @@ private:
const TPasstroughtMap ItemsOnInit, ItemsOnUpdate, UpdateOnItems;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
- mutable std::vector<NUdf::TUnboxedValue> TempState;
+ ui32 WideFieldsIndex;
+ ui32 TempStateIndex;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
index 7a6ad7967af..04823e0b261 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
@@ -14,23 +14,37 @@ namespace {
class TBaseWideFilterWrapper {
protected:
- TBaseWideFilterWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
- : Flow(flow), Items(std::move(items)), Predicate(predicate), FilterByField(GetPasstroughtMap({Predicate}, Items).front()), Fields(Items.size(), nullptr)
+ TBaseWideFilterWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
+ : Flow(flow)
+ , Items(std::move(items))
+ , Predicate(predicate)
+ , FilterByField(GetPasstroughtMap({Predicate}, Items).front())
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
+ NYql::NUdf::TUnboxedValue** GetFields(TComputationContext& ctx) const {
+ Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
+ return ctx.WideFields.data() + WideFieldsIndex;
+ }
+
void PrepareArguments(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- for (auto i = 0U; i < Fields.size(); ++i)
+ auto** fields = GetFields(ctx);
+
+ for (auto i = 0U; i < Items.size(); ++i) {
if (Predicate == Items[i] || Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
else
- Fields[i] = output[i];
+ fields[i] = output[i];
+ }
}
void FillOutputs(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- for (auto i = 0U; i < Fields.size(); ++i)
+ auto** fields = GetFields(ctx);
+
+ for (auto i = 0U; i < Items.size(); ++i)
if (const auto out = output[i])
if (Predicate == Items[i] || Items[i]->GetDependencesCount() > 0U)
- *out = *Fields[i];
+ *out = *fields[i];
}
#ifndef MKQL_DISABLE_CODEGEN
template<bool ReplaceOriginalGetter = true>
@@ -57,21 +71,24 @@ protected:
std::optional<size_t> FilterByField;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
class TWideFilterWrapper : public TStatelessWideFlowCodegeneratorNode<TWideFilterWrapper>, public TBaseWideFilterWrapper {
using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TWideFilterWrapper>;
public:
- TWideFilterWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
- : TBaseComputation(flow), TBaseWideFilterWrapper(flow, std::move(items), predicate)
+ TWideFilterWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
+ : TBaseComputation(flow)
+ , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate)
{}
EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ auto** fields = GetFields(ctx);
+
while (true) {
PrepareArguments(ctx, output);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
if (Predicate->GetValue(ctx).Get<bool>()) {
@@ -121,8 +138,11 @@ private:
class TWideFilterWithLimitWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>, public TBaseWideFilterWrapper {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>;
public:
- TWideFilterWithLimitWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* limit, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
- : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate), Limit(limit)
+ TWideFilterWithLimitWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* limit,
+ TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
+ , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate)
+ , Limit(limit)
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -132,10 +152,11 @@ public:
return EFetchResult::Finish;
}
+ auto **fields = GetFields(ctx);
while (true) {
PrepareArguments(ctx, output);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
if (Predicate->GetValue(ctx).Get<bool>()) {
@@ -221,8 +242,10 @@ template<bool Inclusive>
class TWideTakeWhileWrapper : public TStatefulWideFlowCodegeneratorNode<TWideTakeWhileWrapper<Inclusive>>, public TBaseWideFilterWrapper {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideTakeWhileWrapper<Inclusive>>;
public:
- TWideTakeWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
- : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate)
+ TWideTakeWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items,
+ IComputationNode* predicate)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
+ , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate)
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -232,7 +255,9 @@ public:
PrepareArguments(ctx, output);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ auto **fields = GetFields(ctx);
+
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
const bool predicate = Predicate->GetValue(ctx).Get<bool>();
@@ -302,7 +327,8 @@ class TWideSkipWhileWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSki
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWhileWrapper<Inclusive>>;
public:
TWideSkipWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
- : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate)
+ : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
+ , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate)
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -310,9 +336,11 @@ public:
return Flow->FetchValues(ctx, output);
}
+ auto **fields = GetFields(ctx);
+
do {
PrepareArguments(ctx, output);
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
} while (Predicate->GetValue(ctx).Get<bool>());
@@ -409,7 +437,7 @@ IComputationNode* WrapWideFilter(TCallable& callable, const TComputationNodeFact
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
if (const auto last = callable.GetInputsCount() - 1U; last == width + 1U) {
- return new TWideFilterWrapper(wide, std::move(args), predicate);
+ return new TWideFilterWrapper(ctx.Mutables, wide, std::move(args), predicate);
} else {
const auto limit = LocateNode(ctx.NodeLocator, callable, last);
return new TWideFilterWithLimitWrapper(ctx.Mutables, wide, limit, std::move(args), predicate);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
index a4e3e3ad04f..47e7b1a5c43 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
@@ -83,20 +83,28 @@ private:
class TWideMapWrapper : public TStatelessWideFlowCodegeneratorNode<TWideMapWrapper> {
using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TWideMapWrapper>;
public:
- TWideMapWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems)
- : TBaseComputation(flow), Flow(flow), Items(std::move(items)), NewItems(std::move(newItems)), Fields(Items.size(), nullptr)
- , PasstroughtMap(GetPasstroughtMap(Items, NewItems)), ReversePasstroughtMap(GetPasstroughtMap(NewItems, Items))
+ TWideMapWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems)
+ : TBaseComputation(flow)
+ , Flow(flow)
+ , Items(std::move(items))
+ , NewItems(std::move(newItems))
+ , PasstroughtMap(GetPasstroughtMap(Items, NewItems))
+ , ReversePasstroughtMap(GetPasstroughtMap(NewItems, Items))
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- for (auto i = 0U; i < Fields.size(); ++i)
+ Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < Items.size(); ++i)
if (Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
else if (const auto& map = PasstroughtMap[i])
if (const auto out = output[*map])
- Fields[i] = out;
+ fields[i] = out;
- if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result)
+ if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
return result;
for (auto i = 0U; i < NewItems.size(); ++i) {
@@ -164,27 +172,31 @@ private:
const TComputationNodePtrVector NewItems;
const TPasstroughtMap PasstroughtMap, ReversePasstroughtMap;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
class TNarrowMapWrapper : public TStatelessFlowCodegeneratorNode<TNarrowMapWrapper> {
using TBaseComputation = TStatelessFlowCodegeneratorNode<TNarrowMapWrapper>;
public:
- TNarrowMapWrapper(EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* newItem)
+ TNarrowMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* newItem)
: TBaseComputation(flow, kind)
, Flow(flow)
, Items(std::move(items))
, NewItem(newItem)
, PasstroughItem(GetPasstroughtMap({NewItem}, Items).front())
- , Fields(Items.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- for (auto i = 0U; i < Fields.size(); ++i)
+ Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < Items.size(); ++i) {
if (NewItem == Items[i] || Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
+ }
- switch (const auto result = Flow->FetchValues(ctx, Fields.data())) {
+ switch (const auto result = Flow->FetchValues(ctx, fields)) {
case EFetchResult::Finish:
return NUdf::TUnboxedValuePod::MakeFinish();
case EFetchResult::Yield:
@@ -243,8 +255,7 @@ private:
IComputationNode* const NewItem;
const std::optional<size_t> PasstroughItem;
-
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
}
@@ -278,7 +289,7 @@ IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactory
index = 0U;
std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); });
- return new TWideMapWrapper(wide, std::move(args), std::move(newItems));
+ return new TWideMapWrapper(ctx.Mutables, wide, std::move(args), std::move(newItems));
}
THROW yexception() << "Expected wide flow.";
@@ -295,8 +306,7 @@ IComputationNode* WrapNarrowMap(TCallable& callable, const TComputationNodeFacto
TComputationExternalNodePtrVector args(width, nullptr);
ui32 index = 0U;
std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); });
-
- return new TNarrowMapWrapper(GetValueRepresentation(callable.GetType()->GetReturnType()), wide, std::move(args), newItem);
+ return new TNarrowMapWrapper(ctx.Mutables, GetValueRepresentation(callable.GetType()->GetReturnType()), wide, std::move(args), newItem);
}
THROW yexception() << "Expected wide flow.";
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
index aa2e27f0282..8302b43e68c 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
@@ -35,6 +35,7 @@ TComputationContext::TComputationContext(const THolderFactory& holderFactory,
, RandomProvider(opts.RandomProvider)
, TimeProvider(opts.TimeProvider)
, ArrowMemoryPool(arrowMemoryPool)
+ , WideFields(mutables.CurWideFieldsIndex, nullptr)
{
std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index c93a13c3512..f1a77cca7e0 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -64,6 +64,13 @@ struct TComputationOptsFull: public TComputationOpts {
struct TComputationMutables {
ui32 CurValueIndex = 0U;
std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState().
+ ui32 CurWideFieldsIndex = 0U;
+
+ ui32 IncrementWideFieldsIndex(ui32 addend) {
+ auto cur = CurWideFieldsIndex;
+ CurWideFieldsIndex += addend;
+ return cur;
+ }
};
class THolderFactory;
@@ -84,6 +91,7 @@ struct TComputationContext : public TComputationContextLLVM {
ITimeProvider& TimeProvider;
bool ExecuteLLVM = true;
arrow::MemoryPool& ArrowMemoryPool;
+ std::vector<NUdf::TUnboxedValue*> WideFields;
TComputationContext(const THolderFactory& holderFactory,
const NUdf::IValueBuilder* builder,
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 2d792f2164c..1ff7fd10d17 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -337,6 +337,7 @@ TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = n
Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
template<class T>
void ParallelProgTest(T f, bool useLLVM, ui64 testResult) {
+ TTimer t("total: ");
const ui32 cacheSize = 10;
const ui32 inFlight = 3;
TComputationPatternLRUCache cache(cacheSize);