diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-08-25 13:33:15 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-08-25 13:33:15 +0300 |
commit | 0166916e98024b223fae6894817f4d8dc0a8a5a6 (patch) | |
tree | 628c9505d34ce258bc30883276dc7c85b02f8674 | |
parent | ad2a8ca7457a9e4ddb8f06da1e0a548faf070606 (diff) | |
download | ydb-0166916e98024b223fae6894817f4d8dc0a8a5a6.tar.gz |
Fix wide MapJoin
4 files changed, 100 insertions, 30 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp index 457092900c6..72299a0c511 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp @@ -17,9 +17,9 @@ template<bool IsTuple> class TWideMapJoinBase { protected: TWideMapJoinBase(TComputationMutables& mutables, std::vector<TFunctionDescriptor>&& leftKeyConverters, - TDictType* dictType, std::vector<EValueRepresentation>&& outputRepresentations, std::vector<ui32>&& leftKeyColumns, - std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames, - IComputationWideFlowNode* flow, IComputationNode* dict, ui32 inputWidth) + TDictType* dictType, std::vector<EValueRepresentation>&& outputRepresentations, std::vector<ui32>&& leftKeyColumns, + std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames, + IComputationWideFlowNode* flow, IComputationNode* dict, ui32 inputWidth) : LeftKeyConverters(std::move(leftKeyConverters)) , DictType(dictType) , OutputRepresentations(std::move(outputRepresentations)) @@ -30,9 +30,12 @@ protected: , Flow(flow) , Dict(dict) , KeyTuple(mutables) - , Inputs(UsedInputs.size()) - , Fields(MakeFields(inputWidth)) - {} + , InputsIndex(mutables.CurValueIndex) + , WideFieldsIndex(mutables.CurWideFieldsIndex) + { + mutables.DeferWideFieldsInit(UsedInputs.size()); + } + #ifndef MKQL_DISABLE_CODEGEN Value* GenMakeKeysTuple(Value* keysPtr, const ICodegeneratorInlineWideNode::TGettersList& getters, const TCodegenContext& ctx, BasicBlock*& block) const { auto& context = ctx.Codegen->GetContext(); @@ -165,14 +168,14 @@ protected: return {{placeholder, stub}}; } #endif - NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx) const { + NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, NUdf::TUnboxedValue** fields) const { if constexpr (IsTuple) { NUdf::TUnboxedValue* items = nullptr; const auto keys = KeyTuple.NewArray(ctx, LeftKeyColumns.size(), items); if (!LeftKeyColumns.empty()) { Y_VERIFY(items); for (auto i = 0U; i < LeftKeyColumns.size(); ++i) { - const auto value = Fields[LeftKeyColumns[i]]; + const auto value = fields[LeftKeyColumns[i]]; const auto converter = LeftKeyConverters[i].Function; if (!(*items++ = converter ? converter(value) : *value)) return NUdf::TUnboxedValuePod(); @@ -181,18 +184,18 @@ protected: return keys; } else { - const auto value = Fields[LeftKeyColumns.front()]; + const auto value = fields[LeftKeyColumns.front()]; const auto converter = LeftKeyConverters.front().Function; return converter ? converter(value) : *value; } } - void FillLeftStruct(NUdf::TUnboxedValue*const* output) const { + void FillLeftStruct(NUdf::TUnboxedValue*const* output, NUdf::TUnboxedValue** fields) const { for (auto i = 0U; i < LeftRenames.size(); ++i) { const auto prevIndex = LeftRenames[i]; const auto newIndex = LeftRenames[++i]; if (const auto out = output[newIndex]) - *out = *Fields[prevIndex]; + *out = *fields[prevIndex]; } } @@ -229,14 +232,6 @@ protected: return unique; } - std::vector<NUdf::TUnboxedValue*> MakeFields(ui32 width) const { - std::vector<NUdf::TUnboxedValue*> fields(width, nullptr); - auto it = Inputs.begin(); - for (const auto idx : UsedInputs) - fields[idx] = &*it++; - return fields; - } - const std::vector<TFunctionDescriptor> LeftKeyConverters; TDictType* const DictType; const std::vector<EValueRepresentation> OutputRepresentations; @@ -249,8 +244,8 @@ protected: const TContainerCacheOnContext KeyTuple; - mutable std::vector<NUdf::TUnboxedValue> Inputs; - const std::vector<NUdf::TUnboxedValue*> Fields; + ui32 InputsIndex; + ui32 WideFieldsIndex; }; template<bool WithoutRight, bool RightRequired, bool IsTuple> @@ -267,12 +262,14 @@ public: {} EFetchResult DoCalculate(NUdf::TUnboxedValue& lookup, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + auto** fields = ctx.WideFields.data() + this->WideFieldsIndex; + const auto dict = this->Dict->GetValue(ctx); do { - if (const auto res = this->Flow->FetchValues(ctx, this->Fields.data()); EFetchResult::One != res) + if (const auto res = this->Flow->FetchValues(ctx, fields); EFetchResult::One != res) return res; - const auto keys = this->MakeKeysTuple(ctx); + const auto keys = this->MakeKeysTuple(ctx, fields); if constexpr (WithoutRight) { if ((keys && dict.Contains(keys)) == RightRequired) @@ -281,14 +278,14 @@ public: continue; } else if (keys) { if (lookup = dict.Lookup(keys)) { - this->FillLeftStruct(output); + this->FillLeftStruct(output, fields); this->FillRightStruct(lookup, output); return EFetchResult::One; } } } while (RightRequired || WithoutRight); - this->FillLeftStruct(output); + this->FillLeftStruct(output, fields); this->NullRightStruct(output); return EFetchResult::One; } @@ -426,10 +423,12 @@ public: {} EFetchResult DoCalculate(NUdf::TUnboxedValue& iter, NUdf::TUnboxedValue& item, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + auto** fields = ctx.WideFields.data() + this->WideFieldsIndex; + for (auto iterator = std::move(iter);;) { if (iterator.HasValue()) { if (iterator.Next(item)) { - this->FillLeftStruct(output); + this->FillLeftStruct(output, fields); this->FillRightStruct(item, output); iter = std::move(iterator); return EFetchResult::One; @@ -437,10 +436,10 @@ public: } for (const auto& dict = this->Dict->GetValue(ctx);;) { - if (const auto res = this->Flow->FetchValues(ctx, this->Fields.data()); EFetchResult::One != res) + if (const auto res = this->Flow->FetchValues(ctx, fields); EFetchResult::One != res) return res; - if (const auto keys = this->MakeKeysTuple(ctx)) { + if (const auto keys = this->MakeKeysTuple(ctx, fields)) { if (const auto lookup = dict.Lookup(keys)) { iterator = lookup.GetListIterator(); break; @@ -448,7 +447,7 @@ public: } if constexpr (!RightRequired) { - this->FillLeftStruct(output); + this->FillLeftStruct(output, fields); this->NullRightStruct(output); return EFetchResult::One; } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp index 8302b43e68c..4f366d9d730 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp @@ -38,6 +38,12 @@ TComputationContext::TComputationContext(const THolderFactory& holderFactory, , WideFields(mutables.CurWideFieldsIndex, nullptr) { std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); + + for (const auto& [mutableIdx, fieldIdx, count] : mutables.WideFieldInitialize) { + for (ui32 i = 0; i < count; ++i) { + WideFields[fieldIdx + i] = &MutableValues[mutableIdx + i]; + } + } } TComputationContext::~TComputationContext() { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 798d2e52547..c38308a76d1 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -61,10 +61,23 @@ struct TComputationOptsFull: public TComputationOpts { const NUdf::ISecureParamsProvider* SecureParamsProvider; }; +struct TWideFieldsInitInfo { + ui32 MutablesIndex = 0; + ui32 WideFieldsIndex = 0; + ui32 Count = 0; +}; + struct TComputationMutables { ui32 CurValueIndex = 0U; std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState(). ui32 CurWideFieldsIndex = 0U; + std::vector<TWideFieldsInitInfo> WideFieldInitialize; + + void DeferWideFieldsInit(ui32 count) { + WideFieldInitialize.push_back({CurValueIndex, CurWideFieldsIndex, count}); + CurValueIndex += count; + CurWideFieldsIndex += count; + } ui32 IncrementWideFieldsIndex(ui32 addend) { auto cur = CurWideFieldsIndex; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 907afe6fe19..fee939dae74 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -382,12 +382,60 @@ TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCal ); } +TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + const auto tupleType = pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<ui32>::Id), + pb.NewDataType(NUdf::TDataType<ui64>::Id) + }); + + const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) { + return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)}); + }); + + const auto list2 = pb.NewList(tupleType, { + pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}), + pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}), + pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}), + }); + + const auto dict = pb.ToSortedDict(list2, false, + [&](TRuntimeNode item) { + return pb.Nth(item, 0); + }, + [&](TRuntimeNode item) { + return pb.NewTuple({pb.Nth(item, 1U)}); + }); + + const auto resultType = pb.NewFlowType(pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<char*>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id), + })); + + return pb.Map( + pb.NarrowMap(pb.MapJoinCore( + pb.ExpandMap(list1, [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; }), + dict, + EJoinKind::Inner, + {0U}, + {1U, 0U}, + {0U, 1U}, + resultType + ), + [&](TRuntimeNode::TList items) { return pb.NewTuple(items); } + ), + [&](TRuntimeNode item) { return pb.Nth(item, 1); } + ); +} + Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { template<class T> void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) { TTimer t("total: "); const ui32 cacheSize = 10; - const ui32 inFlight = 3; + const ui32 inFlight = 7; TComputationPatternLRUCache cache(cacheSize); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); @@ -509,6 +557,10 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) { ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000); } + + Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) { + ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000); + } } |