aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: va-kuznecov <va-kuznecov@ydb.tech> 2022-08-25 13:33:15 +0300
committer: va-kuznecov <va-kuznecov@ydb.tech> 2022-08-25 13:33:15 +0300
commit: 0166916e98024b223fae6894817f4d8dc0a8a5a6 (patch)
tree: 628c9505d34ce258bc30883276dc7c85b02f8674
parent: ad2a8ca7457a9e4ddb8f06da1e0a548faf070606 (diff)
download: ydb-0166916e98024b223fae6894817f4d8dc0a8a5a6.tar.gz
Fix wide MapJoin
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp 57
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node.cpp 6
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node.h 13
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp 54
4 files changed, 100 insertions, 30 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
index 457092900c6..72299a0c511 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
@@ -17,9 +17,9 @@ template<bool IsTuple>
class TWideMapJoinBase {
protected:
TWideMapJoinBase(TComputationMutables& mutables, std::vector<TFunctionDescriptor>&& leftKeyConverters,
- TDictType* dictType, std::vector<EValueRepresentation>&& outputRepresentations, std::vector<ui32>&& leftKeyColumns,
- std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
- IComputationWideFlowNode* flow, IComputationNode* dict, ui32 inputWidth)
+ TDictType* dictType, std::vector<EValueRepresentation>&& outputRepresentations, std::vector<ui32>&& leftKeyColumns,
+ std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
+ IComputationWideFlowNode* flow, IComputationNode* dict, ui32 inputWidth)
: LeftKeyConverters(std::move(leftKeyConverters))
, DictType(dictType)
, OutputRepresentations(std::move(outputRepresentations))
@@ -30,9 +30,12 @@ protected:
, Flow(flow)
, Dict(dict)
, KeyTuple(mutables)
- , Inputs(UsedInputs.size())
- , Fields(MakeFields(inputWidth))
- {}
+ , InputsIndex(mutables.CurValueIndex)
+ , WideFieldsIndex(mutables.CurWideFieldsIndex)
+ {
+ mutables.DeferWideFieldsInit(UsedInputs.size());
+ }
+
#ifndef MKQL_DISABLE_CODEGEN
Value* GenMakeKeysTuple(Value* keysPtr, const ICodegeneratorInlineWideNode::TGettersList& getters, const TCodegenContext& ctx, BasicBlock*& block) const {
auto& context = ctx.Codegen->GetContext();
@@ -165,14 +168,14 @@ protected:
return {{placeholder, stub}};
}
#endif
- NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx) const {
+ NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, NUdf::TUnboxedValue** fields) const {
if constexpr (IsTuple) {
NUdf::TUnboxedValue* items = nullptr;
const auto keys = KeyTuple.NewArray(ctx, LeftKeyColumns.size(), items);
if (!LeftKeyColumns.empty()) {
Y_VERIFY(items);
for (auto i = 0U; i < LeftKeyColumns.size(); ++i) {
- const auto value = Fields[LeftKeyColumns[i]];
+ const auto value = fields[LeftKeyColumns[i]];
const auto converter = LeftKeyConverters[i].Function;
if (!(*items++ = converter ? converter(value) : *value))
return NUdf::TUnboxedValuePod();
@@ -181,18 +184,18 @@ protected:
return keys;
} else {
- const auto value = Fields[LeftKeyColumns.front()];
+ const auto value = fields[LeftKeyColumns.front()];
const auto converter = LeftKeyConverters.front().Function;
return converter ? converter(value) : *value;
}
}
- void FillLeftStruct(NUdf::TUnboxedValue*const* output) const {
+ void FillLeftStruct(NUdf::TUnboxedValue*const* output, NUdf::TUnboxedValue** fields) const {
for (auto i = 0U; i < LeftRenames.size(); ++i) {
const auto prevIndex = LeftRenames[i];
const auto newIndex = LeftRenames[++i];
if (const auto out = output[newIndex])
- *out = *Fields[prevIndex];
+ *out = *fields[prevIndex];
}
}
@@ -229,14 +232,6 @@ protected:
return unique;
}
- std::vector<NUdf::TUnboxedValue*> MakeFields(ui32 width) const {
- std::vector<NUdf::TUnboxedValue*> fields(width, nullptr);
- auto it = Inputs.begin();
- for (const auto idx : UsedInputs)
- fields[idx] = &*it++;
- return fields;
- }
-
const std::vector<TFunctionDescriptor> LeftKeyConverters;
TDictType* const DictType;
const std::vector<EValueRepresentation> OutputRepresentations;
@@ -249,8 +244,8 @@ protected:
const TContainerCacheOnContext KeyTuple;
- mutable std::vector<NUdf::TUnboxedValue> Inputs;
- const std::vector<NUdf::TUnboxedValue*> Fields;
+ ui32 InputsIndex;
+ ui32 WideFieldsIndex;
};
template<bool WithoutRight, bool RightRequired, bool IsTuple>
@@ -267,12 +262,14 @@ public:
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& lookup, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ auto** fields = ctx.WideFields.data() + this->WideFieldsIndex;
+
const auto dict = this->Dict->GetValue(ctx);
do {
- if (const auto res = this->Flow->FetchValues(ctx, this->Fields.data()); EFetchResult::One != res)
+ if (const auto res = this->Flow->FetchValues(ctx, fields); EFetchResult::One != res)
return res;
- const auto keys = this->MakeKeysTuple(ctx);
+ const auto keys = this->MakeKeysTuple(ctx, fields);
if constexpr (WithoutRight) {
if ((keys && dict.Contains(keys)) == RightRequired)
@@ -281,14 +278,14 @@ public:
continue;
} else if (keys) {
if (lookup = dict.Lookup(keys)) {
- this->FillLeftStruct(output);
+ this->FillLeftStruct(output, fields);
this->FillRightStruct(lookup, output);
return EFetchResult::One;
}
}
} while (RightRequired || WithoutRight);
- this->FillLeftStruct(output);
+ this->FillLeftStruct(output, fields);
this->NullRightStruct(output);
return EFetchResult::One;
}
@@ -426,10 +423,12 @@ public:
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& iter, NUdf::TUnboxedValue& item, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ auto** fields = ctx.WideFields.data() + this->WideFieldsIndex;
+
for (auto iterator = std::move(iter);;) {
if (iterator.HasValue()) {
if (iterator.Next(item)) {
- this->FillLeftStruct(output);
+ this->FillLeftStruct(output, fields);
this->FillRightStruct(item, output);
iter = std::move(iterator);
return EFetchResult::One;
@@ -437,10 +436,10 @@ public:
}
for (const auto& dict = this->Dict->GetValue(ctx);;) {
- if (const auto res = this->Flow->FetchValues(ctx, this->Fields.data()); EFetchResult::One != res)
+ if (const auto res = this->Flow->FetchValues(ctx, fields); EFetchResult::One != res)
return res;
- if (const auto keys = this->MakeKeysTuple(ctx)) {
+ if (const auto keys = this->MakeKeysTuple(ctx, fields)) {
if (const auto lookup = dict.Lookup(keys)) {
iterator = lookup.GetListIterator();
break;
@@ -448,7 +447,7 @@ public:
}
if constexpr (!RightRequired) {
- this->FillLeftStruct(output);
+ this->FillLeftStruct(output, fields);
this->NullRightStruct(output);
return EFetchResult::One;
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
index 8302b43e68c..4f366d9d730 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
@@ -38,6 +38,12 @@ TComputationContext::TComputationContext(const THolderFactory& holderFactory,
, WideFields(mutables.CurWideFieldsIndex, nullptr)
{
std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
+
+ for (const auto& [mutableIdx, fieldIdx, count] : mutables.WideFieldInitialize) {
+ for (ui32 i = 0; i < count; ++i) {
+ WideFields[fieldIdx + i] = &MutableValues[mutableIdx + i];
+ }
+ }
}
TComputationContext::~TComputationContext() {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index 798d2e52547..c38308a76d1 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -61,10 +61,23 @@ struct TComputationOptsFull: public TComputationOpts {
const NUdf::ISecureParamsProvider* SecureParamsProvider;
};
+struct TWideFieldsInitInfo {
+ ui32 MutablesIndex = 0;
+ ui32 WideFieldsIndex = 0;
+ ui32 Count = 0;
+};
+
struct TComputationMutables {
ui32 CurValueIndex = 0U;
std::vector<ui32> SerializableValues; // Indices of values that need to be saved in IComputationGraph::SaveGraphState() and restored in IComputationGraph::LoadGraphState().
ui32 CurWideFieldsIndex = 0U;
+ std::vector<TWideFieldsInitInfo> WideFieldInitialize;
+
+ void DeferWideFieldsInit(ui32 count) {
+ WideFieldInitialize.push_back({CurValueIndex, CurWideFieldsIndex, count});
+ CurValueIndex += count;
+ CurWideFieldsIndex += count;
+ }
ui32 IncrementWideFieldsIndex(ui32 addend) {
auto cur = CurWideFieldsIndex;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 907afe6fe19..fee939dae74 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -382,12 +382,60 @@ TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCal
);
}
+TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": ");
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ const auto tupleType = pb.NewTupleType({
+ pb.NewDataType(NUdf::TDataType<ui32>::Id),
+ pb.NewDataType(NUdf::TDataType<ui64>::Id)
+ });
+
+ const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) {
+ return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)});
+ });
+
+ const auto list2 = pb.NewList(tupleType, {
+ pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}),
+ pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}),
+ pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}),
+ });
+
+ const auto dict = pb.ToSortedDict(list2, false,
+ [&](TRuntimeNode item) {
+ return pb.Nth(item, 0);
+ },
+ [&](TRuntimeNode item) {
+ return pb.NewTuple({pb.Nth(item, 1U)});
+ });
+
+ const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ pb.NewDataType(NUdf::TDataType<char*>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id),
+ }));
+
+ return pb.Map(
+ pb.NarrowMap(pb.MapJoinCore(
+ pb.ExpandMap(list1, [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; }),
+ dict,
+ EJoinKind::Inner,
+ {0U},
+ {1U, 0U},
+ {0U, 1U},
+ resultType
+ ),
+ [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
+ ),
+ [&](TRuntimeNode item) { return pb.Nth(item, 1); }
+ );
+}
+
Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
template<class T>
void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) {
TTimer t("total: ");
const ui32 cacheSize = 10;
- const ui32 inFlight = 3;
+ const ui32 inFlight = 7;
TComputationPatternLRUCache cache(cacheSize);
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
@@ -509,6 +557,10 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000);
}
+
+ Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) {
+ ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000);
+ }
}