diff options
author | va-kuznecov <[email protected]> | 2022-08-01 17:46:41 +0300 |
---|---|---|
committer | va-kuznecov <[email protected]> | 2022-08-01 17:46:41 +0300 |
commit | 636f550d7e66cc88bfbf06b8218bed4d08a775f3 (patch) | |
tree | aee09bef284fb2e228c85f021182d595539563f8 | |
parent | 23b1156d8df573e6efc5c30099b7411bca59e1ad (diff) |
Fix data race in WideFilter, WideMap, WideCondense, WideChopper, WideCombine
10 files changed, 192 insertions, 106 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp index 62a7f10f247..78451056a88 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp @@ -121,7 +121,10 @@ class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrap using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>; public: TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size) - : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), Count(count), Stubs(size, nullptr) + : TBaseComputation(mutables, flow, EValueRepresentation::Embedded) + , Flow(flow) + , Count(count) + , StubsIndex(mutables.IncrementWideFieldsIndex(size)) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -130,7 +133,7 @@ public: } if (auto count = state.Get<ui64>()) { - do if (const auto result = Flow->FetchValues(ctx, Stubs.data()); EFetchResult::One != result) { + do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) { state = NUdf::TUnboxedValuePod(count); return result; } while (--count); @@ -221,7 +224,7 @@ private: IComputationWideFlowNode* const Flow; IComputationNode* const Count; - mutable std::vector<NUdf::TUnboxedValue*> Stubs; + const ui32 StubsIndex; }; class TSkipStreamWrapper : public TMutableComputationNode<TSkipStreamWrapper> { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp index 00ef832c574..d68fc71ac10 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp @@ -16,16 +16,24 @@ class TWideChain1MapWrapper : public TStatefulWideFlowCodegeneratorNode<TWideCha using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideChain1MapWrapper>; public: TWideChain1MapWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - TComputationExternalNodePtrVector&& inputs, - TComputationNodePtrVector&& initItems, - TComputationExternalNodePtrVector&& outputs, - TComputationNodePtrVector&& updateItems): - TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), - Inputs(std::move(inputs)), InitItems(std::move(initItems)), Outputs(std::move(outputs)), UpdateItems(std::move(updateItems)), - InputsOnInit(GetPasstroughtMap(Inputs, InitItems)), InputsOnUpdate(GetPasstroughtMap(Inputs, UpdateItems)), - InitOnInputs(GetPasstroughtMap(InitItems, Inputs)), UpdateOnInputs(GetPasstroughtMap(UpdateItems, Inputs)), - OutputsOnUpdate(GetPasstroughtMap(Outputs, UpdateItems)), UpdateOnOutputs(GetPasstroughtMap(UpdateItems, Outputs)), - Fields(Inputs.size(), nullptr), TempState(Outputs.size()) + TComputationExternalNodePtrVector&& inputs, + TComputationNodePtrVector&& initItems, + TComputationExternalNodePtrVector&& outputs, + TComputationNodePtrVector&& updateItems) + : TBaseComputation(mutables, flow, EValueRepresentation::Embedded) + , Flow(flow) + , Inputs(std::move(inputs)) + , InitItems(std::move(initItems)) + , Outputs(std::move(outputs)) + , UpdateItems(std::move(updateItems)) + , InputsOnInit(GetPasstroughtMap(Inputs, InitItems)) + , InputsOnUpdate(GetPasstroughtMap(Inputs, UpdateItems)) + , InitOnInputs(GetPasstroughtMap(InitItems, Inputs)) + , UpdateOnInputs(GetPasstroughtMap(UpdateItems, Inputs)) + , OutputsOnUpdate(GetPasstroughtMap(Outputs, UpdateItems)) + , UpdateOnOutputs(GetPasstroughtMap(UpdateItems, Outputs)) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Inputs.size())) + , TempStateIndex(std::exchange(mutables.CurValueIndex, mutables.CurValueIndex + Outputs.size())) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -135,24 +143,27 @@ public: #endif private: EFetchResult CalculateFirst(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - for (auto i = 0U; i < Fields.size(); ++i) { + Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < Inputs.size(); ++i) { if (Inputs[i]->GetDependencesCount() > 0U) { - Fields[i] = &Inputs[i]->RefValue(ctx); + fields[i] = &Inputs[i]->RefValue(ctx); continue; } else if (const auto& map = InputsOnInit[i]) { if (const auto& to = UpdateOnOutputs[*map]) { - Fields[i] = &Outputs[*to]->RefValue(ctx); + fields[i] = &Outputs[*to]->RefValue(ctx); continue; } else if (const auto out = output[*map]) { - Fields[i] = out; + fields[i] = out; continue; } } - Fields[i] = nullptr; + fields[i] = nullptr; } - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; for (auto i = 0U; i < Outputs.size(); ++i) { @@ -185,27 +196,30 @@ private: } EFetchResult CalculateOther(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - for (auto i = 0U; i < Fields.size(); ++i) { + Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < Inputs.size(); ++i) { if (Inputs[i]->GetDependencesCount() > 0U) { - Fields[i] = &Inputs[i]->RefValue(ctx); + fields[i] = &Inputs[i]->RefValue(ctx); continue; } else if (const auto& map = InputsOnUpdate[i]) { if (const auto out = output[*map]) { - Fields[i] = out; + fields[i] = out; continue; } } - Fields[i] = nullptr; + fields[i] = nullptr; } - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; for (auto i = 0U; i < Outputs.size(); ++i) { if (Outputs[i]->GetDependencesCount() > 0U || OutputsOnUpdate[i]) { if (const auto& map = UpdateOnInputs[i]; !map || Inputs[*map]->GetDependencesCount() > 0U) { - TempState[i] = UpdateItems[i]->GetValue(ctx); + ctx.MutableValues[TempStateIndex + i] = UpdateItems[i]->GetValue(ctx); } } } @@ -213,7 +227,7 @@ private: for (auto i = 0U; i < Outputs.size(); ++i) { if (Outputs[i]->GetDependencesCount() > 0U || OutputsOnUpdate[i]) { if (const auto& map = UpdateOnInputs[i]; !map || Inputs[*map]->GetDependencesCount() > 0U) { - Outputs[i]->SetValue(ctx, std::move(TempState[i])); + Outputs[i]->SetValue(ctx, std::move(ctx.MutableValues[TempStateIndex + i])); } } } @@ -257,8 +271,8 @@ private: const TPasstroughtMap InputsOnInit, InputsOnUpdate, InitOnInputs, UpdateOnInputs, OutputsOnUpdate, UpdateOnOutputs; - mutable std::vector<NUdf::TUnboxedValue*> Fields; - mutable std::vector<NUdf::TUnboxedValue> TempState; + const ui32 WideFieldsIndex; + const ui32 TempStateIndex; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp index adb3a65dc47..183174128fd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp @@ -35,16 +35,19 @@ public: , ItemsOnKeys(GetPasstroughtMap(ItemArgs, Keys)) , KeysOnItems(GetPasstroughtMap(Keys, ItemArgs)) , SwitchItem(IsPasstrought(Chop, ItemArgs)) - , Fields(ItemArgs.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(ItemArgs.size())) { Input->SetFetcher(std::bind(&TWideChopperWrapper::DoCalculateInput, this, std::bind(&TWideChopperWrapper::RefState, this, _1), _1, _2)); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + if (state.IsInvalid()) { - for (auto i = 0U; i < Fields.size(); ++i) - Fields[i] = &ItemArgs[i]->RefValue(ctx); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + for (auto i = 0U; i < ItemArgs.size(); ++i) + fields[i] = &ItemArgs[i]->RefValue(ctx); + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; for (ui32 i = 0U; i < Keys.size(); ++i) @@ -53,9 +56,9 @@ public: state = NUdf::TUnboxedValuePod(ui64(EState::Next)); } else if (EState::Skip == EState(state.Get<ui64>())) { do { - for (auto i = 0U; i < Fields.size(); ++i) - Fields[i] = &ItemArgs[i]->RefValue(ctx); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + for (auto i = 0U; i < ItemArgs.size(); ++i) + fields[i] = &ItemArgs[i]->RefValue(ctx); + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; } while (!Chop->GetValue(ctx).Get<bool>()); @@ -73,9 +76,9 @@ public: case EState::Work: case EState::Next: do { - for (auto i = 0U; i < Fields.size(); ++i) - Fields[i] = &ItemArgs[i]->RefValue(ctx); - switch (const auto next = Flow->FetchValues(ctx, Fields.data())) { + for (auto i = 0U; i < ItemArgs.size(); ++i) + fields[i] = &ItemArgs[i]->RefValue(ctx); + switch (const auto next = Flow->FetchValues(ctx, fields)) { case EFetchResult::Yield: state = NUdf::TUnboxedValuePod(ui64(EState::Skip)); case EFetchResult::Finish: @@ -100,21 +103,24 @@ private: EFetchResult DoCalculateInput(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { if (EState::Next == EState(state.Get<ui64>())) { state = NUdf::TUnboxedValuePod(ui64(EState::Work)); - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < ItemArgs.size(); ++i) if (const auto out = output[i]) *out = ItemArgs[i]->GetValue(ctx); return EFetchResult::One; } - for (auto i = 0U; i < Fields.size(); ++i) - Fields[i] = &ItemArgs[i]->RefValue(ctx); + Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < ItemArgs.size(); ++i) + fields[i] = &ItemArgs[i]->RefValue(ctx); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < ItemArgs.size(); ++i) if (const auto out = output[i]) - *out = *Fields[i]; + *out = *fields[i]; if (Chop->GetValue(ctx).Get<bool>()) { state = NUdf::TUnboxedValuePod(ui64(EState::Chop)); @@ -329,7 +335,7 @@ private: const std::optional<size_t> SwitchItem; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index 778a594b958..c5346af0a0d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -216,7 +216,7 @@ public: , Nodes(std::move(nodes)) , KeyTypes(std::move(keyTypes)) , MemLimit(memLimit) - , Fields(Nodes.ItemNodes.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size())) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -237,17 +237,20 @@ public: const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL; + Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size()); + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + do { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i]) - Fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); + fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); - ptr->InputStatus = Flow->FetchValues(ctx, Fields.data()); + ptr->InputStatus = Flow->FetchValues(ctx, fields); if (EFetchResult::One != ptr->InputStatus) { break; } - Nodes.ExtractKey(ctx, Fields.data(), static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); + Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage)); @@ -567,7 +570,7 @@ private: const TKeyTypes KeyTypes; const ui64 MemLimit; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; #ifndef MKQL_DISABLE_CODEGEN TEqualsPtr Equals = nullptr; @@ -611,7 +614,7 @@ public: , Flow(flow) , Nodes(std::move(nodes)) , KeyTypes(std::move(keyTypes)) - , Fields(Nodes.ItemNodes.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size())) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -620,14 +623,17 @@ public: } if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) { + Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size()); + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + while (EFetchResult::Finish != ptr->InputStatus) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i]) - Fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); + fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); - switch (ptr->InputStatus = Flow->FetchValues(ctx, Fields.data())) { + switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { case EFetchResult::One: - Nodes.ExtractKey(ctx, Fields.data(), static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); + Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); continue; case EFetchResult::Yield: @@ -912,7 +918,7 @@ private: const TCombinerNodes Nodes; const TKeyTypes KeyTypes; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; #ifndef MKQL_DISABLE_CODEGEN TEqualsPtr Equals = nullptr; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp index 3f3a2832025..e466075688d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp @@ -21,10 +21,16 @@ public: TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& initState, TComputationExternalNodePtrVector&& state, IComputationNode* outSwitch, TComputationNodePtrVector&& updateState) : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow) - , Items(std::move(items)), InitState(std::move(initState)) - , State(std::move(state)), Switch(outSwitch), UpdateState(std::move(updateState)) - , Fields(Items.size(), nullptr), TempState(State.size()), SwitchItem(IsPasstrought(Switch, Items)) - , ItemsOnInit(GetPasstroughtMap(Items, InitState)), ItemsOnUpdate(GetPasstroughtMap(Items, UpdateState)) + , Items(std::move(items)) + , InitState(std::move(initState)) + , State(std::move(state)) + , Switch(outSwitch) + , UpdateState(std::move(updateState)) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) + , TempStateIndex(std::exchange(mutables.CurValueIndex, mutables.CurValueIndex + State.size())) + , SwitchItem(IsPasstrought(Switch, Items)) + , ItemsOnInit(GetPasstroughtMap(Items, InitState)) + , ItemsOnUpdate(GetPasstroughtMap(Items, UpdateState)) , UpdateOnItems(GetPasstroughtMap(UpdateState, Items)) {} @@ -41,12 +47,15 @@ public: State[i]->SetValue(ctx, InitState[i]->GetValue(ctx)); } + Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + while (true) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (Items[i]->GetDependencesCount() > 0U || ItemsOnInit[i] || ItemsOnUpdate[i] || SwitchItem && i == *SwitchItem) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (Flow->FetchValues(ctx, Fields.data())) { + switch (Flow->FetchValues(ctx, fields)) { case EFetchResult::Yield: return EFetchResult::Yield; case EFetchResult::Finish: @@ -74,9 +83,9 @@ public: } for (ui32 i = 0U; i < State.size(); ++i) - TempState[i] = UpdateState[i]->GetValue(ctx); + ctx.MutableValues[TempStateIndex + i] = UpdateState[i]->GetValue(ctx); for (ui32 i = 0U; i < State.size(); ++i) - State[i]->SetValue(ctx, std::move(TempState[i])); + State[i]->SetValue(ctx, std::move(ctx.MutableValues[TempStateIndex + i])); } continue; } @@ -236,8 +245,8 @@ private: const TPasstroughtMap ItemsOnInit, ItemsOnUpdate, UpdateOnItems; - mutable std::vector<NUdf::TUnboxedValue*> Fields; - mutable std::vector<NUdf::TUnboxedValue> TempState; + ui32 WideFieldsIndex; + ui32 TempStateIndex; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp index 7a6ad7967af..04823e0b261 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp @@ -14,23 +14,37 @@ namespace { class TBaseWideFilterWrapper { protected: - TBaseWideFilterWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) - : Flow(flow), Items(std::move(items)), Predicate(predicate), FilterByField(GetPasstroughtMap({Predicate}, Items).front()), Fields(Items.size(), nullptr) + TBaseWideFilterWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) + : Flow(flow) + , Items(std::move(items)) + , Predicate(predicate) + , FilterByField(GetPasstroughtMap({Predicate}, Items).front()) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} + NYql::NUdf::TUnboxedValue** GetFields(TComputationContext& ctx) const { + Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); + return ctx.WideFields.data() + WideFieldsIndex; + } + void PrepareArguments(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - for (auto i = 0U; i < Fields.size(); ++i) + auto** fields = GetFields(ctx); + + for (auto i = 0U; i < Items.size(); ++i) { if (Predicate == Items[i] || Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); else - Fields[i] = output[i]; + fields[i] = output[i]; + } } void FillOutputs(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - for (auto i = 0U; i < Fields.size(); ++i) + auto** fields = GetFields(ctx); + + for (auto i = 0U; i < Items.size(); ++i) if (const auto out = output[i]) if (Predicate == Items[i] || Items[i]->GetDependencesCount() > 0U) - *out = *Fields[i]; + *out = *fields[i]; } #ifndef MKQL_DISABLE_CODEGEN template<bool ReplaceOriginalGetter = true> @@ -57,21 +71,24 @@ protected: std::optional<size_t> FilterByField; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; class TWideFilterWrapper : public TStatelessWideFlowCodegeneratorNode<TWideFilterWrapper>, public TBaseWideFilterWrapper { using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TWideFilterWrapper>; public: - TWideFilterWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) - : TBaseComputation(flow), TBaseWideFilterWrapper(flow, std::move(items), predicate) + TWideFilterWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) + : TBaseComputation(flow) + , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate) {} EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + auto** fields = GetFields(ctx); + while (true) { PrepareArguments(ctx, output); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; if (Predicate->GetValue(ctx).Get<bool>()) { @@ -121,8 +138,11 @@ private: class TWideFilterWithLimitWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>, public TBaseWideFilterWrapper { using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>; public: - TWideFilterWithLimitWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* limit, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) - : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate), Limit(limit) + TWideFilterWithLimitWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* limit, + TComputationExternalNodePtrVector&& items, IComputationNode* predicate) + : TBaseComputation(mutables, flow, EValueRepresentation::Embedded) + , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate) + , Limit(limit) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -132,10 +152,11 @@ public: return EFetchResult::Finish; } + auto **fields = GetFields(ctx); while (true) { PrepareArguments(ctx, output); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; if (Predicate->GetValue(ctx).Get<bool>()) { @@ -221,8 +242,10 @@ template<bool Inclusive> class TWideTakeWhileWrapper : public TStatefulWideFlowCodegeneratorNode<TWideTakeWhileWrapper<Inclusive>>, public TBaseWideFilterWrapper { using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideTakeWhileWrapper<Inclusive>>; public: - TWideTakeWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) - : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate) + TWideTakeWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, + IComputationNode* predicate) + : TBaseComputation(mutables, flow, EValueRepresentation::Embedded) + , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -232,7 +255,9 @@ public: PrepareArguments(ctx, output); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + auto **fields = GetFields(ctx); + + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; const bool predicate = Predicate->GetValue(ctx).Get<bool>(); @@ -302,7 +327,8 @@ class TWideSkipWhileWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSki using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWhileWrapper<Inclusive>>; public: TWideSkipWhileWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* predicate) - : TBaseComputation(mutables, flow, EValueRepresentation::Embedded), TBaseWideFilterWrapper(flow, std::move(items), predicate) + : TBaseComputation(mutables, flow, EValueRepresentation::Embedded) + , TBaseWideFilterWrapper(mutables, flow, std::move(items), predicate) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -310,9 +336,11 @@ public: return Flow->FetchValues(ctx, output); } + auto **fields = GetFields(ctx); + do { PrepareArguments(ctx, output); - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; } while (Predicate->GetValue(ctx).Get<bool>()); @@ -409,7 +437,7 @@ IComputationNode* WrapWideFilter(TCallable& callable, const TComputationNodeFact if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { if (const auto last = callable.GetInputsCount() - 1U; last == width + 1U) { - return new TWideFilterWrapper(wide, std::move(args), predicate); + return new TWideFilterWrapper(ctx.Mutables, wide, std::move(args), predicate); } else { const auto limit = LocateNode(ctx.NodeLocator, callable, last); return new TWideFilterWithLimitWrapper(ctx.Mutables, wide, limit, std::move(args), predicate); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp index a4e3e3ad04f..47e7b1a5c43 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp @@ -83,20 +83,28 @@ private: class TWideMapWrapper : public TStatelessWideFlowCodegeneratorNode<TWideMapWrapper> { using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TWideMapWrapper>; public: - TWideMapWrapper(IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems) - : TBaseComputation(flow), Flow(flow), Items(std::move(items)), NewItems(std::move(newItems)), Fields(Items.size(), nullptr) - , PasstroughtMap(GetPasstroughtMap(Items, NewItems)), ReversePasstroughtMap(GetPasstroughtMap(NewItems, Items)) + TWideMapWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems) + : TBaseComputation(flow) + , Flow(flow) + , Items(std::move(items)) + , NewItems(std::move(newItems)) + , PasstroughtMap(GetPasstroughtMap(Items, NewItems)) + , ReversePasstroughtMap(GetPasstroughtMap(NewItems, Items)) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - for (auto i = 0U; i < Fields.size(); ++i) + Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < Items.size(); ++i) if (Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); else if (const auto& map = PasstroughtMap[i]) if (const auto out = output[*map]) - Fields[i] = out; + fields[i] = out; - if (const auto result = Flow->FetchValues(ctx, Fields.data()); EFetchResult::One != result) + if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result) return result; for (auto i = 0U; i < NewItems.size(); ++i) { @@ -164,27 +172,31 @@ private: const TComputationNodePtrVector NewItems; const TPasstroughtMap PasstroughtMap, ReversePasstroughtMap; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; class TNarrowMapWrapper : public TStatelessFlowCodegeneratorNode<TNarrowMapWrapper> { using TBaseComputation = TStatelessFlowCodegeneratorNode<TNarrowMapWrapper>; public: - TNarrowMapWrapper(EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* newItem) + TNarrowMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* newItem) : TBaseComputation(flow, kind) , Flow(flow) , Items(std::move(items)) , NewItem(newItem) , PasstroughItem(GetPasstroughtMap({NewItem}, Items).front()) - , Fields(Items.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - for (auto i = 0U; i < Fields.size(); ++i) + Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < Items.size(); ++i) { if (NewItem == Items[i] || Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); + } - switch (const auto result = Flow->FetchValues(ctx, Fields.data())) { + switch (const auto result = Flow->FetchValues(ctx, fields)) { case EFetchResult::Finish: return NUdf::TUnboxedValuePod::MakeFinish(); case EFetchResult::Yield: @@ -243,8 +255,7 @@ private: IComputationNode* const NewItem; const std::optional<size_t> PasstroughItem; - - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; } @@ -278,7 +289,7 @@ IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactory index = 0U; std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); }); - return new TWideMapWrapper(wide, std::move(args), std::move(newItems)); + return new TWideMapWrapper(ctx.Mutables, wide, std::move(args), std::move(newItems)); } THROW yexception() << "Expected wide flow."; @@ -295,8 +306,7 @@ IComputationNode* WrapNarrowMap(TCallable& callable, const TComputationNodeFacto TComputationExternalNodePtrVector args(width, nullptr); ui32 index = 0U; std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); }); - - return new TNarrowMapWrapper(GetValueRepresentation(callable.GetType()->GetReturnType()), wide, std::move(args), newItem); + return new TNarrowMapWrapper(ctx.Mutables, GetValueRepresentation(callable.GetType()->GetReturnType()), wide, std::move(args), newItem); } THROW yexception() << "Expected wide flow."; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp index aa2e27f0282..8302b43e68c 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp @@ -35,6 +35,7 @@ TComputationContext::TComputationContext(const THolderFactory& holderFactory, , RandomProvider(opts.RandomProvider) , TimeProvider(opts.TimeProvider) , ArrowMemoryPool(arrowMemoryPool) + , WideFields(mutables.CurWideFieldsIndex, nullptr) { std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index c93a13c3512..f1a77cca7e0 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -64,6 +64,13 @@ struct TComputationOptsFull: public TComputationOpts { struct TComputationMutables { ui32 CurValueIndex = 0U; std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState(). + ui32 CurWideFieldsIndex = 0U; + + ui32 IncrementWideFieldsIndex(ui32 addend) { + auto cur = CurWideFieldsIndex; + CurWideFieldsIndex += addend; + return cur; + } }; class THolderFactory; @@ -84,6 +91,7 @@ struct TComputationContext : public TComputationContextLLVM { ITimeProvider& TimeProvider; bool ExecuteLLVM = true; arrow::MemoryPool& ArrowMemoryPool; + std::vector<NUdf::TUnboxedValue*> WideFields; TComputationContext(const THolderFactory& holderFactory, const NUdf::IValueBuilder* builder, diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 2d792f2164c..1ff7fd10d17 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -337,6 +337,7 @@ TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = n Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { template<class T> void ParallelProgTest(T f, bool useLLVM, ui64 testResult) { + TTimer t("total: "); const ui32 cacheSize = 10; const ui32 inFlight = 3; TComputationPatternLRUCache cache(cacheSize); |