aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-29 16:59:15 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-29 16:59:15 +0300
commitd562e7c54fcae2c816d67834310ecfec90e8a69d (patch)
tree34ee8bf0433157c173e2da5c2b209e261ee8738f
parent0a2ddfedca3128bfee3a3798eb11efc91824b95d (diff)
downloadydb-d562e7c54fcae2c816d67834310ecfec90e8a69d.tar.gz
GraceJoin LLVM
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp316
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp88
2 files changed, 192 insertions, 212 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 895efd42e5..24fedca4b2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -57,7 +57,7 @@ struct TGraceJoinPacker {
ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
- inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
+ inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory);
};
@@ -81,7 +81,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
res.DataType = NUdf::EDataSlot::String;
return;
}
-
+
colTypeId = AS_TYPE(TDataType, colType)->GetSchemeType();
NUdf::EDataSlot dataType = NUdf::GetDataSlot(colTypeId);
@@ -172,7 +172,7 @@ void TGraceJoinPacker::Pack() {
switch (pi.DataType)
{
case NUdf::EDataSlot::Bool:
- WriteUnaligned<bool>(buffPtr, value.Get<bool>()); break;
+ WriteUnaligned<bool>(buffPtr, value.Get<bool>()); break;
case NUdf::EDataSlot::Int8:
WriteUnaligned<i8>(buffPtr, value.Get<i8>()); break;
case NUdf::EDataSlot::Uint8:
@@ -184,24 +184,24 @@ void TGraceJoinPacker::Pack() {
case NUdf::EDataSlot::Int32:
*(reinterpret_cast<i32*> (buffPtr)) = value.Get<i32>(); break;
case NUdf::EDataSlot::Uint32:
- WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
+ WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
case NUdf::EDataSlot::Int64:
- WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
+ WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
case NUdf::EDataSlot::Uint64:
- WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
+ WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
*(reinterpret_cast<ui64*> (buffPtr)) = value.Get<ui64>(); break;
case NUdf::EDataSlot::Float:
- WriteUnaligned<float>(buffPtr, value.Get<float>()); break;
+ WriteUnaligned<float>(buffPtr, value.Get<float>()); break;
case NUdf::EDataSlot::Double:
- WriteUnaligned<double>(buffPtr, value.Get<double>()); break;
+ WriteUnaligned<double>(buffPtr, value.Get<double>()); break;
case NUdf::EDataSlot::Date:
- WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break;
+ WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break;
case NUdf::EDataSlot::Datetime:
- WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
+ WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
case NUdf::EDataSlot::Timestamp:
- WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
+ WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
case NUdf::EDataSlot::Interval:
- WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
+ WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
case NUdf::EDataSlot::Uuid:
{
auto str = TuplePtrs[i]->AsStringRef();
@@ -211,19 +211,19 @@ void TGraceJoinPacker::Pack() {
case NUdf::EDataSlot::TzDate:
{
WriteUnaligned<ui16>(buffPtr, value.Get<ui16>());
- WriteUnaligned<ui16>(buffPtr + sizeof(ui16), value.GetTimezoneId());
+ WriteUnaligned<ui16>(buffPtr + sizeof(ui16), value.GetTimezoneId());
break;
}
case NUdf::EDataSlot::TzDatetime:
{
WriteUnaligned<ui32>(buffPtr, value.Get<ui32>());
- WriteUnaligned<ui16>(buffPtr + sizeof(ui32), value.GetTimezoneId());
+ WriteUnaligned<ui16>(buffPtr + sizeof(ui32), value.GetTimezoneId());
break;
}
case NUdf::EDataSlot::TzTimestamp:
{
WriteUnaligned<ui32>(buffPtr, value.Get<ui64>());
- WriteUnaligned<ui16>(buffPtr + sizeof(ui64), value.GetTimezoneId());
+ WriteUnaligned<ui16>(buffPtr + sizeof(ui64), value.GetTimezoneId());
break;
}
case NUdf::EDataSlot::Decimal:
@@ -237,7 +237,7 @@ void TGraceJoinPacker::Pack() {
TupleStrings[offset] = str.Data();
TupleStrSizes[offset] = str.Size();
}
-
+
}
}
}
@@ -277,55 +277,55 @@ void TGraceJoinPacker::UnPack() {
switch (pi.DataType)
{
case NUdf::EDataSlot::Bool:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<bool>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<bool>(buffPtr)); break;
case NUdf::EDataSlot::Int8:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<i8>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<i8>(buffPtr)); break;
case NUdf::EDataSlot::Uint8:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui8>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui8>(buffPtr)); break;
case NUdf::EDataSlot::Int16:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<i16>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<i16>(buffPtr)); break;
case NUdf::EDataSlot::Uint16:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
case NUdf::EDataSlot::Int32:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break;
case NUdf::EDataSlot::Uint32:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
case NUdf::EDataSlot::Int64:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
case NUdf::EDataSlot::Uint64:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
case NUdf::EDataSlot::Float:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<float>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<float>(buffPtr)); break;
case NUdf::EDataSlot::Double:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<double>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<double>(buffPtr)); break;
case NUdf::EDataSlot::Date:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
case NUdf::EDataSlot::Datetime:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
case NUdf::EDataSlot::Timestamp:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
case NUdf::EDataSlot::Interval:
- value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
case NUdf::EDataSlot::Uuid:
{
value = MakeString(NUdf::TStringRef(TupleStrings[offset], TupleStrSizes[offset]));
}
case NUdf::EDataSlot::TzDate:
{
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr));
- value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui16))) ;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr));
+ value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui16))) ;
break;
}
case NUdf::EDataSlot::TzDatetime:
{
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr));
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr));
value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui32)));
break;
}
case NUdf::EDataSlot::TzTimestamp:
{
- value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr));
- value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui64))) ;
+ value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr));
+ value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui64))) ;
break;
}
case NUdf::EDataSlot::Decimal:
@@ -339,14 +339,14 @@ void TGraceJoinPacker::UnPack() {
{
value = MakeString(NUdf::TStringRef(TupleStrings[offset], TupleStrSizes[offset]));
}
-
+
}
}
}
-TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) :
+TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) :
ColumnTypes(columnTypes)
, HolderFactory(holderFactory) {
@@ -424,114 +424,137 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
currStrOffset++;
}
PackedIdx[p.ColumnIdx] = currIdx;
- currIdx++;
+ currIdx++;
}
TablePtr = std::make_unique<GraceJoin::TTable>(keyIntColumnsNum, keyStrColumnsNum, dataIntColumnsNum, dataStrColumnsNum);
}
-struct GraceJoinState {
- IComputationWideFlowNode* const FlowLeft;
- IComputationWideFlowNode* const FlowRight;
- const EJoinKind JoinKind;
- const std::vector<ui32> LeftKeyColumns;
- const std::vector<ui32> RightKeyColumns;
- const std::vector<ui32> LeftRenames;
- const std::vector<ui32> RightRenames;
- const std::vector<TType *> LeftColumnsTypes;
- const std::vector<TType *> RightColumnsTypes;
- const std::unique_ptr<TGraceJoinPacker> LeftPacker;
- const std::unique_ptr<TGraceJoinPacker> RightPacker;
- const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr;
- const std::unique_ptr<bool> JoinCompleted;
- const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
- EFetchResult DoCalculateWrapper( TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const;
-
- GraceJoinState(
- IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
- EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
- const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
- const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory) :
- LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory))
- , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory))
- , JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
- , JoinCompleted(std::make_unique<bool>(false))
- , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
- , FlowLeft(flowLeft)
- , FlowRight(flowRight)
- , JoinKind(joinKind)
- , LeftKeyColumns(leftKeyColumns)
- , RightKeyColumns(rightKeyColumns)
- , LeftRenames(leftRenames)
- , RightRenames(rightRenames) {}
-};
-
-
-class TState : public TComputationValue<TState> {
-using TBase = TComputationValue<TState>;
+class TGraceJoinState : public TComputationValue<TGraceJoinState> {
+using TBase = TComputationValue<TGraceJoinState>;
public:
- TState(TMemoryUsageInfo* memInfo, std::unique_ptr<GraceJoinState>&& state)
- : TBase(memInfo), State(std::move(state))
- {
- }
- std::unique_ptr<GraceJoinState> State;
+ EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const;
+
+ TGraceJoinState(TMemoryUsageInfo* memInfo,
+ IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
+ EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
+ const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
+ const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory)
+ : TBase(memInfo)
+ , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory))
+ , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory))
+ , JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
+ , JoinCompleted(std::make_unique<bool>(false))
+ , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
+ , FlowLeft(flowLeft)
+ , FlowRight(flowRight)
+ , JoinKind(joinKind)
+ , LeftKeyColumns(leftKeyColumns)
+ , RightKeyColumns(rightKeyColumns)
+ , LeftRenames(leftRenames)
+ , RightRenames(rightRenames) {}
+private:
+ IComputationWideFlowNode* const FlowLeft;
+ IComputationWideFlowNode* const FlowRight;
+
+ const EJoinKind JoinKind;
+ const std::vector<ui32> LeftKeyColumns;
+ const std::vector<ui32> RightKeyColumns;
+ const std::vector<ui32> LeftRenames;
+ const std::vector<ui32> RightRenames;
+ const std::vector<TType *> LeftColumnsTypes;
+ const std::vector<TType *> RightColumnsTypes;
+ const std::unique_ptr<TGraceJoinPacker> LeftPacker;
+ const std::unique_ptr<TGraceJoinPacker> RightPacker;
+ const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr;
+ const std::unique_ptr<bool> JoinCompleted;
+ const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
};
-
class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper>;
public:
TGraceJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
- EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
+ EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
- std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes )
- : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
- , FlowLeft(flowLeft)
- , FlowRight(flowRight)
- , JoinKind(joinKind)
- , LeftKeyColumns(leftKeyColumns)
- , RightKeyColumns(rightKeyColumns)
- , LeftRenames(leftRenames)
- , RightRenames(rightRenames)
- , LeftColumnsTypes(leftColumnsTypes)
- , RightColumnsTypes(rightColumnsTypes) { }
+ std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes,
+ std::vector<EValueRepresentation>&& outputRepresentations)
+ : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
+ , FlowLeft(flowLeft)
+ , FlowRight(flowRight)
+ , JoinKind(joinKind)
+ , LeftKeyColumns(std::move(leftKeyColumns))
+ , RightKeyColumns(std::move(rightKeyColumns))
+ , LeftRenames(std::move(leftRenames))
+ , RightRenames(std::move(rightRenames))
+ , LeftColumnsTypes(std::move(leftColumnsTypes))
+ , RightColumnsTypes(std::move(rightColumnsTypes))
+ , OutputRepresentations(std::move(outputRepresentations))
+ {}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
-
if (!state.HasValue()) {
MakeState(ctx, state);
}
-
- const auto statePtr = static_cast<TState*>(state.AsBoxed().Get());
- return statePtr->State->DoCalculateWrapper( ctx, output);
+ return static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output);
}
-
-
#ifndef MKQL_DISABLE_CODEGEN
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
auto& context = ctx.Codegen->GetContext();
const auto valueType = Type::getInt128Ty(context);
+ const auto indexType = Type::getInt32Ty(context);
const auto ptrValueType = PointerType::getUnqual(valueType);
const auto structPtrType = PointerType::getUnqual(StructType::get(context));
const auto contextType = GetCompContextType(context);
const auto statusType = Type::getInt32Ty(context);
+
+ const auto arrayType = ArrayType::get(valueType, OutputRepresentations.size());
+ const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), OutputRepresentations.size());
+
+ const auto atTop = &ctx.Func->getEntryBlock().back();
+
+ const auto values = new AllocaInst(arrayType, 0U, "values", atTop);
+ const auto fields = new AllocaInst(fieldsType, 0U, "fields", atTop);
+
+ ICodegeneratorInlineWideNode::TGettersList getters(OutputRepresentations.size());
+
+ Value* initV = UndefValue::get(arrayType);
+ Value* initF = UndefValue::get(fieldsType);
+ std::vector<Value*> pointers;
+ pointers.reserve(getters.size());
+ for (auto i = 0U; i < getters.size(); ++i) {
+ pointers.emplace_back(GetElementPtrInst::CreateInBounds(values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), atTop));
+ initV = InsertValueInst::Create(initV, ConstantInt::get(valueType, 0), {i}, (TString("zero_") += ToString(i)).c_str(), atTop);
+ initF = InsertValueInst::Create(initF, pointers.back(), {i}, (TString("insert_") += ToString(i)).c_str(), atTop);
+
+ getters[i] = [i, values](const TCodegenContext& ctx, BasicBlock*& block) {
+ const auto indexType = Type::getInt32Ty(ctx.Codegen->GetContext());
+ const auto pointer = GetElementPtrInst::CreateInBounds(values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block);
+ return new LoadInst(pointer, (TString("load_") += ToString(i)).c_str(), block);
+ };
+ }
+
+ new StoreInst(initV, values, atTop);
+ new StoreInst(initF, fields, atTop);
+
const auto stateType = StructType::get(context, {
- structPtrType // vtbl
+ structPtrType, // vtbl
+ Type::getInt32Ty(context), // ref
+ Type::getInt16Ty(context), // abi
+ Type::getInt16Ty(context), // reserved
+ structPtrType, // meminfo
+ // ....
});
const auto statePtrType = PointerType::getUnqual(stateType);
- ui64 outputSize = LeftRenames.size() + RightRenames.size();
- const auto keys = new AllocaInst(ArrayType::get(valueType, outputSize), 0U, "keys", &ctx.Func->getEntryBlock().back());
-
const auto make = BasicBlock::Create(context, "make", ctx.Func);
const auto main = BasicBlock::Create(context, "main", ctx.Func);
- const auto more = BasicBlock::Create(context, "more", ctx.Func);
BranchInst::Create(main, make, HasValue(statePtr, block), block);
block = make;
@@ -546,54 +569,53 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
block = main;
+ for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
+ ValueCleanup(OutputRepresentations[i], pointers[i], ctx, block);
+ }
+
+ new StoreInst(initV, values, block);
+
const auto state = new LoadInst(statePtr, "state", block);
const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
- BranchInst::Create(more, block);
- block = more;
+ const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TGraceJoinState::FetchValues));
+ const auto funcType = FunctionType::get(Type::getInt32Ty(context), { statePtrType, ctx.Ctx->getType(), fields->getType() }, false);
+ const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block);
+ const auto result = CallInst::Create(funcPtr, { stateArg, ctx.Ctx, fields }, "fetch", block);
- const auto over = BasicBlock::Create(context, "over", ctx.Func);
- const auto result = PHINode::Create(statusType, 3U, "result", over);
+ for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
+ ValueRelease(OutputRepresentations[i], pointers[i], ctx, block);
+ }
- ICodegeneratorInlineWideNode::TGettersList getters;
- getters.reserve(outputSize);
-/*
- std::transform(Nodes.FinishResultNodes.cbegin(), Nodes.FinishResultNodes.cend(), std::back_inserter(getters), [&](IComputationNode* node) {
- return [node](const TCodegenContext& ctx, BasicBlock*& block){ return GetNodeValue(node, ctx, block); };
- });
-*/
return {result, std::move(getters)};
}
#endif
-
private:
void RegisterDependencies() const final {
- this->DependsOn(FlowLeft, FlowRight);
+ FlowDependsOnBoth(FlowLeft, FlowRight);
}
-
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TState>(std::make_unique<GraceJoinState>(
- FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns,
- LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
- ctx.HolderFactory));
+ state = ctx.HolderFactory.Create<TGraceJoinState>(
+ FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns,
+ LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
+ ctx.HolderFactory);
}
- IComputationWideFlowNode* FlowLeft;
- IComputationWideFlowNode* FlowRight;
- EJoinKind JoinKind;
- std::vector<ui32> LeftKeyColumns;
- std::vector<ui32> RightKeyColumns;
- std::vector<ui32> LeftRenames;
- std::vector<ui32> RightRenames;
- std::vector<TType *> LeftColumnsTypes;
- std::vector<TType *> RightColumnsTypes;
-
+ IComputationWideFlowNode *const FlowLeft;
+ IComputationWideFlowNode *const FlowRight;
+ const EJoinKind JoinKind;
+ const std::vector<ui32> LeftKeyColumns;
+ const std::vector<ui32> RightKeyColumns;
+ const std::vector<ui32> LeftRenames;
+ const std::vector<ui32> RightRenames;
+ const std::vector<TType *> LeftColumnsTypes;
+ const std::vector<TType *> RightColumnsTypes;
+ const std::vector<EValueRepresentation> OutputRepresentations;
};
-
-EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
while (!*JoinCompleted) {
@@ -667,13 +689,18 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
+ const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType());
+ std::vector<EValueRepresentation> outputRepresentations;
+ outputRepresentations.reserve(tupleType->GetElementsCount());
+ for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
+ outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType *> leftColumnsTypes, rightColumnsTypes;
-
+
leftColumnsTypes.resize(AS_TYPE(TTupleType, leftFlowTupleType)->GetElementsCount());
rightColumnsTypes.resize(AS_TYPE(TTupleType, rightFlowTupleType)->GetElementsCount());
-
+
leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
@@ -693,21 +720,20 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}
-
+
for (ui32 i = 0; i < leftColumnsTypes.size(); ++i) {
leftColumnsTypes[i] = AS_TYPE(TTupleType, leftFlowTupleType)->GetElementType(i);
}
-
for (ui32 i = 0; i < rightColumnsTypes.size(); ++i) {
rightColumnsTypes[i] = AS_TYPE(TTupleType, rightFlowTupleType)->GetElementType(i);
}
return new TGraceJoinWrapper(
- ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind),
- std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
- std::move(leftColumnsTypes), std::move(rightColumnsTypes));
-
+ ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind),
+ std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
+ std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations));
+
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index e553a63b1d..2d42af3b41 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -24,8 +24,8 @@ constexpr bool IsVerbose = false;
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) {
- Y_UNIT_TEST_LLVM(TestMem1) {
-
+ Y_UNIT_TEST(TestMem1) {
+
const ui64 TupleSize = 1024;
const ui64 NBuckets = 128;
const ui64 NTuples = 10000;
@@ -162,14 +162,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) {
CTEST << Endl;
UNIT_ASSERT(true);
-
+
}
}
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
- Y_UNIT_TEST_LLVM(TestImp1) {
+ Y_UNIT_TEST(TestImp1) {
ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10};
ui32 strSizes[2] = {4, 4};
@@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
(char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"};
ui32 bigStrSize[2] = {151, 151};
-
+
GraceJoin::TTable bigTable(1,1,1,1);
GraceJoin::TTable smallTable(1,1,1,1);
@@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
const ui64 TupleSize = 1024;
- ui64 bigTuple[TupleSize];
+ ui64 bigTuple[TupleSize];
for (ui64 i = 0; i < TupleSize; i++) {
bigTuple[i] = std::rand() / ( RAND_MAX / 10000 );
@@ -197,13 +197,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 milliseconds = 0;
-
+
const ui64 BigTableTuples = 60000;
const ui64 SmallTableTuples = 15000;
const ui64 BigTupleSize = 32;
- std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
for ( ui64 i = 0; i < BigTableTuples; i++) {
@@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
bigTable.Clear();
smallTable.Clear();
- begin03 = std::chrono::steady_clock::now();
+ begin03 = std::chrono::steady_clock::now();
for ( ui64 i = 0; i < BigTableTuples; i++) {
@@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl;
CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( milliseconds * 1024 * 1024) << "MB/sec" << Endl;
CTEST << Endl;
-
+
std::vector<ui64> vals1, vals2;
@@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 numBigTuples = 0;
bigTable.ResetIterator();
- std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now();
while(bigTable.NextTuple(td1)) { numBigTuples++; }
@@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
numBigTuples = 0;
bigTable.ResetIterator();
- std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now();
while(bigTable.NextTuple(td2)) { numBigTuples++; }
@@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
CTEST << Endl;
- std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
joinTable.Join(bigTable,smallTable);
@@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 numJoinedTuples = 0;
- std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now();
while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; }
@@ -331,13 +331,8 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
}
}
-
-
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
Y_UNIT_TEST_LLVM(TestInner1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -400,16 +395,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
-
- }
-
-
+ }
Y_UNIT_TEST_LLVM(TMiniKQLGraceJoinTestInnerMulti1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -477,9 +466,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeft1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -551,9 +537,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftMulti1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -626,9 +609,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftSemi1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -691,9 +671,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftOnly1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -751,9 +728,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftSemiWithNullKey1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -818,9 +792,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftOnlyWithNullKey1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -885,9 +856,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestRight1) {
-
- if (LLVM) return;
-
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -958,10 +926,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
-
- Y_UNIT_TEST_LLVM(TestRightOnly1) {
-
- if (LLVM) return;
+ Y_UNIT_TEST_LLVM(TestRightOnly1) {
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1017,10 +982,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
-
-
Y_UNIT_TEST_LLVM(TestRightSemi1) {
- if (LLVM) return;
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1082,9 +1044,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
-
Y_UNIT_TEST_LLVM(TestRightMulti1) {
- if (LLVM) return;
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1156,9 +1116,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
-
- Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) {
- if (LLVM) return;
+ Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) {
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1223,7 +1181,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestRightOnlyWithNullKey1) {
- if (LLVM) return;
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1288,7 +1245,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestFull1) {
- if (LLVM) return;
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1353,9 +1309,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
UNIT_ASSERT(!tuple.GetElement(1));
- UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
UNIT_ASSERT(!tuple.GetElement(0));
UNIT_ASSERT(!iterator.Next(tuple));
@@ -1365,7 +1321,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
Y_UNIT_TEST_LLVM(TestExclusion1) {
- if (LLVM) return;
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1418,9 +1373,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
UNIT_ASSERT(!tuple.GetElement(1));
- UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
UNIT_ASSERT(!tuple.GetElement(0));
UNIT_ASSERT(!iterator.Next(tuple));
@@ -1430,7 +1385,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
-
}
}