diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-11-23 21:41:31 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-11-23 22:10:37 +0300 |
commit | f330971f637115368558cea161aaae7a51a11924 (patch) | |
tree | c7af80ec1495072e61156827e3dff89773c44818 | |
parent | 7e0b5a3f515f85f3b50d33b17de6b9acf8cee790 (diff) | |
download | ydb-f330971f637115368558cea161aaae7a51a11924.tar.gz |
YQL-16986 wide combiner with spilling prototype
26 files changed, 735 insertions, 64 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index e7c3b4372d..db011ef730 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -162,8 +162,13 @@ private: LOG_T("Task runner. Inject watermark " << watermark); TaskRunner->SetWatermarkIn(watermark); } - res = TaskRunner->Run(); +// Uncomment me to test YQL-16986 prototype +// Delete me after YQL-16988 +// if (ERunStatus::PendingInput == res){ +// //very poor man waiting for spiller async operation completion +// Schedule(TDuration::MilliSeconds(1), new TEvContinueRun(THashSet<ui32>(ev->Get()->InputChannels), ev->Get()->MemLimit)); +// } } for (auto& channelId : inputMap) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp index 2a4e0bb1bc..0710fb8ed2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp @@ -20,8 +20,9 @@ public: if (item.IsFinish()) { return list.Release(); } - MKQL_ENSURE(!item.IsYield(), "Unexpected flow status!"); - list = ctx.HolderFactory.Append(list.Release(), item.Release()); + if (!item.IsYield()) { + list = ctx.HolderFactory.Append(list.Release(), item.Release()); + } } } #ifndef MKQL_DISABLE_CODEGEN diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index 73c9077fb4..d54cf0cb34 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -7,6 +7,8 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_stats_registry.h> #include <ydb/library/yql/minikql/defs.h> +#include <ydb/library/yql/minikql/computation/mkql_spiller.h> +#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h> #include <ydb/library/yql/utils/cast.h> #include <util/string/cast.h> @@ -106,6 +108,16 @@ struct TCombinerNodes { std::transform(itemsOnResults.cbegin(), itemsOnResults.cend(), PasstroughtItems.begin(), [](const TPasstroughtMap::value_type& v) { return v.has_value(); }); } + bool IsInputItemNodeUsed(size_t i) const { + return (ItemNodes[i]->GetDependencesCount() > 0U || PasstroughtItems[i]); + } + + NUdf::TUnboxedValue* GetUsedInputItemNodePtrOrNull(TComputationContext& ctx, size_t i) const { + return IsInputItemNodeUsed(i) ? + &ItemNodes[i]->RefValue(ctx) : + nullptr; + } + void ExtractKey(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const { std::for_each(ItemNodes.cbegin(), ItemNodes.cend(), [&](IComputationExternalNode* item) { if (const auto pointer = *values++) @@ -295,6 +307,367 @@ private: TStates States; }; +class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { + typedef TComputationValue<TSpillingSupportState> TBase; +public: + enum class EOperatingMode { + InMemory, // try to perform all processing in memory + SpillState, //after switching to the spilling mode we spill collected state and free allocated memory + SpillData, //store incoming data + ProcessSpilled //restore and process spilled data + }; + TSpillingSupportState( + TMemoryUsageInfo* memInfo, + const TCombinerNodes& nodes, IComputationWideFlowNode *const flow, size_t wideFieldsIndex, + const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, + const THashFunc& hash, const TEqualsFunc& equal + ) + : TBase(memInfo) + , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal) + , Nodes(nodes) + , Flow(flow) + , WideFieldsIndex(wideFieldsIndex) + , UsedInputItemType(usedInputItemType) + , KeyAndStateType(keyAndStateType) + , KeyWidth(keyWidth) + , Hasher(hash) + , Mode(EOperatingMode::InMemory) + { + BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount()); + BufferForKeyAnsState.reserve(keyAndStateType->GetElementsCount()); + } + ~TSpillingSupportState() { + } + + EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + if (HasRunningSpillingAsyncOperation()) { + return EFetchResult::Yield; + } + while (true) { + switch(GetMode()) { + case EOperatingMode::InMemory: { + auto r = DoCalculateInMemory(ctx, output); + if (GetMode() == TSpillingSupportState::EOperatingMode::InMemory) { + return r; + } + break; + } + case EOperatingMode::SpillState: { + SpillState(); + if (GetMode() == EOperatingMode::SpillState) { + return AsyncWrite(); + } + MKQL_ENSURE(GetMode() == EOperatingMode::SpillData, "Internal logic error"); + break; + } + case EOperatingMode::SpillData: { + SpillData(ctx); + if (GetMode() == EOperatingMode::SpillData) { + return AsyncWrite(); + } + break; + } + case EOperatingMode::ProcessSpilled: { + return ProcessSpilledData(ctx, output); + } + } + } + Y_UNREACHABLE(); + } +private: + EFetchResult AsyncWrite() { + MKQL_ENSURE(!AsyncReadOperation.has_value(), "Internal logic error"); + MKQL_ENSURE(AsyncWriteOperation.has_value(), "Internal logic error"); + MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); + //AsyncWriteOperation.Subscribe() //TODO YQL-16988 + return EFetchResult::Yield; + } + EFetchResult AsyncRead() { + MKQL_ENSURE(!AsyncWriteOperation.has_value(), "Internal logic error"); + MKQL_ENSURE(AsyncReadOperation.has_value(), "Internal logic error"); + MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error"); + //AsyncReadOperation.Subscribe() //TODO YQL-16988 + return EFetchResult::Yield; + } + EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + while (InputDataFetchResult != EFetchResult::Finish) { + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) { + fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); + } + InputDataFetchResult = Flow->FetchValues(ctx, fields); + switch (InputDataFetchResult) { + case EFetchResult::One: { + Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue)); + const bool isNew = InMemoryProcessingState.TasteIt(); + Nodes.ProcessItem( + ctx, + isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue), + static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat) + ); + if (IsSwitchToSpillingModeCondition()) { + SwitchMode(EOperatingMode::SpillState); + return EFetchResult::Yield; + } + continue; + } + case EFetchResult::Yield: + return EFetchResult::Yield; + case EFetchResult::Finish: + break; + } + } + if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) { + Nodes.FinishItem(ctx, values, output); + return EFetchResult::One; + } + return EFetchResult::Finish; + } + + void SpillState() { + // mini State machine that: + //1. Spills InMemoryProcessingState + //2. Finilizes buckets + //During each phase asyn operation may be returned + //On next call execution is continued + MKQL_ENSURE(!AsyncReadOperation, "Internal logic error"); + if (AsyncWriteOperation) { + MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); + SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue()); + AsyncWriteOperation = std::nullopt; + } + while (const auto keyAndState = InMemoryProcessingState.Extract()) { + auto hash = Hasher(keyAndState); //Hasher uses only key for hashing + auto bucketId = hash % SpilledBucketCount; + auto& bucket = SpilledBuckets[bucketId]; + MKQL_ENSURE(bucket.InitialState, "Internal logic error"); + if (auto chunkIsBeingStored = bucket.InitialState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()})) { + CurrentAsyncOperationBucketId = bucketId; + AsyncWriteOperation = chunkIsBeingStored; + return; + } + for (size_t i = 0; i != KeyAndStateType->GetElementsCount(); ++i) { + //releasing values stored in unsafe TUnboxedValue buffer + keyAndState[i].UnRef(); + } + } + if (!FinalizingSpillerBuckets) { + FinalizingSpillerBuckets = true; + CurrentAsyncOperationBucketId = 0; + } + for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) { + if (auto chunkIsBeingStored = SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->FinishWriting()) { + AsyncWriteOperation = chunkIsBeingStored; + return; + } + } + InMemoryProcessingState.IsEmpty(); + SwitchMode(EOperatingMode::SpillData); + } + + void SpillData(TComputationContext& ctx) { + if (AsyncWriteOperation) { + MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); + SpilledBuckets[CurrentAsyncOperationBucketId].Data->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue()); + AsyncWriteOperation = std::nullopt; + } + while (!FinalizingSpillerBuckets) { + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) { + fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); + } + InputDataFetchResult = Flow->FetchValues(ctx, fields); + switch (InputDataFetchResult) { + case EFetchResult::One: { + BufferForKeyAnsState.resize(KeyWidth); //use only key part of the buffer + Nodes.ExtractKey(ctx, fields, BufferForKeyAnsState.data()); + auto hash = Hasher(BufferForKeyAnsState.data()); + BufferForKeyAnsState.resize(0); //for freeing allocated key value asap + auto bucketId = hash % SpilledBucketCount; + MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error"); + for (size_t i = 0; i != Nodes.ItemNodes.size(); ++i) { + if (fields[i]) { + BufferForUsedInputItems.push_back(*fields[i]); + } + } + auto &bucket = SpilledBuckets[bucketId]; + const auto chunkIsBeingStored = bucket.Data->WriteWideItem(BufferForUsedInputItems); + BufferForUsedInputItems.resize(0); //for freeing allocated key value asap + if (chunkIsBeingStored) { + CurrentAsyncOperationBucketId = bucketId; + AsyncWriteOperation = chunkIsBeingStored; + return; + } + break; + } + case EFetchResult::Yield: + return; + case EFetchResult::Finish: + CurrentAsyncOperationBucketId = 0; + FinalizingSpillerBuckets = true; + break; + } + } + for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) { + auto& bucket = SpilledBuckets[CurrentAsyncOperationBucketId]; + if (auto chunkIsBeingStored = bucket.Data->FinishWriting()) { + AsyncWriteOperation = chunkIsBeingStored; + return; + } + } + SwitchMode(EOperatingMode::ProcessSpilled); + } + + EFetchResult ProcessSpilledData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output){ + if (AsyncReadOperation) { + MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error"); + if (RecoverState) { + SpilledBuckets[0].InitialState->AsyncReadCompleted( + AsyncReadOperation->ExtractValue(), ctx.HolderFactory); + } else { + SpilledBuckets[0].Data->AsyncReadCompleted( + AsyncReadOperation->ExtractValue(), ctx.HolderFactory); + } + AsyncWriteOperation = std::nullopt; + } + while(!SpilledBuckets.empty()){ + auto& bucket = SpilledBuckets.front(); + //recover spilled state + while(!bucket.InitialState->Empty()) { + RecoverState = true; + BufferForKeyAnsState.resize(KeyAndStateType->GetElementsCount()); + AsyncReadOperation = bucket.InitialState->ExtractWideItem(BufferForKeyAnsState); + if (AsyncReadOperation) { + BufferForKeyAnsState.resize(0); + return AsyncRead(); + } + for (size_t i = 0; i != KeyWidth; ++i) { + //jumping into unsafe world, refusing ownership + static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Tongue[i]) = std::move(BufferForKeyAnsState[i]); + } + auto isNew = InMemoryProcessingState.TasteIt(); + MKQL_ENSURE(isNew, "Internal logic error"); + for (size_t i = KeyWidth; i != KeyAndStateType->GetElementsCount(); ++i) { + //jumping into unsafe world, refusing ownership + static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Throat[i - KeyWidth]) = std::move(BufferForKeyAnsState[i]); + } + BufferForKeyAnsState.resize(0); + } + //process spilled data + while(!bucket.Data->Empty()) { + RecoverState = false; + BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); + AsyncReadOperation = bucket.Data->ExtractWideItem(BufferForUsedInputItems); + if (AsyncReadOperation) { + return AsyncRead(); + } + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + for (size_t i = 0, j = 0; i != Nodes.ItemNodes.size(); ++i) { + if (Nodes.IsInputItemNodeUsed(i)) { + fields[i] = &BufferForUsedInputItems[j++]; + } else { + fields[i] = nullptr; + } + } + Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue)); + const bool isNew = InMemoryProcessingState.TasteIt(); + Nodes.ProcessItem( + ctx, + isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue), + static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat) + ); + BufferForKeyAnsState.resize(0); + } + if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) { + Nodes.FinishItem(ctx, values, output); + return EFetchResult::One; + } + InMemoryProcessingState.IsEmpty(); + SpilledBuckets.pop_front(); + } + return EFetchResult::Finish; + } + + EOperatingMode GetMode() const { + return Mode; + } + + bool HasRunningSpillingAsyncOperation() const { + return + (AsyncWriteOperation.has_value() && !AsyncWriteOperation->HasValue()) || + (AsyncReadOperation.has_value() && !AsyncReadOperation->HasValue()); + } + + void SwitchMode(EOperatingMode mode) { + MKQL_ENSURE(!AsyncReadOperation, "Internal logic error"); + MKQL_ENSURE(!AsyncWriteOperation, "Internal logic error"); + switch(mode) { + case EOperatingMode::InMemory: + MKQL_ENSURE(false, "Internal logic error"); + break; + case EOperatingMode::SpillState: { + MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); + MKQL_ENSURE(!Spiller,"Internal logic error"); + Spiller = MakeSpiller(); + SpilledBuckets.resize(SpilledBucketCount); + for (auto &b: SpilledBuckets) { + b.InitialState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, KeyAndStateType, 1 << 20); + } + FinalizingSpillerBuckets = false; + break; + } + case EOperatingMode::SpillData: + MKQL_ENSURE(EOperatingMode::SpillState == Mode, "Internal logic error"); + MKQL_ENSURE(Spiller,"Internal logic error"); + MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); + for (auto &b: SpilledBuckets) { + b.Data = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, UsedInputItemType, 1 << 20); + } + InputDataFetchResult = EFetchResult::Yield; + FinalizingSpillerBuckets = false; + break; + case EOperatingMode::ProcessSpilled: + MKQL_ENSURE(EOperatingMode::SpillData == Mode, "Internal logic error"); + MKQL_ENSURE(Spiller,"Internal logic error"); + MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); + break; + } + Mode = mode; + } + + bool IsSwitchToSpillingModeCondition() const { + //TODO implement me + return false; + } + +private: + TState InMemoryProcessingState; + const TCombinerNodes& Nodes; + IComputationWideFlowNode* const Flow; + const size_t WideFieldsIndex; + const TMultiType* const UsedInputItemType; + const TMultiType* const KeyAndStateType; + const size_t KeyWidth; + THashFunc const Hasher; + EOperatingMode Mode; + bool FinalizingSpillerBuckets; // sub mode for SpillState and SpillData + bool RecoverState; //sub mode for ProcessSpilledData + + struct TSpilledBucket { + std::unique_ptr<TWideUnboxedValuesSpillerAdapter> InitialState; //state collected before switching to spilling mode + std::unique_ptr<TWideUnboxedValuesSpillerAdapter> Data; //data collected in spilling mode + }; + static constexpr size_t SpilledBucketCount = 4; + std::deque<TSpilledBucket> SpilledBuckets; + ISpiller::TPtr Spiller; + EFetchResult InputDataFetchResult; + size_t CurrentAsyncOperationBucketId; + std::optional<NThreading::TFuture<ISpiller::TKey>> AsyncWriteOperation; + std::optional<NThreading::TFuture<TRope>> AsyncReadOperation; + TUnboxedValueVector BufferForUsedInputItems; + TUnboxedValueVector BufferForKeyAnsState; +}; + #ifndef MKQL_DISABLE_CODEGEN class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> { private: @@ -737,48 +1110,31 @@ class TWideLastCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideL { using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWrapper>; public: - TWideLastCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes) + TWideLastCombinerWrapper( + TComputationMutables& mutables, + IComputationWideFlowNode* flow, + TCombinerNodes&& nodes, + const TMultiType* usedInputItemType, + TKeyTypes&& keyTypes, + const TMultiType* keyAndStateType + ) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) , Flow(flow) , Nodes(std::move(nodes)) , KeyTypes(std::move(keyTypes)) + , UsedInputItemType(usedInputItemType) + , KeyAndStateType(keyAndStateType) , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size())) {} - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!state.HasValue()) { - MakeState(ctx, state); - } - - if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) { - auto **fields = ctx.WideFields.data() + WideFieldsIndex; - - while (EFetchResult::Finish != ptr->InputStatus) { - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) - if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i]) - fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); - - switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { - case EFetchResult::One: - Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); - Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); - continue; - case EFetchResult::Yield: - return EFetchResult::Yield; - case EFetchResult::Finish: - break; - } - } - - if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) { - Nodes.FinishItem(ctx, values, output); - return EFetchResult::One; - } - - return EFetchResult::Finish; + EFetchResult DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + if (!stateValue.HasValue()) { + MakeSpillingSupportState(ctx, stateValue); } - Y_UNREACHABLE(); + auto *const state = static_cast<TSpillingSupportState *>(stateValue.AsBoxed().Get()); + return state->DoCalculate(ctx, output); } + #ifndef MKQL_DISABLE_CODEGEN ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { auto& context = ctx.Codegen.GetContext(); @@ -1018,6 +1374,14 @@ private: ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)) ); #endif + } + void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + state = ctx.HolderFactory.Create<TSpillingSupportState>(Nodes, Flow, WideFieldsIndex, + UsedInputItemType, KeyAndStateType, + Nodes.KeyNodes.size(), + TMyValueHasher(KeyTypes), + TMyValueEqual(KeyTypes) + ); } void RegisterDependencies() const final { @@ -1032,7 +1396,8 @@ private: IComputationWideFlowNode *const Flow; const TCombinerNodes Nodes; const TKeyTypes KeyTypes; - + const TMultiType* const UsedInputItemType; + const TMultiType* const KeyAndStateType; const ui32 WideFieldsIndex; #ifndef MKQL_DISABLE_CODEGEN @@ -1070,8 +1435,8 @@ private: template<bool Last> IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments."); - - const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto inputType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()); + const auto inputWidth = GetWideComponentsCount(inputType); const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); @@ -1082,10 +1447,12 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF const auto stateSize = AS_VALUE(TDataLiteral, callable.GetInput(++index))->AsValue().Get<ui32>(); ++index += inputWidth; - + std::vector<TType*> keyAndStateItemTypes; + keyAndStateItemTypes.reserve(keysSize + stateSize); TKeyTypes keyTypes; keyTypes.reserve(keysSize); for (ui32 i = index; i < index + keysSize; ++i) { + keyAndStateItemTypes.push_back(callable.GetInput(i).GetStaticType()); bool optional; keyTypes.emplace_back(*UnpackOptionalData(callable.GetInput(i).GetStaticType(), optional)->GetDataSlot(), optional); } @@ -1096,7 +1463,10 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF index += keysSize; nodes.InitResultNodes.reserve(stateSize); - std::generate_n(std::back_inserter(nodes.InitResultNodes), stateSize, [&](){ return LocateNode(ctx.NodeLocator, callable, index++); } ); + for (size_t i = 0; i != stateSize; ++i) { + keyAndStateItemTypes.push_back(callable.GetInput(index).GetStaticType()); + nodes.InitResultNodes.push_back(LocateNode(ctx.NodeLocator, callable, index++)); + } index += stateSize; nodes.UpdateResultNodes.reserve(stateSize); @@ -1125,9 +1495,21 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF nodes.BuildMaps(); if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { - if constexpr (Last) - return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes)); - else { + if constexpr (Last) { + const auto inputItemTypes = GetWideComponents(inputType); + std::vector<TType*> usedInputItemTypes; + usedInputItemTypes.reserve(inputItemTypes.size()); + for(size_t i = 0; i != inputItemTypes.size(); ++i) { + if (nodes.IsInputItemNodeUsed(i)) { + usedInputItemTypes.push_back(inputItemTypes[i]); + } + } + return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), + TMultiType::Create(usedInputItemTypes.size(), usedInputItemTypes.data(), ctx.Env), + std::move(keyTypes), + TMultiType::Create(keyAndStateItemTypes.size(), keyAndStateItemTypes.data(), ctx.Env) + ); + } else { const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>(); if (EGraphPerProcess::Single == ctx.GraphPerProcess) return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 8e49947c0d..78dbb12569 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -923,14 +923,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6"); + THashSet<TString> expectedResults = { + "key one", + "very long value 2 / key two", + "very long key one", + "very long value 8 / very long value 7 / very long value 6" + }; + while(!expectedResults.empty()) { + UNIT_ASSERT(iterator.Next(item)); + TString val{item.AsStringRef()}; + UNIT_ASSERT(expectedResults.contains(val)); + expectedResults.erase(val); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } @@ -998,14 +1002,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two"); + THashSet<TString> expectedResults = { + "very long value 1 / key one / very long value 1 / key one", + "very long value 3 / key two / very long value 2 / key two", + "very long value 4 / very long key one / very long value 4 / very long key one", + "very long value 9 / very long key two / very long value 5 / very long key two" + }; + while (!expectedResults.empty()) { + UNIT_ASSERT(iterator.Next(item)); + TString val{item.AsStringRef()}; + UNIT_ASSERT(expectedResults.contains(val)); + expectedResults.erase(val); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } @@ -1070,11 +1078,17 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); + THashSet<TString> expectedResults = { + "key one / value 2 / value 1 / value 5 / value 4", + "key two / value 4 / value 3 / value 3 / value 2", + }; NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4"); - UNIT_ASSERT(iterator.Next(item)); - UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2"); + while (!expectedResults.empty()) { + UNIT_ASSERT(iterator.Next(item)); + TString val{item.AsStringRef()}; + UNIT_ASSERT(expectedResults.contains(val)); + expectedResults.erase(val); + } UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt index 364f602ce5..344f00fd5f 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt @@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt index 364f602ce5..344f00fd5f 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt @@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt index 2c5ea976b0..f0a0459bfc 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt @@ -49,6 +49,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt index 2c5ea976b0..f0a0459bfc 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt @@ -49,6 +49,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt index 364f602ce5..344f00fd5f 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt @@ -48,6 +48,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index f1948212bd..d8dee524b2 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -85,6 +85,9 @@ public: size_t PackedSizeEstimate() const { return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); } + bool IsEmpty() const { + return !ItemCount_; + } void Clear(); TRope Finish(); diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.cpp b/ydb/library/yql/minikql/computation/mkql_spiller.cpp new file mode 100644 index 0000000000..803dac9c56 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_spiller.cpp @@ -0,0 +1,61 @@ +#include "mkql_spiller.h" +#include <library/cpp/threading/future/core/future.h> +#include <util/system/thread.h> +#include <unordered_map> + +namespace NKikimr::NMiniKQL { + +//Dummy synchronous in-memory spiller +class TDummySpiller: public ISpiller{ +public: + TDummySpiller() + : NextKey(0) + {} + + NThreading::TFuture<TKey> Put(TRope&& blob) override { + auto promise = NThreading::NewPromise<ISpiller::TKey>(); +// TThread t([this, blob = std::move(blob), p = std::move(p)]() { +// WriteAsync(blob, p); +// }); +// t.Detach(); +// return f; + auto key = NextKey; + Storage[key] = blob; + NextKey++; + promise.SetValue(key); + return promise.GetFuture();; + } + std::optional<NThreading::TFuture<TRope>> Get(TKey key) override { + auto promise = NThreading::NewPromise<TRope>(); + if (auto it = Storage.find(key); it != Storage.end()) { + promise.SetValue(it->second); + return promise.GetFuture();; + } else { + return std::nullopt; + } + } + std::optional<NThreading::TFuture<TRope>> Extract(TKey key) override { + auto promise = NThreading::NewPromise<TRope>(); + if (auto it = Storage.find(key); it != Storage.end()) { + promise.SetValue(std::move(it->second)); + Storage.erase(it); + return promise.GetFuture();; + } else { + return std::nullopt; + } + } + NThreading::TFuture<void> Delete(TKey key) override { + auto promise = NThreading::NewPromise<void>(); + promise.SetValue(); + Storage.erase(key); + return promise.GetFuture(); + } +private: + ISpiller::TKey NextKey; + std::unordered_map<ISpiller::TKey, TRope> Storage; +}; +ISpiller::TPtr MakeSpiller() { + return std::make_shared<TDummySpiller>(); +} + +} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h new file mode 100644 index 0000000000..d9663d5985 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_spiller.h @@ -0,0 +1,27 @@ +#pragma once +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/actors/util/rope.h> + + +namespace NKikimr::NMiniKQL { + +struct ISpiller { + using TPtr = std::shared_ptr<ISpiller>; + virtual ~ISpiller(){} + using TKey = ui64; + virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0; + + ///\return + /// nullopt for absent keys + /// TFuture + virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0; + virtual NThreading::TFuture<void> Delete(TKey) = 0; + ///Get + Delete + ///Stored value may be moved to feature + virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0; + +}; + +ISpiller::TPtr MakeSpiller(); + +}//namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h new file mode 100644 index 0000000000..b3bba486bd --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h @@ -0,0 +1,88 @@ +#pragma once +#include "mkql_spiller.h" +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> + + +namespace NKikimr::NMiniKQL { + +///Stores and loads very long sequences of TMultiType UVs +///Can split sequences into chunks +///Sends chunks to ISplitter and keeps assigned keys +///When all data is written switches to read mode. Switching back to writing mode is not supported +///Provides an interface for sequential read (like forward iterator) +///When interaction with ISpiller is required, Write and Read operations return a Future +class TWideUnboxedValuesSpillerAdapter { +public: + TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit) + : Spiller(spiller) + , ItemType(type) + , SizeLimit(sizeLimit) + , Packer(type) + { + } + + /// Write wide UV item + /// \returns + /// - nullopt, if thee values are accumulated + /// - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends + /// In this case a caller must wait operation completion and call StoreCompleted. + /// Design note: not using Subscribe on a Future here to avoid possible race condition + std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) { + Packer.AddWideItem(wideItem.data(), wideItem.size()); + if(Packer.PackedSizeEstimate() > SizeLimit) { + return Spiller->Put(std::move(Packer.Finish())); + } else { + return std::nullopt; + } + } + + std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() { + if (Packer.IsEmpty()) + return std::nullopt; + return Spiller->Put(std::move(Packer.Finish())); + } + + void AsyncWriteCompleted(ISpiller::TKey key) { + StoredChunks.push_back(key); + } + + //Extracting interface + bool Empty() const { + return StoredChunks.empty() && !CurrentBatch; + } + std::optional<NThreading::TFuture<TRope>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) { + MKQL_ENSURE(!Empty(), "Internal logic error"); + if (CurrentBatch) { + auto row = CurrentBatch->Head(); + for (size_t i = 0; i != wideItem.size(); ++i) { + wideItem[i] = row[i]; + } + CurrentBatch->Pop(); + if (CurrentBatch->empty()) { + CurrentBatch = std::nullopt; + } + return std::nullopt; + } else { + auto r = Spiller->Get(StoredChunks.front()); + StoredChunks.pop_front(); + return r; + } + } + + void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) { + //Implementation detail: deserialization is performed in a processing thread + TUnboxedValueBatch batch(ItemType); + Packer.UnpackBatch(std::move(rope), holderFactory, batch); + CurrentBatch = std::move(batch); + } + +private: + ISpiller::TPtr Spiller; + const TMultiType* const ItemType; + const size_t SizeLimit; + TValuePackerTransport<false> Packer; + std::deque<ISpiller::TKey> StoredChunks; + std::optional<TUnboxedValueBatch> CurrentBatch; +}; + +}//namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp new file mode 100644 index 0000000000..4f3953623c --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp @@ -0,0 +1,73 @@ +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h> +#include <ydb/library/yql/minikql/computation/mkql_spiller.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <vector> +#include <utility> +#include <algorithm> + +namespace NKikimr::NMiniKQL { + +Y_UNIT_TEST_SUITE(TestWideSpillerAdapter) { + constexpr size_t itemWidth = 3; + constexpr size_t chunkSize = 100; + Y_UNIT_TEST(TestWriteExtractZeroItems) { + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment env(alloc); + const auto spiller = MakeSpiller(); + std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env)); + TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize); + auto r = wideUVSpiller.FinishWriting(); + UNIT_ASSERT(!r.has_value()); + UNIT_ASSERT(wideUVSpiller.Empty()); + } + + Y_UNIT_TEST(TestWriteExtract) { + TScopedAlloc alloc(__LOCATION__); + TMemoryUsageInfo memInfo("test"); + THolderFactory holderFactory(alloc.Ref(), memInfo); + TTypeEnvironment env(alloc); + const auto spiller = MakeSpiller(); + std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env)); + TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize); + std::vector<NUdf::TUnboxedValue> wideValue(itemWidth); + constexpr size_t rowCount = chunkSize*10+3; + for (size_t row = 0; row != rowCount; ++row) { + for(size_t i = 0; i != itemWidth; ++i) { + wideValue[i] = NUdf::TUnboxedValuePod(NUdf::TStringValue(TStringBuilder() << "Long enough string: " << row * 10 + i)); + } + if (auto r = wideUVSpiller.WriteWideItem(wideValue)) { + wideUVSpiller.AsyncWriteCompleted(r->GetValue()); + } + } + auto r = wideUVSpiller.FinishWriting(); + if (r) { + wideUVSpiller.AsyncWriteCompleted(r->GetValue()); + } + + wideUVSpiller.AsyncWriteCompleted(r->GetValue()); + for (size_t row = 0; row != rowCount; ++row) { + UNIT_ASSERT(!wideUVSpiller.Empty()); + if (auto r = wideUVSpiller.ExtractWideItem(wideValue)) { + wideUVSpiller.AsyncReadCompleted(r->ExtractValue(), holderFactory); + r = wideUVSpiller.ExtractWideItem(wideValue); + UNIT_ASSERT(!r.has_value()); + } + for (size_t i = 0; i != itemWidth; ++i) { + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(wideValue[i].AsStringRef()) , TStringBuilder() << "Long enough string: " << row * 10 + i); + } + } + UNIT_ASSERT(!wideUVSpiller.Empty()); + } +} + +} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt index 2f9609b5a2..185dd5f550 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt @@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt index 2f9609b5a2..185dd5f550 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt @@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt index e0b561b565..38fd1bb839 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt @@ -45,6 +45,7 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt index e0b561b565..38fd1bb839 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt @@ -45,6 +45,7 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt index 2f9609b5a2..185dd5f550 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt @@ -44,6 +44,7 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt index 61f2fe7012..8da2d8478b 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt @@ -41,6 +41,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt index 01dba3f99d..440182defa 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt @@ -42,6 +42,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt index 67eac9242a..653994d7c9 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt @@ -45,6 +45,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt index 436ad6ceac..fc6f85827f 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt @@ -46,6 +46,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt index 4244aa874b..bfc123d31d 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt @@ -35,6 +35,7 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/ya.make b/ydb/library/yql/minikql/computation/ut/ya.make index b4c2ff659f..819a216fd6 100644 --- a/ydb/library/yql/minikql/computation/ut/ya.make +++ b/ydb/library/yql/minikql/computation/ut/ya.make @@ -21,6 +21,7 @@ SRCS( mkql_computation_pattern_cache_ut.cpp mkql_validate_ut.cpp mkql_value_builder_ut.cpp + mkql_spiller_adapter_ut.cpp presort_ut.cpp ) diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc index 424a78ebdd..11e1687462 100644 --- a/ydb/library/yql/minikql/computation/ya.make.inc +++ b/ydb/library/yql/minikql/computation/ya.make.inc @@ -13,6 +13,7 @@ SRCS( mkql_computation_node_impl.cpp mkql_computation_node_pack.cpp mkql_computation_node_pack_impl.cpp + mkql_spiller.cpp mkql_custom_list.cpp mkql_llvm_base.cpp mkql_validate.cpp |