diff options
author | imunkin <imunkin@yandex-team.com> | 2025-02-11 13:45:10 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-02-11 14:34:43 +0300 |
commit | 50c6140bfda6b86cd98464d7df5feb27c78a496c (patch) | |
tree | c73ab45d6e517b6ce1a92c9dfc244ae4f24eb97b | |
parent | 8e899a3e9c25ea69dacab575d77421116dbef8bb (diff) | |
download | ydb-50c6140bfda6b86cd98464d7df5feb27c78a496c.tar.gz |
YQL-19424: Use WideStream instead of WideFlow in WideToBlocks computation node
commit_hash:0c0bfb556ff1f51f3293899c0364cd56c3965c41
28 files changed, 466 insertions, 390 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index f27fcb72f5..94754fb67d 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -128,32 +128,22 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - const auto& input = node->Head(); - if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { - const auto& wideFromBlocks = input.Head(); - // Technically, the code below rewrites the following sequence - // (WideToBlocks (ToFlow (WideFromBlocks (<input>))))) - // into (ReplicateScalars (<input>)), but ToFlow/FromFlow - // wrappers will be removed when all other nodes in block - // pipeline start using WideStream instead of the WideFlow. - // Hence, the logging is left intact. - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content(); + if (input.IsCallable("WideFromBlocks")) { + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << input.Content(); // If tail is FromFlow, its input is WideFlow and can be // used intact; Otherwise the input is WideStream, so the // new input should be converted to WideFlow. - const auto tail = wideFromBlocks.HeadPtr(); + const auto tail = input.HeadPtr(); const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); - return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput }); + return ctx.Builder(node->Pos()) + .Callable("FromFlow") + .Callable(0, "ReplicateScalars") + .Add(0, flowInput) + .Seal() + .Seal() + .Build(); } if (input.IsCallable({"Extend", "OrderedExtend"})) { @@ -171,26 +161,10 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - const auto& input = node->Head(); - if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) { - const auto& wideToBlocks = input.Head(); - // Technically, the code below rewrites the following sequence - // (WideFromBlocks (FromFlow (WideToBlocks (<input>)))) - // into (FromFlow (<input>)) (to match the ToFlow parent), - // but ToFlow/FromFlow wrappers will be removed when all - // other nodes in block pipeline start using WideStream - // instead of the WideFlow. Hence, the logging is left intact. - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content(); - return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()}); + if (input.IsCallable("WideToBlocks")) { + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << input.Content(); + return input.HeadPtr(); } if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) { @@ -6268,22 +6242,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes << ", extra columns: " << rewritePositions.size(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - auto ret = ctx.Builder(node->Pos()) .Callable("ToFlow") .Callable(0, "WideFromBlocks") .Callable(0, "FromFlow") .Callable(0, "WideMap") - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable(0, "ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, node->HeadPtr()) + .Seal() + .Seal() .Seal() .Add(1, blockLambda) .Seal() @@ -6318,18 +6287,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte return node; } - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - auto blockMapped = ctx.Builder(node->Pos()) .Callable("WideMap") - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable(0, "ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, node->HeadPtr()) + .Seal() + .Seal() .Seal() .Add(1, blockLambda) .Seal() @@ -6446,22 +6411,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks"; YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - return ctx.Builder(node->Pos()) .Callable("ToFlow") .Callable(0, "WideFromBlocks") .Callable(0, "FromFlow") .Callable(0, newName) - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable(0, "ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, node->HeadPtr()) + .Seal() + .Seal() .Seal() .Add(1, node->ChildPtr(1)) .Seal() @@ -6504,16 +6464,15 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex TString newName = node->Content() + TString("Blocks"); YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; auto children = node->ChildrenList(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - - children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] }); + children[0] = ctx.Builder(node->Pos()) + .Callable("ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, children[0]) + .Seal() + .Seal() + .Seal() + .Build(); return ctx.Builder(node->Pos()) .Callable("ToFlow") .Callable(0, "WideFromBlocks") @@ -6726,27 +6685,46 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) .Seal() .Add(1, DropUnusedArgs(node->Tail(), unused, ctx)) .Seal().Build(); - } else if (input.IsCallable("WideToBlocks")) { - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - + } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideToBlocks")) { auto actualUnused = unused; if (actualUnused.back() + 1U == node->Tail().Head().ChildrenSize()) actualUnused.pop_back(); if (!actualUnused.empty()) { - YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << actualUnused.size() << " unused fields."; + const auto& wideToBlocks = input.Head(); + // WideToBlocks uses WideStream instead of WideFlow, + // so it's wrapped with ToFlow/FromFlow. Hence, to drop + // unused fields for particular WideToBlocks node, + // the optimizer has to rewrite FromFlow child, but + // logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideToBlocks.Content() << " with " << actualUnused.size() << " unused fields."; + const auto tail = wideToBlocks.HeadPtr(); + const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); return ctx.Builder(node->Pos()) .Callable(node->Content()) - .Callable(0, input.Content()) - .Add(0, MakeWideMapForDropUnused(input.HeadPtr(), actualUnused, ctx)) + .Callable(0, "ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Add(0, flowInput) + .Lambda(1) + .Params("items", width) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (auto i = 0U, j = 0U; i < width; ++i) { + if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) + parent.Arg(j++, "items", i); + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Seal() .Seal() .Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx)) - .Seal().Build(); + .Seal() + .Build(); } } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { const auto& wideFromBlocks = input.Head(); diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp index 93a1ef605c..496fe493f4 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.cpp +++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp @@ -868,24 +868,15 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { + if (!EnsureWideStreamType(input->Head(), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); + const auto multiType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); TTypeAnnotationNode::TListType retMultiType; for (const auto& type : multiType->GetItems()) { if (type->IsBlockOrScalar()) { @@ -902,7 +893,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp index 60b0b9ff0f..7f8b143a39 100644 --- a/yql/essentials/core/yql_aggregate_expander.cpp +++ b/yql/essentials/core/yql_aggregate_expander.cpp @@ -679,17 +679,15 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots)); auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda }); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - - auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow }); - return blocks; + return Ctx.Builder(Node->Pos()) + .Callable("ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Add(0, mappedWideFlow) + .Seal() + .Seal() + .Seal() + .Build(); } TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index 377cbd2800..ec1a8f30c8 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name); void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx); namespace NBlockStreamIO { - constexpr bool WideToBlocks = false; + constexpr bool WideToBlocks = true; } // namespace NBlockStreamIO } diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp index c05ccb9ca1..354b0237ed 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp @@ -58,10 +58,48 @@ private: TType* ItemType_; }; -class TWideToBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideToBlocksWrapper> { -using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideToBlocksWrapper>; +struct TWideToBlocksState : public TBlockState { + size_t Rows_ = 0; + bool IsFinished_ = false; + size_t BuilderAllocatedSize_ = 0; + size_t MaxBuilderAllocatedSize_ = 0; + std::vector<std::unique_ptr<IArrayBuilder>> Builders_; + static const size_t MaxAllocatedFactor_ = 4; + + TWideToBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t maxLength) + : TBlockState(memInfo, types.size() + 1U) + , Builders_(types.size()) + { + for (size_t i = 0; i < types.size(); ++i) { + Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, maxLength, &ctx.Builder->GetPgBuilder(), &BuilderAllocatedSize_); + } + MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; + } + + void Add(const NUdf::TUnboxedValuePod value, size_t idx) { + Builders_[idx]->Add(value); + } + + void MakeBlocks(const THolderFactory& holderFactory) { + Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Rows_))); + Rows_ = 0; + BuilderAllocatedSize_ = 0; + + for (size_t i = 0; i < Builders_.size(); ++i) { + if (const auto builder = Builders_[i].get()) { + Values[i] = holderFactory.CreateArrowBlock(builder->Build(IsFinished_)); + } + } + + FillArrays(); + } +}; + +class TWideToBlocksFlowWrapper : public TStatefulWideFlowCodegeneratorNode<TWideToBlocksFlowWrapper> { +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideToBlocksFlowWrapper>; +using TState = TWideToBlocksState; public: - TWideToBlocksWrapper(TComputationMutables& mutables, + TWideToBlocksFlowWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TVector<TType*>&& types) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) @@ -154,7 +192,7 @@ public: const auto ptrType = PointerType::getUnqual(StructType::get(context)); const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); - const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideToBlocksWrapper::MakeState)); + const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideToBlocksFlowWrapper::MakeState)); const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false); const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block); CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block); @@ -264,43 +302,6 @@ public: } #endif private: - struct TState : public TBlockState { - size_t Rows_ = 0; - bool IsFinished_ = false; - size_t BuilderAllocatedSize_ = 0; - size_t MaxBuilderAllocatedSize_ = 0; - std::vector<std::unique_ptr<IArrayBuilder>> Builders_; - static const size_t MaxAllocatedFactor_ = 4; - - TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t maxLength, NUdf::TUnboxedValue**const fields) - : TBlockState(memInfo, types.size() + 1U) - , Builders_(types.size()) - { - for (size_t i = 0; i < types.size(); ++i) { - fields[i] = &Values[i]; - Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, maxLength, &ctx.Builder->GetPgBuilder(), &BuilderAllocatedSize_); - } - MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; - } - - void Add(const NUdf::TUnboxedValuePod value, size_t idx) { - Builders_[idx]->Add(value); - } - - void MakeBlocks(const THolderFactory& holderFactory) { - Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Rows_))); - Rows_ = 0; - BuilderAllocatedSize_ = 0; - - for (size_t i = 0; i < Builders_.size(); ++i) { - if (const auto builder = Builders_[i].get()) { - Values[i] = holderFactory.CreateArrowBlock(builder->Build(IsFinished_)); - } - } - - FillArrays(); - } - }; #ifndef MKQL_DISABLE_CODEGEN class TLLVMFieldsStructureState: public TLLVMFieldsStructureBlockState { private: @@ -351,12 +352,18 @@ private: } void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create<TState>(ctx, Types_, MaxLength_, ctx.WideFields.data() + WideFieldsIndex_); + state = ctx.HolderFactory.Create<TState>(ctx, Types_, MaxLength_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { - if (state.IsInvalid()) + if (state.IsInvalid()) { MakeState(ctx, state); + auto& s = *static_cast<TState*>(state.AsBoxed().Get()); + const auto fields = ctx.WideFields.data() + WideFieldsIndex_; + for (size_t i = 0; i < Width_; ++i) + fields[i] = &s.Values[i]; + return s; + } return *static_cast<TState*>(state.AsBoxed().Get()); } @@ -368,6 +375,92 @@ private: const size_t WideFieldsIndex_; }; +class TWideToBlocksStreamWrapper : public TMutableComputationNode<TWideToBlocksStreamWrapper> +{ +using TBaseComputation = TMutableComputationNode<TWideToBlocksStreamWrapper>; +using TState = TWideToBlocksState; +public: + TWideToBlocksStreamWrapper(TComputationMutables& mutables, + IComputationNode* stream, + TVector<TType*>&& types) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , Stream_(stream) + , Types_(std::move(types)) + , MaxLength_(CalcBlockLen(std::accumulate(Types_.cbegin(), Types_.cend(), 0ULL, [](size_t max, const TType* type){ return std::max(max, CalcMaxBlockItemSize(type)); }))) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + const auto state = ctx.HolderFactory.Create<TState>(ctx, Types_, MaxLength_); + return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, + std::move(state), + std::move(Stream_->GetValue(ctx)), + MaxLength_); + } + +private: + class TStreamValue : public TComputationValue<TStreamValue> { + using TBase = TComputationValue<TStreamValue>; + public: + TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, + NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream, + const size_t maxLength) + : TBase(memInfo) + , BlockState_(blockState) + , Stream_(stream) + , MaxLength_(maxLength) + , HolderFactory_(holderFactory) + {} + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); + auto* inputFields = blockState.Pointer_; + const size_t inputWidth = blockState.Values.size() - 1; + + if (!blockState.Count) { + if (!blockState.IsFinished_) do { + switch (Stream_.WideFetch(inputFields, inputWidth)) { + case NUdf::EFetchStatus::Ok: + for (size_t i = 0; i < inputWidth; i++) + blockState.Add(blockState.Values[i], i); + continue; + case NUdf::EFetchStatus::Yield: + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Finish: + blockState.IsFinished_ = true; + break; + } + break; + } while (++blockState.Rows_ < MaxLength_ && blockState.BuilderAllocatedSize_ <= blockState.MaxBuilderAllocatedSize_); + if (blockState.Rows_) + blockState.MakeBlocks(HolderFactory_); + else + return NUdf::EFetchStatus::Finish; + } + + const auto sliceSize = blockState.Slice(); + for (size_t i = 0; i < width; i++) { + output[i] = blockState.Get(sliceSize, HolderFactory_, i); + } + return NUdf::EFetchStatus::Ok; + } + + NUdf::TUnboxedValue BlockState_; + NUdf::TUnboxedValue Stream_; + const size_t MaxLength_; + const THolderFactory& HolderFactory_; + }; + + void RegisterDependencies() const final { + this->DependsOn(Stream_); + } + + IComputationNode* const Stream_; + const TVector<TType*> Types_; + const size_t MaxLength_; +}; + class TFromBlocksWrapper : public TStatefulFlowCodegeneratorNode<TFromBlocksWrapper> { using TBaseComputation = TStatefulFlowCodegeneratorNode<TFromBlocksWrapper>; public: @@ -1259,13 +1352,25 @@ IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactor IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), + "Expected either WideStream or WideFlow as an input"); + const auto yieldsStream = callable.GetType()->GetReturnType()->IsStream(); + MKQL_ENSURE(yieldsStream == inputType->IsStream(), + "Expected both input and output have to be either WideStream or WideFlow"); + + const auto wideComponents = GetWideComponents(inputType); TVector<TType*> items(wideComponents.begin(), wideComponents.end()); - const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0); + if (yieldsStream) { + const auto wideStream = wideFlowOrStream; + return new TWideToBlocksStreamWrapper(ctx.Mutables, wideStream, std::move(items)); + } + // FIXME: Drop the branch below, when the time comes. + const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(items)); + return new TWideToBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items)); } IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp index 5aaf9da44b..7fcb7a16e1 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp @@ -81,7 +81,7 @@ void DoNestedTuplesCompressTest() { node = pb.ExpandMap(node, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }); - node = pb.WideToBlocks(node); + node = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(node))); node = pb.BlockExpandChunked(node); node = pb.WideSkipBlocks(node, pb.template NewDataLiteral<ui64>(19)); @@ -186,7 +186,8 @@ Y_UNIT_TEST_LLVM(CompressBasic) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }); - const auto compressedBlocks = pb.BlockCompress(pb.WideToBlocks(wideFlow), 0); + const auto uncompressedBlocks = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto compressedBlocks = pb.BlockCompress(uncompressedBlocks, 0); const auto compressedFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(compressedBlocks))); const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple({items[0], items[1]}); diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp index fe65e76d1e..373ef67f4b 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp @@ -53,7 +53,7 @@ void DoBlockExistsOffset(size_t length, size_t offset) { pb.Nth(item, 3) }; }); - node = pb.WideToBlocks(node); + node = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(node))); if (offset > 0) { node = pb.WideSkipBlocks(node, pb.NewDataLiteral<ui64>(offset)); } diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp index 8303c709dc..2066594e3d 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp @@ -48,8 +48,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topBlocks = pb.WideTopBlocks(blockFlow, pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}}); const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, @@ -113,8 +115,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topBlocks = pb.WideTopBlocks(blockFlow, pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}}); const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, @@ -184,8 +188,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topBlocks = pb.WideTopBlocks(blockFlow, pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}}); const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, @@ -246,8 +252,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topBlocks = pb.WideTopBlocks(blockFlow, pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}}); const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, @@ -305,8 +313,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topSortBlocks = pb.WideTopSortBlocks(blockFlow, pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}}); const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, @@ -370,8 +380,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topSortBlocks = pb.WideTopSortBlocks(blockFlow, pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}}); const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, @@ -441,8 +453,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topSortBlocks = pb.WideTopSortBlocks(blockFlow, pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}}); const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, @@ -506,8 +520,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto topSortBlocks = pb.WideTopSortBlocks(blockFlow, pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}}); const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, @@ -581,8 +597,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockSortTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto sortBlocks = pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), + const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); + const auto blockFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); + const auto sortBlocks = pb.WideSortBlocks(blockFlow, {{0U, pb.NewDataLiteral<bool>(true)}}); const auto sortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(sortBlocks))); const auto pgmReturn = pb.Collect(pb.NarrowMap(sortFlow, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index 1ad2c3c5c5..a873fbeb5b 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -126,7 +126,7 @@ Y_UNIT_TEST_LLVM(TestWideToBlocks) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto wideBlocksFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items[1]; }); @@ -191,7 +191,7 @@ void TestChunked(bool withBlockExpand) { node = pb.ExpandMap(node, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }); - node = pb.WideToBlocks(node); + node = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(node))); if (withBlockExpand) { node = pb.BlockExpandChunked(node); // WideTakeBlocks won't work on chunked blocks @@ -336,7 +336,7 @@ Y_UNIT_TEST_LLVM(TestBlockFunc) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto wideBlocksFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {pb.BlockFunc("Add", ui64BlockType, {items[0], items[1]})}; }); @@ -393,7 +393,7 @@ Y_UNIT_TEST_LLVM(TestBlockFuncWithNullables) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto wideBlocksFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {pb.BlockFunc("Add", ui64OptBlockType, {items[0], items[1]})}; }); @@ -598,7 +598,7 @@ Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto wideBlocksFlow = pb.ToFlow(pb.WideToBlocks(pb.FromFlow(wideFlow))); const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideBlocksFlow))); const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items[1]; diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index d7df4981b3..717eb9ec32 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -1476,21 +1476,36 @@ TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) { return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) { - TType* outputItemType; - { - const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); - std::vector<TType*> outputItems; - outputItems.reserve(wideComponents.size()); - for (size_t i = 0; i < wideComponents.size(); ++i) { - outputItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many)); - } - outputItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); - outputItemType = NewMultiType(outputItems); +TType* TProgramBuilder::BuildWideBlockType(const TArrayRef<TType* const>& wideComponents) { + std::vector<TType*> blockItems; + blockItems.reserve(wideComponents.size()); + for (size_t i = 0; i < wideComponents.size(); i++) { + blockItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many)); } + blockItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + return NewMultiType(blockItems); +} - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputItemType)); - callableBuilder.Add(flow); +TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) { + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type"); + if constexpr (RuntimeVersion < 58U) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (WideToBlocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + const auto inputFlow = ToFlow(stream); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, inputFlow.GetStaticType())); + TType* outputMultiType = BuildWideBlockType(wideComponents); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); + callableBuilder.Add(inputFlow); + const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false); + return FromFlow(outputFlow); + } + const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, stream.GetStaticType())); + TType* outputMultiType = BuildWideBlockType(wideComponents); + TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType)); + callableBuilder.Add(stream); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h index 22ca659cae..b67d80861d 100644 --- a/yql/essentials/minikql/mkql_program_builder.h +++ b/yql/essentials/minikql/mkql_program_builder.h @@ -853,6 +853,7 @@ private: TType* ChooseCommonType(TType* type1, TType* type2); TType* BuildArithmeticCommonType(TType* type1, TType* type2); + TType* BuildWideBlockType(const TArrayRef<TType* const>& wideComponents); bool IsNull(TRuntimeNode arg); protected: diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h index bbf22f4c57..8782dd1942 100644 --- a/yql/essentials/minikql/mkql_runtime_version.h +++ b/yql/essentials/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 57U +#define MKQL_RUNTIME_VERSION 58U #endif // History: diff --git a/yql/essentials/public/purecalc/common/transformations/utils.cpp b/yql/essentials/public/purecalc/common/transformations/utils.cpp index 51ab9f0b40..54e0ef7caf 100644 --- a/yql/essentials/public/purecalc/common/transformations/utils.cpp +++ b/yql/essentials/public/purecalc/common/transformations/utils.cpp @@ -74,37 +74,32 @@ TExprNode::TPtr NYql::NPureCalc::NodeToBlocks( ) { const auto items = structType->GetItems(); Y_ENSURE(items.size() > 0); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - return ctx.Builder(pos) .Lambda() .Param("stream") .Callable("FromFlow") .Callable(0, "NarrowMap") - .Callable(0, "WideToBlocks") - .Callable(0, "ExpandMap") - .Callable(0, "ToFlow") - .Arg(0, "stream") - .Seal() - .Lambda(1) - .Param("item") - .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& { - ui32 i = 0; - for (const auto& item : items) { - lambda.Callable(i++, "Member") - .Arg(0, "item") - .Atom(1, item->GetName()) - .Seal(); - } - return lambda; - }) + .Callable(0, "ToFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") + .Callable(0, "ExpandMap") + .Callable(0, "ToFlow") + .Arg(0, "stream") + .Seal() + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& { + ui32 i = 0; + for (const auto& item : items) { + lambda.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, item->GetName()) + .Seal(); + } + return lambda; + }) + .Seal() + .Seal() .Seal() .Seal() .Seal() diff --git a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json index 90ba14b9ea..f48bcd05bc 100644 --- a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json @@ -1,9 +1,9 @@ { "test.test[Blocks-BlockMapJoinCore-default.txt-Debug]": [ { - "checksum": "71db64c54c014278aadebde24e80edca", + "checksum": "78ca432550e55028e5a9754d3698bcb4", "size": 1864, - "uri": "https://{canondata_backend}/1936997/67628cc299565ace722746c32bfd5aa5ec155b43/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" + "uri": "https://{canondata_backend}/1900335/faec43b5ddae0d95b63fd9929fbf0ba6117189f1/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" } ], "test.test[Blocks-BlockMapJoinCore-default.txt-Results]": [ diff --git a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls index e47ea29f6a..8a6ca3b84c 100644 --- a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls +++ b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls @@ -15,7 +15,7 @@ (let narrowLambdaLeftSemi (lambda '(item1 item2) (AsStruct '('"asubkey" item1) '('"avalue" item2)))) (let doJoin (lambda '(left right narrowMapLambda joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops) (block '( - (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore (FromFlow (WideToBlocks (ExpandMap left expandLambda))) (FromFlow (WideToBlocks (ExpandMap right expandLambda))) joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops '()))) narrowMapLambda))) + (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore (WideToBlocks (FromFlow (ExpandMap left expandLambda))) (WideToBlocks (FromFlow (ExpandMap right expandLambda))) joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops '()))) narrowMapLambda))) )))) (let innerJoin (Apply doJoin (ToFlow table (DependsOn (String '0))) (ToFlow table (DependsOn (String '1))) narrowLambdaInner 'Inner '('0) '() '('0) '())) diff --git a/yql/essentials/tests/sql/minirun/part0/canondata/result.json b/yql/essentials/tests/sql/minirun/part0/canondata/result.json index 6f56c572bd..5770f74200 100644 --- a/yql/essentials/tests/sql/minirun/part0/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part0/canondata/result.json @@ -249,9 +249,9 @@ ], "test.test[blocks-agg_all_mixed_distinct-default.txt-Peephole]": [ { - "checksum": "ed1fcea38db98750f0319c3638745142", + "checksum": "68605b539309a1ecb2a83ecdfc4b310f", "size": 2750, - "uri": "https://{canondata_backend}/1936842/8073eb626dd657fcbe20d34185c363a1a18c3e7c/resource.tar.gz#test.test_blocks-agg_all_mixed_distinct-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1937429/79055d48a8b63ae0b5d61c80f61c844ca111e11a/resource.tar.gz#test.test_blocks-agg_all_mixed_distinct-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_all_mixed_distinct-default.txt-Results]": [ @@ -270,9 +270,9 @@ ], "test.test[blocks-and-default.txt-Peephole]": [ { - "checksum": "25301b655e8f72e42581ed1009f05cf5", - "size": 682, - "uri": "https://{canondata_backend}/1920236/e2416b57cd9baa140c086892d357d5945ed0fdb1/resource.tar.gz#test.test_blocks-and-default.txt-Peephole_/opt.yql" + "checksum": "ebaa52929ab2ea1003a45542332d450d", + "size": 702, + "uri": "https://{canondata_backend}/1937429/79055d48a8b63ae0b5d61c80f61c844ca111e11a/resource.tar.gz#test.test_blocks-and-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-and-default.txt-Results]": [ @@ -291,9 +291,9 @@ ], "test.test[blocks-pg_call-default.txt-Peephole]": [ { - "checksum": "4b035ae018652e54247b56a686325bb4", - "size": 887, - "uri": "https://{canondata_backend}/1814674/d1d41a17170e55fd1f93d85f4708eb5282bce7d2/resource.tar.gz#test.test_blocks-pg_call-default.txt-Peephole_/opt.yql" + "checksum": "0d759dfe6d84dfb9eb76e3ad4ba99be4", + "size": 907, + "uri": "https://{canondata_backend}/1937429/79055d48a8b63ae0b5d61c80f61c844ca111e11a/resource.tar.gz#test.test_blocks-pg_call-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-pg_call-default.txt-Results]": [ @@ -312,9 +312,9 @@ ], "test.test[blocks-xor-default.txt-Peephole]": [ { - "checksum": "0cf43f4750e64901c4c51aa0d8ff9f75", - "size": 682, - "uri": "https://{canondata_backend}/1920236/e2416b57cd9baa140c086892d357d5945ed0fdb1/resource.tar.gz#test.test_blocks-xor-default.txt-Peephole_/opt.yql" + "checksum": "8c0970d313c69c4b0d13590d30b5d864", + "size": 702, + "uri": "https://{canondata_backend}/1937429/79055d48a8b63ae0b5d61c80f61c844ca111e11a/resource.tar.gz#test.test_blocks-xor-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-xor-default.txt-Results]": [ @@ -333,9 +333,9 @@ ], "test.test[blocks-xor_opt_scalar-default.txt-Peephole]": [ { - "checksum": "f513ccd4b355aba2f8aebb682fddd56a", - "size": 1814, - "uri": "https://{canondata_backend}/1920236/e2416b57cd9baa140c086892d357d5945ed0fdb1/resource.tar.gz#test.test_blocks-xor_opt_scalar-default.txt-Peephole_/opt.yql" + "checksum": "4bab98d84e3103195a7896c1955d1030", + "size": 1854, + "uri": "https://{canondata_backend}/1937429/79055d48a8b63ae0b5d61c80f61c844ca111e11a/resource.tar.gz#test.test_blocks-xor_opt_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-xor_opt_scalar-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part1/canondata/result.json b/yql/essentials/tests/sql/minirun/part1/canondata/result.json index b9ac523223..bd0f9512c7 100644 --- a/yql/essentials/tests/sql/minirun/part1/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part1/canondata/result.json @@ -218,9 +218,9 @@ ], "test.test[blocks-agg_all-default.txt-Peephole]": [ { - "checksum": "2fb133577b2bebe665243748a6546c47", + "checksum": "15315c4bc0f3b4b589e14c0158c32e12", "size": 989, - "uri": "https://{canondata_backend}/1936997/ed3a7d8870edd79021e442428408c37f03c08cee/resource.tar.gz#test.test_blocks-agg_all-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1917492/eba86b26b71f83d74091393c00c13c811ffb39c5/resource.tar.gz#test.test_blocks-agg_all-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_all-default.txt-Results]": [ @@ -239,9 +239,9 @@ ], "test.test[blocks-and_opt_scalar-default.txt-Peephole]": [ { - "checksum": "e410280186ee325d3b67ac78de5f8e36", - "size": 1814, - "uri": "https://{canondata_backend}/1920236/0ae0061e7a37e74785c20a7b0e1cfe4b3655ad32/resource.tar.gz#test.test_blocks-and_opt_scalar-default.txt-Peephole_/opt.yql" + "checksum": "00d23182d06fa9428fff3a19fbb814c2", + "size": 1854, + "uri": "https://{canondata_backend}/1917492/eba86b26b71f83d74091393c00c13c811ffb39c5/resource.tar.gz#test.test_blocks-and_opt_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-and_opt_scalar-default.txt-Results]": [ @@ -302,9 +302,9 @@ ], "test.test[blocks-or_scalar-default.txt-Peephole]": [ { - "checksum": "ece0c5618fe0b4818ea7c0c6b6e588e0", - "size": 1389, - "uri": "https://{canondata_backend}/1920236/0ae0061e7a37e74785c20a7b0e1cfe4b3655ad32/resource.tar.gz#test.test_blocks-or_scalar-default.txt-Peephole_/opt.yql" + "checksum": "d0b85b427d840550afe53e884481742f", + "size": 1429, + "uri": "https://{canondata_backend}/1917492/eba86b26b71f83d74091393c00c13c811ffb39c5/resource.tar.gz#test.test_blocks-or_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-or_scalar-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part2/canondata/result.json b/yql/essentials/tests/sql/minirun/part2/canondata/result.json index 965d96ec88..b9a3de5ee2 100644 --- a/yql/essentials/tests/sql/minirun/part2/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part2/canondata/result.json @@ -246,9 +246,9 @@ ], "test.test[blocks-agg_by_key_only_distinct-default.txt-Peephole]": [ { - "checksum": "61d7e4298f7c95b1d74a833e56b0fb1b", + "checksum": "0ab97b60dc932f51a4cad3d5b703d98c", "size": 2174, - "uri": "https://{canondata_backend}/1937150/3d01c6ab2777fc3b99338655d39a5bcbb1ac89c3/resource.tar.gz#test.test_blocks-agg_by_key_only_distinct-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1917492/6fdf85f7e05da60eed58efcacce70c29bce9a047/resource.tar.gz#test.test_blocks-agg_by_key_only_distinct-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_by_key_only_distinct-default.txt-Results]": [ @@ -267,9 +267,9 @@ ], "test.test[blocks-exists-default.txt-Peephole]": [ { - "checksum": "489280aa6951c20dbd91012493e95323", - "size": 539, - "uri": "https://{canondata_backend}/1942671/13a781c07395a2ba482afc02d12bb4a6fa60d5ef/resource.tar.gz#test.test_blocks-exists-default.txt-Peephole_/opt.yql" + "checksum": "fe4b8e508f707b10f04202f1c79b69ba", + "size": 559, + "uri": "https://{canondata_backend}/1917492/6fdf85f7e05da60eed58efcacce70c29bce9a047/resource.tar.gz#test.test_blocks-exists-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-exists-default.txt-Results]": [ @@ -288,9 +288,9 @@ ], "test.test[blocks-sort-default.txt-Peephole]": [ { - "checksum": "ac26701a5abd8c7670e227bb534127ea", - "size": 879, - "uri": "https://{canondata_backend}/1599023/429c43efbf699a2907bed12a3ec8e6c4b74d39ce/resource.tar.gz#test.test_blocks-sort-default.txt-Peephole_/opt.yql" + "checksum": "756e6c3f006f33c75a3dedea68c889ac", + "size": 919, + "uri": "https://{canondata_backend}/1917492/6fdf85f7e05da60eed58efcacce70c29bce9a047/resource.tar.gz#test.test_blocks-sort-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-sort-default.txt-Results]": [ @@ -309,9 +309,9 @@ ], "test.test[blocks-take_skip-default.txt-Peephole]": [ { - "checksum": "6de1c3b7d8decabd30376dc756d53d17", - "size": 879, - "uri": "https://{canondata_backend}/1599023/429c43efbf699a2907bed12a3ec8e6c4b74d39ce/resource.tar.gz#test.test_blocks-take_skip-default.txt-Peephole_/opt.yql" + "checksum": "9c0785e522b7b68892a802bfb7d72dc6", + "size": 919, + "uri": "https://{canondata_backend}/1917492/6fdf85f7e05da60eed58efcacce70c29bce9a047/resource.tar.gz#test.test_blocks-take_skip-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-take_skip-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part3/canondata/result.json b/yql/essentials/tests/sql/minirun/part3/canondata/result.json index 1a818dbe9d..9dc8932e18 100644 --- a/yql/essentials/tests/sql/minirun/part3/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part3/canondata/result.json @@ -294,9 +294,9 @@ ], "test.test[blocks-agg_by_key_mixed_distinct-default.txt-Peephole]": [ { - "checksum": "d85f9dcc1a0cc15f14cd6f4e8aa4a288", + "checksum": "0c88e98d12d4cf8941c04be6c12db505", "size": 2985, - "uri": "https://{canondata_backend}/1936842/b7d0729d8113067b84268db9fc0907f14193cecc/resource.tar.gz#test.test_blocks-agg_by_key_mixed_distinct-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1809005/48af937bc8ac0c97a4229075f4b2be4980c228fe/resource.tar.gz#test.test_blocks-agg_by_key_mixed_distinct-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_by_key_mixed_distinct-default.txt-Results]": [ @@ -315,9 +315,9 @@ ], "test.test[blocks-frompg-default.txt-Peephole]": [ { - "checksum": "e245ccef92be3a037238558f8908427b", - "size": 531, - "uri": "https://{canondata_backend}/1775319/25a92a1cd773ccebf59751427286ef1fe45aeeb2/resource.tar.gz#test.test_blocks-frompg-default.txt-Peephole_/opt.yql" + "checksum": "ccf828290806a669ba391634e532dac5", + "size": 551, + "uri": "https://{canondata_backend}/1809005/48af937bc8ac0c97a4229075f4b2be4980c228fe/resource.tar.gz#test.test_blocks-frompg-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-frompg-default.txt-Results]": [ @@ -336,9 +336,9 @@ ], "test.test[blocks-or_opt-default.txt-Peephole]": [ { - "checksum": "845329e7352cf454e326edeac8a39237", - "size": 879, - "uri": "https://{canondata_backend}/1920236/9bbaee93ec54cd7cbc4c6311c4df2e94db14c07e/resource.tar.gz#test.test_blocks-or_opt-default.txt-Peephole_/opt.yql" + "checksum": "5bf71c50b094eda6b1175383a07d0e5f", + "size": 899, + "uri": "https://{canondata_backend}/1809005/48af937bc8ac0c97a4229075f4b2be4980c228fe/resource.tar.gz#test.test_blocks-or_opt-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-or_opt-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part4/canondata/result.json b/yql/essentials/tests/sql/minirun/part4/canondata/result.json index 97562f4708..fa6c416555 100644 --- a/yql/essentials/tests/sql/minirun/part4/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part4/canondata/result.json @@ -302,9 +302,9 @@ ], "test.test[blocks-agg_by_key-default.txt-Peephole]": [ { - "checksum": "dfcd63a547cc12864741028ead35a272", + "checksum": "dad5945424bc4c6ce2b36208dafca12f", "size": 1400, - "uri": "https://{canondata_backend}/1936842/397d19bc034685a9103c63c079eb706022471c35/resource.tar.gz#test.test_blocks-agg_by_key-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1809005/8279b674a9a28181ade233ada2f27748f8cb3017/resource.tar.gz#test.test_blocks-agg_by_key-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_by_key-default.txt-Results]": [ @@ -323,9 +323,9 @@ ], "test.test[blocks-or_opt_scalar-default.txt-Peephole]": [ { - "checksum": "ab501477ca29cbf2b360f0e80d7bc1e2", - "size": 1808, - "uri": "https://{canondata_backend}/1916746/09c919bce0458a8ce66d1cb974cd8dba5513025d/resource.tar.gz#test.test_blocks-or_opt_scalar-default.txt-Peephole_/opt.yql" + "checksum": "d4f552f4259424a5c8d52d472c39ddcb", + "size": 1848, + "uri": "https://{canondata_backend}/1809005/8279b674a9a28181ade233ada2f27748f8cb3017/resource.tar.gz#test.test_blocks-or_opt_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-or_opt_scalar-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part5/canondata/result.json b/yql/essentials/tests/sql/minirun/part5/canondata/result.json index d109287d0f..64e4208d97 100644 --- a/yql/essentials/tests/sql/minirun/part5/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part5/canondata/result.json @@ -333,9 +333,9 @@ ], "test.test[blocks-and_opt-default.txt-Peephole]": [ { - "checksum": "b8c85dc4102de70b2fcae3acee2efa3c", - "size": 880, - "uri": "https://{canondata_backend}/1942671/ce51ce34754ddc48cce99d0eb0d68c58043ce298/resource.tar.gz#test.test_blocks-and_opt-default.txt-Peephole_/opt.yql" + "checksum": "62b19601e78710a4f6a19a3ba5afeb05", + "size": 900, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-and_opt-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-and_opt-default.txt-Results]": [ @@ -354,9 +354,9 @@ ], "test.test[blocks-decimal_binop-default.txt-Peephole]": [ { - "checksum": "e165bc99d0eeb90c4e09d377dfa0609b", - "size": 732, - "uri": "https://{canondata_backend}/1775319/488597481a390ab007602d4f2c7cb21f0d5833de/resource.tar.gz#test.test_blocks-decimal_binop-default.txt-Peephole_/opt.yql" + "checksum": "543af01b436576f500908330fda5ad9d", + "size": 752, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-decimal_binop-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-decimal_binop-default.txt-Results]": [ @@ -375,9 +375,9 @@ ], "test.test[blocks-extend-default.txt-Peephole]": [ { - "checksum": "424973dad53667a534c277cbb74c20f7", - "size": 624, - "uri": "https://{canondata_backend}/1937001/b704bbc48624073924a714c5207966ffdfdddc62/resource.tar.gz#test.test_blocks-extend-default.txt-Peephole_/opt.yql" + "checksum": "2bec63a1689cb0b100f82fec2b89cd3c", + "size": 629, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-extend-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-extend-default.txt-Results]": [ @@ -396,9 +396,9 @@ ], "test.test[blocks-filter-default.txt-Peephole]": [ { - "checksum": "727b35f5c4d7a0647482ffad550fd950", - "size": 710, - "uri": "https://{canondata_backend}/1942525/3a5c45563e24ce3cec2704463eb5e7a3038772ba/resource.tar.gz#test.test_blocks-filter-default.txt-Peephole_/opt.yql" + "checksum": "d57ebdb1176a6d15f700ed7c6e64f31b", + "size": 730, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-filter-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-filter-default.txt-Results]": [ @@ -417,9 +417,9 @@ ], "test.test[blocks-just-default.txt-Peephole]": [ { - "checksum": "d51f71d182470ef3d43b3082d7d48fb8", - "size": 468, - "uri": "https://{canondata_backend}/1775319/488597481a390ab007602d4f2c7cb21f0d5833de/resource.tar.gz#test.test_blocks-just-default.txt-Peephole_/opt.yql" + "checksum": "c352b609f1b0e2462f1290962c13e6f6", + "size": 488, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-just-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-just-default.txt-Results]": [ @@ -438,9 +438,9 @@ ], "test.test[blocks-not-default.txt-Peephole]": [ { - "checksum": "e07fe7e10cb9bca87ca0df3855e30800", - "size": 504, - "uri": "https://{canondata_backend}/1942671/ce51ce34754ddc48cce99d0eb0d68c58043ce298/resource.tar.gz#test.test_blocks-not-default.txt-Peephole_/opt.yql" + "checksum": "64173242c7d6f033cc1de18b88472271", + "size": 524, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-not-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-not-default.txt-Results]": [ @@ -459,9 +459,9 @@ ], "test.test[blocks-not_opt-default.txt-Peephole]": [ { - "checksum": "f8c1132586ba55043b160dd90392d223", - "size": 579, - "uri": "https://{canondata_backend}/1942671/ce51ce34754ddc48cce99d0eb0d68c58043ce298/resource.tar.gz#test.test_blocks-not_opt-default.txt-Peephole_/opt.yql" + "checksum": "9ea6f61d33ac7fddb2bc2050600e327d", + "size": 599, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-not_opt-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-not_opt-default.txt-Results]": [ @@ -480,9 +480,9 @@ ], "test.test[blocks-xor_scalar-default.txt-Peephole]": [ { - "checksum": "b692bb81f6109ff6725a7daa8ce6a643", - "size": 1393, - "uri": "https://{canondata_backend}/1942671/ce51ce34754ddc48cce99d0eb0d68c58043ce298/resource.tar.gz#test.test_blocks-xor_scalar-default.txt-Peephole_/opt.yql" + "checksum": "40b1616837226098205c07c33be898ec", + "size": 1433, + "uri": "https://{canondata_backend}/1809005/02f459fce1f16d89b3444e6e8728b9747bb52b53/resource.tar.gz#test.test_blocks-xor_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-xor_scalar-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part6/canondata/result.json b/yql/essentials/tests/sql/minirun/part6/canondata/result.json index 268e02f868..07c8cc0c20 100644 --- a/yql/essentials/tests/sql/minirun/part6/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part6/canondata/result.json @@ -305,9 +305,9 @@ ], "test.test[blocks-coalesce_scalar-default.txt-Peephole]": [ { - "checksum": "de0d9ee659715c2a884666a8e2ad690c", - "size": 1923, - "uri": "https://{canondata_backend}/1920236/8a88b220fe1ff8b571b6813b812d7623629101fd/resource.tar.gz#test.test_blocks-coalesce_scalar-default.txt-Peephole_/opt.yql" + "checksum": "16037c7a59e90100426abd1f7461cfd3", + "size": 1963, + "uri": "https://{canondata_backend}/1917492/9722518a50ef2685c59301cc045eae38584b9f63/resource.tar.gz#test.test_blocks-coalesce_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-coalesce_scalar-default.txt-Results]": [ @@ -326,9 +326,9 @@ ], "test.test[blocks-member-default.txt-Peephole]": [ { - "checksum": "b40c6c197f750c6a604329621bd37300", - "size": 586, - "uri": "https://{canondata_backend}/1900335/81a360a61ba1555c418f20e4c99272eb22766040/resource.tar.gz#test.test_blocks-member-default.txt-Peephole_/opt.yql" + "checksum": "c7daf76e38a19686c4ecd83abd730695", + "size": 606, + "uri": "https://{canondata_backend}/1917492/9722518a50ef2685c59301cc045eae38584b9f63/resource.tar.gz#test.test_blocks-member-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-member-default.txt-Results]": [ @@ -347,9 +347,9 @@ ], "test.test[blocks-projection_add_ints_filter-default.txt-Peephole]": [ { - "checksum": "f5fb44eaf3fafb816e7d772449908cd6", - "size": 851, - "uri": "https://{canondata_backend}/1784826/964c3d771c51aa808279a968bec9d2485cb85678/resource.tar.gz#test.test_blocks-projection_add_ints_filter-default.txt-Peephole_/opt.yql" + "checksum": "b48a57d896cdabf3db3480cd86cc8e2a", + "size": 871, + "uri": "https://{canondata_backend}/1917492/9722518a50ef2685c59301cc045eae38584b9f63/resource.tar.gz#test.test_blocks-projection_add_ints_filter-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-projection_add_ints_filter-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part7/canondata/result.json b/yql/essentials/tests/sql/minirun/part7/canondata/result.json index 900975682d..bd0cea5780 100644 --- a/yql/essentials/tests/sql/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part7/canondata/result.json @@ -218,9 +218,9 @@ ], "test.test[blocks-as_tuple-default.txt-Peephole]": [ { - "checksum": "dc0005ecb6750c691d66fe6c50e00d0c", - "size": 603, - "uri": "https://{canondata_backend}/1775319/ebad1e34262c6a71e8c6dbf23efe7ba30e371e75/resource.tar.gz#test.test_blocks-as_tuple-default.txt-Peephole_/opt.yql" + "checksum": "02e80809d3cbf91101d09d4ac1e87aa0", + "size": 623, + "uri": "https://{canondata_backend}/1917492/b01930df0710eb10e4ce2d35cddca6be33ac8a9f/resource.tar.gz#test.test_blocks-as_tuple-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-as_tuple-default.txt-Results]": [ @@ -239,9 +239,9 @@ ], "test.test[blocks-or-default.txt-Peephole]": [ { - "checksum": "e86101ef3ffbf79e56099e81ec6bc0fb", - "size": 681, - "uri": "https://{canondata_backend}/1942671/a9486852a02ad9ecba5005ac92cde1261882e326/resource.tar.gz#test.test_blocks-or-default.txt-Peephole_/opt.yql" + "checksum": "4c4fe5d41f9b37bfaf53bc0a57d8b48b", + "size": 701, + "uri": "https://{canondata_backend}/1917492/b01930df0710eb10e4ce2d35cddca6be33ac8a9f/resource.tar.gz#test.test_blocks-or-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-or-default.txt-Results]": [ @@ -260,9 +260,9 @@ ], "test.test[blocks-topg-default.txt-Peephole]": [ { - "checksum": "adfdcd16ed6e227522ee922fd1e818c8", - "size": 537, - "uri": "https://{canondata_backend}/1775319/ebad1e34262c6a71e8c6dbf23efe7ba30e371e75/resource.tar.gz#test.test_blocks-topg-default.txt-Peephole_/opt.yql" + "checksum": "237a12866d5a4f1af7a79ba95d1a6f71", + "size": 557, + "uri": "https://{canondata_backend}/1917492/b01930df0710eb10e4ce2d35cddca6be33ac8a9f/resource.tar.gz#test.test_blocks-topg-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-topg-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part8/canondata/result.json b/yql/essentials/tests/sql/minirun/part8/canondata/result.json index 76dd1b302e..d1df68cd62 100644 --- a/yql/essentials/tests/sql/minirun/part8/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part8/canondata/result.json @@ -361,9 +361,9 @@ ], "test.test[blocks-and_scalar-default.txt-Peephole]": [ { - "checksum": "d7eb4215f3447f164802a0fa8a9e93f6", - "size": 1393, - "uri": "https://{canondata_backend}/1942671/1fe07dc7779a4d1b05048ad4f78bfbeb3a86d998/resource.tar.gz#test.test_blocks-and_scalar-default.txt-Peephole_/opt.yql" + "checksum": "9ef95c0429987ba69696f349db7a3143", + "size": 1433, + "uri": "https://{canondata_backend}/1881367/2ee5d4b68dd6e9c4fef4660614522e34e2459106/resource.tar.gz#test.test_blocks-and_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-and_scalar-default.txt-Results]": [ @@ -382,9 +382,9 @@ ], "test.test[blocks-if-default.txt-Peephole]": [ { - "checksum": "c8b3668a80da61f1695e1f4c95558f4c", - "size": 717, - "uri": "https://{canondata_backend}/1942671/1fe07dc7779a4d1b05048ad4f78bfbeb3a86d998/resource.tar.gz#test.test_blocks-if-default.txt-Peephole_/opt.yql" + "checksum": "b23952b137d3faf33b6168319047e152", + "size": 737, + "uri": "https://{canondata_backend}/1881367/2ee5d4b68dd6e9c4fef4660614522e34e2459106/resource.tar.gz#test.test_blocks-if-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-if-default.txt-Results]": [ @@ -403,9 +403,9 @@ ], "test.test[blocks-pg_op-default.txt-Peephole]": [ { - "checksum": "7b89533ec36887d33beaa65ae0662d3f", - "size": 863, - "uri": "https://{canondata_backend}/1900335/9f3e189e481af8d65c06be72d7112a9ca1209c31/resource.tar.gz#test.test_blocks-pg_op-default.txt-Peephole_/opt.yql" + "checksum": "570868d831a49ebc5e0c21dcadf9c81f", + "size": 883, + "uri": "https://{canondata_backend}/1881367/2ee5d4b68dd6e9c4fef4660614522e34e2459106/resource.tar.gz#test.test_blocks-pg_op-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-pg_op-default.txt-Results]": [ @@ -424,9 +424,9 @@ ], "test.test[blocks-xor_opt-default.txt-Peephole]": [ { - "checksum": "9dfc3fbfce576e0daed66ae740e44d5c", - "size": 880, - "uri": "https://{canondata_backend}/1942671/1fe07dc7779a4d1b05048ad4f78bfbeb3a86d998/resource.tar.gz#test.test_blocks-xor_opt-default.txt-Peephole_/opt.yql" + "checksum": "2b909a2bd312a54ad8c7153247298993", + "size": 900, + "uri": "https://{canondata_backend}/1881367/2ee5d4b68dd6e9c4fef4660614522e34e2459106/resource.tar.gz#test.test_blocks-xor_opt-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-xor_opt-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part9/canondata/result.json b/yql/essentials/tests/sql/minirun/part9/canondata/result.json index 7763b7c9fa..7be476f491 100644 --- a/yql/essentials/tests/sql/minirun/part9/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part9/canondata/result.json @@ -176,9 +176,9 @@ ], "test.test[blocks-agg_all_only_distinct-default.txt-Peephole]": [ { - "checksum": "43f57f7269807e7a8a04d1dea312d0a6", + "checksum": "71442e05f1ea713f163527b5ff08d065", "size": 1683, - "uri": "https://{canondata_backend}/1936842/a3d6c7434a3c60fd4096f171ba514687570df6ae/resource.tar.gz#test.test_blocks-agg_all_only_distinct-default.txt-Peephole_/opt.yql" + "uri": "https://{canondata_backend}/1917492/5cddb9dcb358e358fc244d25a12d573969df0013/resource.tar.gz#test.test_blocks-agg_all_only_distinct-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-agg_all_only_distinct-default.txt-Results]": [ @@ -197,9 +197,9 @@ ], "test.test[blocks-coalesce-default.txt-Peephole]": [ { - "checksum": "22b6a7673a01385d879856dc322c94aa", - "size": 1135, - "uri": "https://{canondata_backend}/1920236/19a9934ebc9455bf1caab37058331acdf3c4ee26/resource.tar.gz#test.test_blocks-coalesce-default.txt-Peephole_/opt.yql" + "checksum": "9ea7c29f9825b44790014fc6a147d5cc", + "size": 1175, + "uri": "https://{canondata_backend}/1917492/5cddb9dcb358e358fc244d25a12d573969df0013/resource.tar.gz#test.test_blocks-coalesce-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-coalesce-default.txt-Results]": [ @@ -218,9 +218,9 @@ ], "test.test[blocks-if_scalar-default.txt-Peephole]": [ { - "checksum": "e2ee0df7a6e82c32e4bb81445125cf8c", - "size": 1995, - "uri": "https://{canondata_backend}/1920236/19a9934ebc9455bf1caab37058331acdf3c4ee26/resource.tar.gz#test.test_blocks-if_scalar-default.txt-Peephole_/opt.yql" + "checksum": "471947ddcb963e1ac303d4cd71c10242", + "size": 2035, + "uri": "https://{canondata_backend}/1917492/5cddb9dcb358e358fc244d25a12d573969df0013/resource.tar.gz#test.test_blocks-if_scalar-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-if_scalar-default.txt-Results]": [ @@ -239,9 +239,9 @@ ], "test.test[blocks-nth-default.txt-Peephole]": [ { - "checksum": "e06d2c11c8bad42802c715436faa44c0", - "size": 591, - "uri": "https://{canondata_backend}/1900335/892d26e90090b841f50bacde742fd87e2ef88a38/resource.tar.gz#test.test_blocks-nth-default.txt-Peephole_/opt.yql" + "checksum": "66aeb49d6120f870670261bac89f6c13", + "size": 611, + "uri": "https://{canondata_backend}/1917492/5cddb9dcb358e358fc244d25a12d573969df0013/resource.tar.gz#test.test_blocks-nth-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-nth-default.txt-Results]": [ @@ -260,9 +260,9 @@ ], "test.test[blocks-projection_add_ints-default.txt-Peephole]": [ { - "checksum": "7445b50d8d9dbd3e04e6307b079be6a2", - "size": 662, - "uri": "https://{canondata_backend}/1942525/d498eafe1e3c2e3c1a75f331ab6d923ed8992697/resource.tar.gz#test.test_blocks-projection_add_ints-default.txt-Peephole_/opt.yql" + "checksum": "40ca33def60be28702004037b2c2222c", + "size": 682, + "uri": "https://{canondata_backend}/1917492/5cddb9dcb358e358fc244d25a12d573969df0013/resource.tar.gz#test.test_blocks-projection_add_ints-default.txt-Peephole_/opt.yql" } ], "test.test[blocks-projection_add_ints-default.txt-Results]": [ diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index cd89e92032..2d805612f9 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -661,15 +661,9 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { } if (IsWideBlockType(lambdaInputType)) { - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - - values = ctx.ProgramBuilder.WideToBlocks(values); + values = ctx.ProgramBuilder.ToFlow( + ctx.ProgramBuilder.WideToBlocks( + ctx.ProgramBuilder.FromFlow(values))); } NCommon::TMkqlBuildContext innerCtx(ctx, {{arg, values}}, ytMap.Mapper().Ref().UniqueId()); @@ -1122,15 +1116,7 @@ void RegisterDqYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) ytRead.Input().Ref(), Nothing(), ctx, false, THashSet<TString>{"num", "index"}, forceKeyColumns); values = ApplyPathRangesAndSampling(values, outputType, ytRead.Input().Ref(), ctx); - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - - return ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.WideToBlocks(ExpandFlow(ctx.ProgramBuilder.ToFlow(values), ctx))); + return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ExpandFlow(ctx.ProgramBuilder.ToFlow(values), ctx))); } return TRuntimeNode(); diff --git a/yt/yql/providers/yt/provider/yql_yt_block_output.cpp b/yt/yql/providers/yt/provider/yql_yt_block_output.cpp index 771058b177..d1af06712a 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_output.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_output.cpp @@ -43,20 +43,16 @@ private: auto settings = RemoveSetting(map.Settings().Ref(), EYtSettingType::BlockOutputReady, ctx); settings = AddSetting(*settings, EYtSettingType::BlockOutputApplied, TExprNode::TPtr(), ctx); - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - auto mapperLambda = Build<TCoLambda>(ctx, map.Mapper().Pos()) .Args({"flow"}) - .Body<TCoWideToBlocks>() - .Input<TExprApplier>() - .Apply(map.Mapper()) - .With(0, "flow") + .Body<TCoToFlow>() + .Input<TCoWideToBlocks>() + .Input<TCoFromFlow>() + .Input<TExprApplier>() + .Apply(map.Mapper()) + .With(0, "flow") + .Build() + .Build() .Build() .Build() .Done() diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 81fc4f432f..ec7c6781fe 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -1632,29 +1632,21 @@ TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr righ .Build(); } - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideToBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideToBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideToBlocks); - return ctx.Builder(pos) .Callable("NarrowMap") .Callable(0, "ToFlow") .Callable(0, "WideFromBlocks") .Callable(0, "BlockMapJoinCore") - .Callable(0, "FromFlow") - .Callable(0, "WideToBlocks") + .Callable(0, "WideToBlocks") + .Callable(0, "FromFlow") .Callable(0, "ExpandMap") .Add(0, std::move(leftFlow)) .Add(1, std::move(leftExpandLambda)) .Seal() .Seal() .Seal() - .Callable(1, "FromFlow") - .Callable(0, "WideToBlocks") + .Callable(1, "WideToBlocks") + .Callable(0, "FromFlow") .Callable(0, "ExpandMap") .Add(0, std::move(rightFlow)) .Add(1, std::move(rightExpandLambda)) |