diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-29 16:59:15 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-29 16:59:15 +0300 |
commit | d562e7c54fcae2c816d67834310ecfec90e8a69d (patch) | |
tree | 34ee8bf0433157c173e2da5c2b209e261ee8738f | |
parent | 0a2ddfedca3128bfee3a3798eb11efc91824b95d (diff) | |
download | ydb-d562e7c54fcae2c816d67834310ecfec90e8a69d.tar.gz |
GraceJoin LLVM
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 316 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp | 88 |
2 files changed, 192 insertions, 212 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 895efd42e5..24fedca4b2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -57,7 +57,7 @@ struct TGraceJoinPacker { ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum; ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings - inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs + inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory); }; @@ -81,7 +81,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { res.DataType = NUdf::EDataSlot::String; return; } - + colTypeId = AS_TYPE(TDataType, colType)->GetSchemeType(); NUdf::EDataSlot dataType = NUdf::GetDataSlot(colTypeId); @@ -172,7 +172,7 @@ void TGraceJoinPacker::Pack() { switch (pi.DataType) { case NUdf::EDataSlot::Bool: - WriteUnaligned<bool>(buffPtr, value.Get<bool>()); break; + WriteUnaligned<bool>(buffPtr, value.Get<bool>()); break; case NUdf::EDataSlot::Int8: WriteUnaligned<i8>(buffPtr, value.Get<i8>()); break; case NUdf::EDataSlot::Uint8: @@ -184,24 +184,24 @@ void TGraceJoinPacker::Pack() { case NUdf::EDataSlot::Int32: *(reinterpret_cast<i32*> (buffPtr)) = value.Get<i32>(); break; case NUdf::EDataSlot::Uint32: - WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break; + WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break; case NUdf::EDataSlot::Int64: - WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break; + WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break; case NUdf::EDataSlot::Uint64: - WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break; + WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break; *(reinterpret_cast<ui64*> (buffPtr)) = value.Get<ui64>(); break; case NUdf::EDataSlot::Float: - WriteUnaligned<float>(buffPtr, value.Get<float>()); break; + WriteUnaligned<float>(buffPtr, value.Get<float>()); break; case NUdf::EDataSlot::Double: - WriteUnaligned<double>(buffPtr, value.Get<double>()); break; + WriteUnaligned<double>(buffPtr, value.Get<double>()); break; case NUdf::EDataSlot::Date: - WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break; + WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break; case NUdf::EDataSlot::Datetime: - WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break; + WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break; case NUdf::EDataSlot::Timestamp: - WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break; + WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break; case NUdf::EDataSlot::Interval: - WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break; + WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break; case NUdf::EDataSlot::Uuid: { auto str = TuplePtrs[i]->AsStringRef(); @@ -211,19 +211,19 @@ void TGraceJoinPacker::Pack() { case NUdf::EDataSlot::TzDate: { WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); - WriteUnaligned<ui16>(buffPtr + sizeof(ui16), value.GetTimezoneId()); + WriteUnaligned<ui16>(buffPtr + sizeof(ui16), value.GetTimezoneId()); break; } case NUdf::EDataSlot::TzDatetime: { WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); - WriteUnaligned<ui16>(buffPtr + sizeof(ui32), value.GetTimezoneId()); + WriteUnaligned<ui16>(buffPtr + sizeof(ui32), value.GetTimezoneId()); break; } case NUdf::EDataSlot::TzTimestamp: { WriteUnaligned<ui32>(buffPtr, value.Get<ui64>()); - WriteUnaligned<ui16>(buffPtr + sizeof(ui64), value.GetTimezoneId()); + WriteUnaligned<ui16>(buffPtr + sizeof(ui64), value.GetTimezoneId()); break; } case NUdf::EDataSlot::Decimal: @@ -237,7 +237,7 @@ void TGraceJoinPacker::Pack() { TupleStrings[offset] = str.Data(); TupleStrSizes[offset] = str.Size(); } - + } } } @@ -277,55 +277,55 @@ void TGraceJoinPacker::UnPack() { switch (pi.DataType) { case NUdf::EDataSlot::Bool: - value = NUdf::TUnboxedValuePod(ReadUnaligned<bool>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<bool>(buffPtr)); break; case NUdf::EDataSlot::Int8: - value = NUdf::TUnboxedValuePod(ReadUnaligned<i8>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<i8>(buffPtr)); break; case NUdf::EDataSlot::Uint8: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui8>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui8>(buffPtr)); break; case NUdf::EDataSlot::Int16: - value = NUdf::TUnboxedValuePod(ReadUnaligned<i16>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<i16>(buffPtr)); break; case NUdf::EDataSlot::Uint16: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break; case NUdf::EDataSlot::Int32: - value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break; case NUdf::EDataSlot::Uint32: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break; case NUdf::EDataSlot::Int64: - value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break; case NUdf::EDataSlot::Uint64: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break; case NUdf::EDataSlot::Float: - value = NUdf::TUnboxedValuePod(ReadUnaligned<float>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<float>(buffPtr)); break; case NUdf::EDataSlot::Double: - value = NUdf::TUnboxedValuePod(ReadUnaligned<double>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<double>(buffPtr)); break; case NUdf::EDataSlot::Date: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break; case NUdf::EDataSlot::Datetime: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break; case NUdf::EDataSlot::Timestamp: - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break; case NUdf::EDataSlot::Interval: - value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break; + value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break; case NUdf::EDataSlot::Uuid: { value = MakeString(NUdf::TStringRef(TupleStrings[offset], TupleStrSizes[offset])); } case NUdf::EDataSlot::TzDate: { - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); - value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui16))) ; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); + value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui16))) ; break; } case NUdf::EDataSlot::TzDatetime: { - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui32))); break; } case NUdf::EDataSlot::TzTimestamp: { - value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); - value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui64))) ; + value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); + value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui64))) ; break; } case NUdf::EDataSlot::Decimal: @@ -339,14 +339,14 @@ void TGraceJoinPacker::UnPack() { { value = MakeString(NUdf::TStringRef(TupleStrings[offset], TupleStrSizes[offset])); } - + } } } -TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) : +TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) : ColumnTypes(columnTypes) , HolderFactory(holderFactory) { @@ -424,114 +424,137 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con currStrOffset++; } PackedIdx[p.ColumnIdx] = currIdx; - currIdx++; + currIdx++; } TablePtr = std::make_unique<GraceJoin::TTable>(keyIntColumnsNum, keyStrColumnsNum, dataIntColumnsNum, dataStrColumnsNum); } -struct GraceJoinState { - IComputationWideFlowNode* const FlowLeft; - IComputationWideFlowNode* const FlowRight; - const EJoinKind JoinKind; - const std::vector<ui32> LeftKeyColumns; - const std::vector<ui32> RightKeyColumns; - const std::vector<ui32> LeftRenames; - const std::vector<ui32> RightRenames; - const std::vector<TType *> LeftColumnsTypes; - const std::vector<TType *> RightColumnsTypes; - const std::unique_ptr<TGraceJoinPacker> LeftPacker; - const std::unique_ptr<TGraceJoinPacker> RightPacker; - const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr; - const std::unique_ptr<bool> JoinCompleted; - const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple; - EFetchResult DoCalculateWrapper( TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const; - - GraceJoinState( - IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, - EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, - const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, - const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory) : - LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory)) - , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory)) - , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) - , JoinCompleted(std::make_unique<bool>(false)) - , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() ) - , FlowLeft(flowLeft) - , FlowRight(flowRight) - , JoinKind(joinKind) - , LeftKeyColumns(leftKeyColumns) - , RightKeyColumns(rightKeyColumns) - , LeftRenames(leftRenames) - , RightRenames(rightRenames) {} -}; - - -class TState : public TComputationValue<TState> { -using TBase = TComputationValue<TState>; +class TGraceJoinState : public TComputationValue<TGraceJoinState> { +using TBase = TComputationValue<TGraceJoinState>; public: - TState(TMemoryUsageInfo* memInfo, std::unique_ptr<GraceJoinState>&& state) - : TBase(memInfo), State(std::move(state)) - { - } - std::unique_ptr<GraceJoinState> State; + EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const; + + TGraceJoinState(TMemoryUsageInfo* memInfo, + IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, + EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, + const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, + const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory) + : TBase(memInfo) + , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory)) + , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory)) + , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) + , JoinCompleted(std::make_unique<bool>(false)) + , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() ) + , FlowLeft(flowLeft) + , FlowRight(flowRight) + , JoinKind(joinKind) + , LeftKeyColumns(leftKeyColumns) + , RightKeyColumns(rightKeyColumns) + , LeftRenames(leftRenames) + , RightRenames(rightRenames) {} +private: + IComputationWideFlowNode* const FlowLeft; + IComputationWideFlowNode* const FlowRight; + + const EJoinKind JoinKind; + const std::vector<ui32> LeftKeyColumns; + const std::vector<ui32> RightKeyColumns; + const std::vector<ui32> LeftRenames; + const std::vector<ui32> RightRenames; + const std::vector<TType *> LeftColumnsTypes; + const std::vector<TType *> RightColumnsTypes; + const std::unique_ptr<TGraceJoinPacker> LeftPacker; + const std::unique_ptr<TGraceJoinPacker> RightPacker; + const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr; + const std::unique_ptr<bool> JoinCompleted; + const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple; }; - class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> { using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper>; public: TGraceJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, - EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns, + EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns, std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames, - std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes ) - : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) - , FlowLeft(flowLeft) - , FlowRight(flowRight) - , JoinKind(joinKind) - , LeftKeyColumns(leftKeyColumns) - , RightKeyColumns(rightKeyColumns) - , LeftRenames(leftRenames) - , RightRenames(rightRenames) - , LeftColumnsTypes(leftColumnsTypes) - , RightColumnsTypes(rightColumnsTypes) { } + std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes, + std::vector<EValueRepresentation>&& outputRepresentations) + : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) + , FlowLeft(flowLeft) + , FlowRight(flowRight) + , JoinKind(joinKind) + , LeftKeyColumns(std::move(leftKeyColumns)) + , RightKeyColumns(std::move(rightKeyColumns)) + , LeftRenames(std::move(leftRenames)) + , RightRenames(std::move(rightRenames)) + , LeftColumnsTypes(std::move(leftColumnsTypes)) + , RightColumnsTypes(std::move(rightColumnsTypes)) + , OutputRepresentations(std::move(outputRepresentations)) + {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!state.HasValue()) { MakeState(ctx, state); } - - const auto statePtr = static_cast<TState*>(state.AsBoxed().Get()); - return statePtr->State->DoCalculateWrapper( ctx, output); + return static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output); } - - #ifndef MKQL_DISABLE_CODEGEN ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { auto& context = ctx.Codegen->GetContext(); const auto valueType = Type::getInt128Ty(context); + const auto indexType = Type::getInt32Ty(context); const auto ptrValueType = PointerType::getUnqual(valueType); const auto structPtrType = PointerType::getUnqual(StructType::get(context)); const auto contextType = GetCompContextType(context); const auto statusType = Type::getInt32Ty(context); + + const auto arrayType = ArrayType::get(valueType, OutputRepresentations.size()); + const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), OutputRepresentations.size()); + + const auto atTop = &ctx.Func->getEntryBlock().back(); + + const auto values = new AllocaInst(arrayType, 0U, "values", atTop); + const auto fields = new AllocaInst(fieldsType, 0U, "fields", atTop); + + ICodegeneratorInlineWideNode::TGettersList getters(OutputRepresentations.size()); + + Value* initV = UndefValue::get(arrayType); + Value* initF = UndefValue::get(fieldsType); + std::vector<Value*> pointers; + pointers.reserve(getters.size()); + for (auto i = 0U; i < getters.size(); ++i) { + pointers.emplace_back(GetElementPtrInst::CreateInBounds(values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), atTop)); + initV = InsertValueInst::Create(initV, ConstantInt::get(valueType, 0), {i}, (TString("zero_") += ToString(i)).c_str(), atTop); + initF = InsertValueInst::Create(initF, pointers.back(), {i}, (TString("insert_") += ToString(i)).c_str(), atTop); + + getters[i] = [i, values](const TCodegenContext& ctx, BasicBlock*& block) { + const auto indexType = Type::getInt32Ty(ctx.Codegen->GetContext()); + const auto pointer = GetElementPtrInst::CreateInBounds(values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block); + return new LoadInst(pointer, (TString("load_") += ToString(i)).c_str(), block); + }; + } + + new StoreInst(initV, values, atTop); + new StoreInst(initF, fields, atTop); + const auto stateType = StructType::get(context, { - structPtrType // vtbl + structPtrType, // vtbl + Type::getInt32Ty(context), // ref + Type::getInt16Ty(context), // abi + Type::getInt16Ty(context), // reserved + structPtrType, // meminfo + // .... }); const auto statePtrType = PointerType::getUnqual(stateType); - ui64 outputSize = LeftRenames.size() + RightRenames.size(); - const auto keys = new AllocaInst(ArrayType::get(valueType, outputSize), 0U, "keys", &ctx.Func->getEntryBlock().back()); - const auto make = BasicBlock::Create(context, "make", ctx.Func); const auto main = BasicBlock::Create(context, "main", ctx.Func); - const auto more = BasicBlock::Create(context, "more", ctx.Func); BranchInst::Create(main, make, HasValue(statePtr, block), block); block = make; @@ -546,54 +569,53 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr block = main; + for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) { + ValueCleanup(OutputRepresentations[i], pointers[i], ctx, block); + } + + new StoreInst(initV, values, block); + const auto state = new LoadInst(statePtr, "state", block); const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block); const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block); - BranchInst::Create(more, block); - block = more; + const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TGraceJoinState::FetchValues)); + const auto funcType = FunctionType::get(Type::getInt32Ty(context), { statePtrType, ctx.Ctx->getType(), fields->getType() }, false); + const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block); + const auto result = CallInst::Create(funcPtr, { stateArg, ctx.Ctx, fields }, "fetch", block); - const auto over = BasicBlock::Create(context, "over", ctx.Func); - const auto result = PHINode::Create(statusType, 3U, "result", over); + for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) { + ValueRelease(OutputRepresentations[i], pointers[i], ctx, block); + } - ICodegeneratorInlineWideNode::TGettersList getters; - getters.reserve(outputSize); -/* - std::transform(Nodes.FinishResultNodes.cbegin(), Nodes.FinishResultNodes.cend(), std::back_inserter(getters), [&](IComputationNode* node) { - return [node](const TCodegenContext& ctx, BasicBlock*& block){ return GetNodeValue(node, ctx, block); }; - }); -*/ return {result, std::move(getters)}; } #endif - private: void RegisterDependencies() const final { - this->DependsOn(FlowLeft, FlowRight); + FlowDependsOnBoth(FlowLeft, FlowRight); } - void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create<TState>(std::make_unique<GraceJoinState>( - FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns, - LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, - ctx.HolderFactory)); + state = ctx.HolderFactory.Create<TGraceJoinState>( + FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns, + LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, + ctx.HolderFactory); } - IComputationWideFlowNode* FlowLeft; - IComputationWideFlowNode* FlowRight; - EJoinKind JoinKind; - std::vector<ui32> LeftKeyColumns; - std::vector<ui32> RightKeyColumns; - std::vector<ui32> LeftRenames; - std::vector<ui32> RightRenames; - std::vector<TType *> LeftColumnsTypes; - std::vector<TType *> RightColumnsTypes; - + IComputationWideFlowNode *const FlowLeft; + IComputationWideFlowNode *const FlowRight; + const EJoinKind JoinKind; + const std::vector<ui32> LeftKeyColumns; + const std::vector<ui32> RightKeyColumns; + const std::vector<ui32> LeftRenames; + const std::vector<ui32> RightRenames; + const std::vector<TType *> LeftColumnsTypes; + const std::vector<TType *> RightColumnsTypes; + const std::vector<EValueRepresentation> OutputRepresentations; }; - -EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { +EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { while (!*JoinCompleted) { @@ -667,13 +689,18 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0)); const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1)); + const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType()); + std::vector<EValueRepresentation> outputRepresentations; + outputRepresentations.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) + outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; std::vector<TType *> leftColumnsTypes, rightColumnsTypes; - + leftColumnsTypes.resize(AS_TYPE(TTupleType, leftFlowTupleType)->GetElementsCount()); rightColumnsTypes.resize(AS_TYPE(TTupleType, rightFlowTupleType)->GetElementsCount()); - + leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>()); @@ -693,21 +720,20 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>()); } - + for (ui32 i = 0; i < leftColumnsTypes.size(); ++i) { leftColumnsTypes[i] = AS_TYPE(TTupleType, leftFlowTupleType)->GetElementType(i); } - for (ui32 i = 0; i < rightColumnsTypes.size(); ++i) { rightColumnsTypes[i] = AS_TYPE(TTupleType, rightFlowTupleType)->GetElementType(i); } return new TGraceJoinWrapper( - ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), - std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes)); - + ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), + std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations)); + } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index e553a63b1d..2d42af3b41 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -24,8 +24,8 @@ constexpr bool IsVerbose = false; Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) { - Y_UNIT_TEST_LLVM(TestMem1) { - + Y_UNIT_TEST(TestMem1) { + const ui64 TupleSize = 1024; const ui64 NBuckets = 128; const ui64 NTuples = 10000; @@ -162,14 +162,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) { CTEST << Endl; UNIT_ASSERT(true); - + } } Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { - Y_UNIT_TEST_LLVM(TestImp1) { + Y_UNIT_TEST(TestImp1) { ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10}; ui32 strSizes[2] = {4, 4}; @@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { (char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}; ui32 bigStrSize[2] = {151, 151}; - + GraceJoin::TTable bigTable(1,1,1,1); GraceJoin::TTable smallTable(1,1,1,1); @@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { const ui64 TupleSize = 1024; - ui64 bigTuple[TupleSize]; + ui64 bigTuple[TupleSize]; for (ui64 i = 0; i < TupleSize; i++) { bigTuple[i] = std::rand() / ( RAND_MAX / 10000 ); @@ -197,13 +197,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 milliseconds = 0; - + const ui64 BigTableTuples = 60000; const ui64 SmallTableTuples = 15000; const ui64 BigTupleSize = 32; - std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); for ( ui64 i = 0; i < BigTableTuples; i++) { @@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { bigTable.Clear(); smallTable.Clear(); - begin03 = std::chrono::steady_clock::now(); + begin03 = std::chrono::steady_clock::now(); for ( ui64 i = 0; i < BigTableTuples; i++) { @@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl; CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( milliseconds * 1024 * 1024) << "MB/sec" << Endl; CTEST << Endl; - + std::vector<ui64> vals1, vals2; @@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 numBigTuples = 0; bigTable.ResetIterator(); - std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now(); while(bigTable.NextTuple(td1)) { numBigTuples++; } @@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { numBigTuples = 0; bigTable.ResetIterator(); - std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now(); while(bigTable.NextTuple(td2)) { numBigTuples++; } @@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { CTEST << Endl; - std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); joinTable.Join(bigTable,smallTable); @@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 numJoinedTuples = 0; - std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now(); while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; } @@ -331,13 +331,8 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { } } - - Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { Y_UNIT_TEST_LLVM(TestInner1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -400,16 +395,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } - - } - - + } Y_UNIT_TEST_LLVM(TMiniKQLGraceJoinTestInnerMulti1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -477,9 +466,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeft1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -551,9 +537,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftMulti1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -626,9 +609,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftSemi1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -691,9 +671,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftOnly1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -751,9 +728,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftSemiWithNullKey1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -818,9 +792,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftOnlyWithNullKey1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -885,9 +856,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestRight1) { - - if (LLVM) return; - for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -958,10 +926,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - - Y_UNIT_TEST_LLVM(TestRightOnly1) { - - if (LLVM) return; + Y_UNIT_TEST_LLVM(TestRightOnly1) { for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1017,10 +982,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - - Y_UNIT_TEST_LLVM(TestRightSemi1) { - if (LLVM) return; for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1082,9 +1044,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - Y_UNIT_TEST_LLVM(TestRightMulti1) { - if (LLVM) return; for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1156,9 +1116,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - - Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) { - if (LLVM) return; + Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) { for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1223,7 +1181,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestRightOnlyWithNullKey1) { - if (LLVM) return; for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1288,7 +1245,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestFull1) { - if (LLVM) return; for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1353,9 +1309,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); UNIT_ASSERT(!tuple.GetElement(1)); - UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); UNIT_ASSERT(!tuple.GetElement(0)); UNIT_ASSERT(!iterator.Next(tuple)); @@ -1365,7 +1321,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { Y_UNIT_TEST_LLVM(TestExclusion1) { - if (LLVM) return; for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1418,9 +1373,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); UNIT_ASSERT(!tuple.GetElement(1)); - UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); UNIT_ASSERT(!tuple.GetElement(0)); UNIT_ASSERT(!iterator.Next(tuple)); @@ -1430,7 +1385,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } - } } |