diff options
author | gvit <gvit@ydb.tech> | 2023-11-27 20:19:54 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-27 20:45:02 +0300 |
commit | 683fcca92aa79893d3d0c44a36d27c10c807c300 (patch) | |
tree | 12d63f682f52716b29bc1d194e2b88c41674188f | |
parent | 8dd796d6a2f607d86ff7c85931b580768ac6d5ab (diff) | |
download | ydb-683fcca92aa79893d3d0c44a36d27c10c807c300.tar.gz |
Revert commit rXXXXXX, YQL-16986 wide combiner with spilling prototype
26 files changed, 64 insertions, 735 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index db011ef730..e7c3b4372d 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -162,13 +162,8 @@ private: LOG_T("Task runner. Inject watermark " << watermark); TaskRunner->SetWatermarkIn(watermark); } + res = TaskRunner->Run(); -// Uncomment me to test YQL-16986 prototype -// Delete me after YQL-16988 -// if (ERunStatus::PendingInput == res){ -// //very poor man waiting for spiller async operation completion -// Schedule(TDuration::MilliSeconds(1), new TEvContinueRun(THashSet<ui32>(ev->Get()->InputChannels), ev->Get()->MemLimit)); -// } } for (auto& channelId : inputMap) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp index 0710fb8ed2..2a4e0bb1bc 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_collect.cpp @@ -20,9 +20,8 @@ public: if (item.IsFinish()) { return list.Release(); } - if (!item.IsYield()) { - list = ctx.HolderFactory.Append(list.Release(), item.Release()); - } + MKQL_ENSURE(!item.IsYield(), "Unexpected flow status!"); + list = ctx.HolderFactory.Append(list.Release(), item.Release()); } } #ifndef MKQL_DISABLE_CODEGEN diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index d54cf0cb34..73c9077fb4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -7,8 +7,6 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_stats_registry.h> #include <ydb/library/yql/minikql/defs.h> -#include <ydb/library/yql/minikql/computation/mkql_spiller.h> -#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h> #include <ydb/library/yql/utils/cast.h> #include <util/string/cast.h> @@ -108,16 +106,6 @@ struct TCombinerNodes { std::transform(itemsOnResults.cbegin(), itemsOnResults.cend(), PasstroughtItems.begin(), [](const TPasstroughtMap::value_type& v) { return v.has_value(); }); } - bool IsInputItemNodeUsed(size_t i) const { - return (ItemNodes[i]->GetDependencesCount() > 0U || PasstroughtItems[i]); - } - - NUdf::TUnboxedValue* GetUsedInputItemNodePtrOrNull(TComputationContext& ctx, size_t i) const { - return IsInputItemNodeUsed(i) ? - &ItemNodes[i]->RefValue(ctx) : - nullptr; - } - void ExtractKey(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const { std::for_each(ItemNodes.cbegin(), ItemNodes.cend(), [&](IComputationExternalNode* item) { if (const auto pointer = *values++) @@ -307,367 +295,6 @@ private: TStates States; }; -class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { - typedef TComputationValue<TSpillingSupportState> TBase; -public: - enum class EOperatingMode { - InMemory, // try to perform all processing in memory - SpillState, //after switching to the spilling mode we spill collected state and free allocated memory - SpillData, //store incoming data - ProcessSpilled //restore and process spilled data - }; - TSpillingSupportState( - TMemoryUsageInfo* memInfo, - const TCombinerNodes& nodes, IComputationWideFlowNode *const flow, size_t wideFieldsIndex, - const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, - const THashFunc& hash, const TEqualsFunc& equal - ) - : TBase(memInfo) - , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal) - , Nodes(nodes) - , Flow(flow) - , WideFieldsIndex(wideFieldsIndex) - , UsedInputItemType(usedInputItemType) - , KeyAndStateType(keyAndStateType) - , KeyWidth(keyWidth) - , Hasher(hash) - , Mode(EOperatingMode::InMemory) - { - BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount()); - BufferForKeyAnsState.reserve(keyAndStateType->GetElementsCount()); - } - ~TSpillingSupportState() { - } - - EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { - if (HasRunningSpillingAsyncOperation()) { - return EFetchResult::Yield; - } - while (true) { - switch(GetMode()) { - case EOperatingMode::InMemory: { - auto r = DoCalculateInMemory(ctx, output); - if (GetMode() == TSpillingSupportState::EOperatingMode::InMemory) { - return r; - } - break; - } - case EOperatingMode::SpillState: { - SpillState(); - if (GetMode() == EOperatingMode::SpillState) { - return AsyncWrite(); - } - MKQL_ENSURE(GetMode() == EOperatingMode::SpillData, "Internal logic error"); - break; - } - case EOperatingMode::SpillData: { - SpillData(ctx); - if (GetMode() == EOperatingMode::SpillData) { - return AsyncWrite(); - } - break; - } - case EOperatingMode::ProcessSpilled: { - return ProcessSpilledData(ctx, output); - } - } - } - Y_UNREACHABLE(); - } -private: - EFetchResult AsyncWrite() { - MKQL_ENSURE(!AsyncReadOperation.has_value(), "Internal logic error"); - MKQL_ENSURE(AsyncWriteOperation.has_value(), "Internal logic error"); - MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); - //AsyncWriteOperation.Subscribe() //TODO YQL-16988 - return EFetchResult::Yield; - } - EFetchResult AsyncRead() { - MKQL_ENSURE(!AsyncWriteOperation.has_value(), "Internal logic error"); - MKQL_ENSURE(AsyncReadOperation.has_value(), "Internal logic error"); - MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error"); - //AsyncReadOperation.Subscribe() //TODO YQL-16988 - return EFetchResult::Yield; - } - EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { - auto **fields = ctx.WideFields.data() + WideFieldsIndex; - while (InputDataFetchResult != EFetchResult::Finish) { - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) { - fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); - } - InputDataFetchResult = Flow->FetchValues(ctx, fields); - switch (InputDataFetchResult) { - case EFetchResult::One: { - Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue)); - const bool isNew = InMemoryProcessingState.TasteIt(); - Nodes.ProcessItem( - ctx, - isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue), - static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat) - ); - if (IsSwitchToSpillingModeCondition()) { - SwitchMode(EOperatingMode::SpillState); - return EFetchResult::Yield; - } - continue; - } - case EFetchResult::Yield: - return EFetchResult::Yield; - case EFetchResult::Finish: - break; - } - } - if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) { - Nodes.FinishItem(ctx, values, output); - return EFetchResult::One; - } - return EFetchResult::Finish; - } - - void SpillState() { - // mini State machine that: - //1. Spills InMemoryProcessingState - //2. Finilizes buckets - //During each phase asyn operation may be returned - //On next call execution is continued - MKQL_ENSURE(!AsyncReadOperation, "Internal logic error"); - if (AsyncWriteOperation) { - MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); - SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue()); - AsyncWriteOperation = std::nullopt; - } - while (const auto keyAndState = InMemoryProcessingState.Extract()) { - auto hash = Hasher(keyAndState); //Hasher uses only key for hashing - auto bucketId = hash % SpilledBucketCount; - auto& bucket = SpilledBuckets[bucketId]; - MKQL_ENSURE(bucket.InitialState, "Internal logic error"); - if (auto chunkIsBeingStored = bucket.InitialState->WriteWideItem({keyAndState, KeyAndStateType->GetElementsCount()})) { - CurrentAsyncOperationBucketId = bucketId; - AsyncWriteOperation = chunkIsBeingStored; - return; - } - for (size_t i = 0; i != KeyAndStateType->GetElementsCount(); ++i) { - //releasing values stored in unsafe TUnboxedValue buffer - keyAndState[i].UnRef(); - } - } - if (!FinalizingSpillerBuckets) { - FinalizingSpillerBuckets = true; - CurrentAsyncOperationBucketId = 0; - } - for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) { - if (auto chunkIsBeingStored = SpilledBuckets[CurrentAsyncOperationBucketId].InitialState->FinishWriting()) { - AsyncWriteOperation = chunkIsBeingStored; - return; - } - } - InMemoryProcessingState.IsEmpty(); - SwitchMode(EOperatingMode::SpillData); - } - - void SpillData(TComputationContext& ctx) { - if (AsyncWriteOperation) { - MKQL_ENSURE(AsyncWriteOperation->HasValue(), "Internal logic error"); - SpilledBuckets[CurrentAsyncOperationBucketId].Data->AsyncWriteCompleted(AsyncWriteOperation->ExtractValue()); - AsyncWriteOperation = std::nullopt; - } - while (!FinalizingSpillerBuckets) { - auto **fields = ctx.WideFields.data() + WideFieldsIndex; - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) { - fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); - } - InputDataFetchResult = Flow->FetchValues(ctx, fields); - switch (InputDataFetchResult) { - case EFetchResult::One: { - BufferForKeyAnsState.resize(KeyWidth); //use only key part of the buffer - Nodes.ExtractKey(ctx, fields, BufferForKeyAnsState.data()); - auto hash = Hasher(BufferForKeyAnsState.data()); - BufferForKeyAnsState.resize(0); //for freeing allocated key value asap - auto bucketId = hash % SpilledBucketCount; - MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error"); - for (size_t i = 0; i != Nodes.ItemNodes.size(); ++i) { - if (fields[i]) { - BufferForUsedInputItems.push_back(*fields[i]); - } - } - auto &bucket = SpilledBuckets[bucketId]; - const auto chunkIsBeingStored = bucket.Data->WriteWideItem(BufferForUsedInputItems); - BufferForUsedInputItems.resize(0); //for freeing allocated key value asap - if (chunkIsBeingStored) { - CurrentAsyncOperationBucketId = bucketId; - AsyncWriteOperation = chunkIsBeingStored; - return; - } - break; - } - case EFetchResult::Yield: - return; - case EFetchResult::Finish: - CurrentAsyncOperationBucketId = 0; - FinalizingSpillerBuckets = true; - break; - } - } - for(; CurrentAsyncOperationBucketId != SpilledBuckets.size(); ++CurrentAsyncOperationBucketId) { - auto& bucket = SpilledBuckets[CurrentAsyncOperationBucketId]; - if (auto chunkIsBeingStored = bucket.Data->FinishWriting()) { - AsyncWriteOperation = chunkIsBeingStored; - return; - } - } - SwitchMode(EOperatingMode::ProcessSpilled); - } - - EFetchResult ProcessSpilledData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output){ - if (AsyncReadOperation) { - MKQL_ENSURE(AsyncReadOperation->HasValue(), "Internal logic error"); - if (RecoverState) { - SpilledBuckets[0].InitialState->AsyncReadCompleted( - AsyncReadOperation->ExtractValue(), ctx.HolderFactory); - } else { - SpilledBuckets[0].Data->AsyncReadCompleted( - AsyncReadOperation->ExtractValue(), ctx.HolderFactory); - } - AsyncWriteOperation = std::nullopt; - } - while(!SpilledBuckets.empty()){ - auto& bucket = SpilledBuckets.front(); - //recover spilled state - while(!bucket.InitialState->Empty()) { - RecoverState = true; - BufferForKeyAnsState.resize(KeyAndStateType->GetElementsCount()); - AsyncReadOperation = bucket.InitialState->ExtractWideItem(BufferForKeyAnsState); - if (AsyncReadOperation) { - BufferForKeyAnsState.resize(0); - return AsyncRead(); - } - for (size_t i = 0; i != KeyWidth; ++i) { - //jumping into unsafe world, refusing ownership - static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Tongue[i]) = std::move(BufferForKeyAnsState[i]); - } - auto isNew = InMemoryProcessingState.TasteIt(); - MKQL_ENSURE(isNew, "Internal logic error"); - for (size_t i = KeyWidth; i != KeyAndStateType->GetElementsCount(); ++i) { - //jumping into unsafe world, refusing ownership - static_cast<NUdf::TUnboxedValue&>(InMemoryProcessingState.Throat[i - KeyWidth]) = std::move(BufferForKeyAnsState[i]); - } - BufferForKeyAnsState.resize(0); - } - //process spilled data - while(!bucket.Data->Empty()) { - RecoverState = false; - BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); - AsyncReadOperation = bucket.Data->ExtractWideItem(BufferForUsedInputItems); - if (AsyncReadOperation) { - return AsyncRead(); - } - auto **fields = ctx.WideFields.data() + WideFieldsIndex; - for (size_t i = 0, j = 0; i != Nodes.ItemNodes.size(); ++i) { - if (Nodes.IsInputItemNodeUsed(i)) { - fields[i] = &BufferForUsedInputItems[j++]; - } else { - fields[i] = nullptr; - } - } - Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue)); - const bool isNew = InMemoryProcessingState.TasteIt(); - Nodes.ProcessItem( - ctx, - isNew ? nullptr : static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Tongue), - static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Throat) - ); - BufferForKeyAnsState.resize(0); - } - if (const auto values = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract())) { - Nodes.FinishItem(ctx, values, output); - return EFetchResult::One; - } - InMemoryProcessingState.IsEmpty(); - SpilledBuckets.pop_front(); - } - return EFetchResult::Finish; - } - - EOperatingMode GetMode() const { - return Mode; - } - - bool HasRunningSpillingAsyncOperation() const { - return - (AsyncWriteOperation.has_value() && !AsyncWriteOperation->HasValue()) || - (AsyncReadOperation.has_value() && !AsyncReadOperation->HasValue()); - } - - void SwitchMode(EOperatingMode mode) { - MKQL_ENSURE(!AsyncReadOperation, "Internal logic error"); - MKQL_ENSURE(!AsyncWriteOperation, "Internal logic error"); - switch(mode) { - case EOperatingMode::InMemory: - MKQL_ENSURE(false, "Internal logic error"); - break; - case EOperatingMode::SpillState: { - MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); - MKQL_ENSURE(!Spiller,"Internal logic error"); - Spiller = MakeSpiller(); - SpilledBuckets.resize(SpilledBucketCount); - for (auto &b: SpilledBuckets) { - b.InitialState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, KeyAndStateType, 1 << 20); - } - FinalizingSpillerBuckets = false; - break; - } - case EOperatingMode::SpillData: - MKQL_ENSURE(EOperatingMode::SpillState == Mode, "Internal logic error"); - MKQL_ENSURE(Spiller,"Internal logic error"); - MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); - for (auto &b: SpilledBuckets) { - b.Data = std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, UsedInputItemType, 1 << 20); - } - InputDataFetchResult = EFetchResult::Yield; - FinalizingSpillerBuckets = false; - break; - case EOperatingMode::ProcessSpilled: - MKQL_ENSURE(EOperatingMode::SpillData == Mode, "Internal logic error"); - MKQL_ENSURE(Spiller,"Internal logic error"); - MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); - break; - } - Mode = mode; - } - - bool IsSwitchToSpillingModeCondition() const { - //TODO implement me - return false; - } - -private: - TState InMemoryProcessingState; - const TCombinerNodes& Nodes; - IComputationWideFlowNode* const Flow; - const size_t WideFieldsIndex; - const TMultiType* const UsedInputItemType; - const TMultiType* const KeyAndStateType; - const size_t KeyWidth; - THashFunc const Hasher; - EOperatingMode Mode; - bool FinalizingSpillerBuckets; // sub mode for SpillState and SpillData - bool RecoverState; //sub mode for ProcessSpilledData - - struct TSpilledBucket { - std::unique_ptr<TWideUnboxedValuesSpillerAdapter> InitialState; //state collected before switching to spilling mode - std::unique_ptr<TWideUnboxedValuesSpillerAdapter> Data; //data collected in spilling mode - }; - static constexpr size_t SpilledBucketCount = 4; - std::deque<TSpilledBucket> SpilledBuckets; - ISpiller::TPtr Spiller; - EFetchResult InputDataFetchResult; - size_t CurrentAsyncOperationBucketId; - std::optional<NThreading::TFuture<ISpiller::TKey>> AsyncWriteOperation; - std::optional<NThreading::TFuture<TRope>> AsyncReadOperation; - TUnboxedValueVector BufferForUsedInputItems; - TUnboxedValueVector BufferForKeyAnsState; -}; - #ifndef MKQL_DISABLE_CODEGEN class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> { private: @@ -1110,31 +737,48 @@ class TWideLastCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideL { using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWrapper>; public: - TWideLastCombinerWrapper( - TComputationMutables& mutables, - IComputationWideFlowNode* flow, - TCombinerNodes&& nodes, - const TMultiType* usedInputItemType, - TKeyTypes&& keyTypes, - const TMultiType* keyAndStateType - ) + TWideLastCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) , Flow(flow) , Nodes(std::move(nodes)) , KeyTypes(std::move(keyTypes)) - , UsedInputItemType(usedInputItemType) - , KeyAndStateType(keyAndStateType) , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Nodes.ItemNodes.size())) {} - EFetchResult DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!stateValue.HasValue()) { - MakeSpillingSupportState(ctx, stateValue); + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + if (!state.HasValue()) { + MakeState(ctx, state); + } + + if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) { + auto **fields = ctx.WideFields.data() + WideFieldsIndex; + + while (EFetchResult::Finish != ptr->InputStatus) { + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) + if (Nodes.ItemNodes[i]->GetDependencesCount() > 0U || Nodes.PasstroughtItems[i]) + fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); + + switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { + case EFetchResult::One: + Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); + Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); + continue; + case EFetchResult::Yield: + return EFetchResult::Yield; + case EFetchResult::Finish: + break; + } + } + + if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) { + Nodes.FinishItem(ctx, values, output); + return EFetchResult::One; + } + + return EFetchResult::Finish; } - auto *const state = static_cast<TSpillingSupportState *>(stateValue.AsBoxed().Get()); - return state->DoCalculate(ctx, output); + Y_UNREACHABLE(); } - #ifndef MKQL_DISABLE_CODEGEN ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { auto& context = ctx.Codegen.GetContext(); @@ -1374,14 +1018,6 @@ private: ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)) ); #endif - } - void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create<TSpillingSupportState>(Nodes, Flow, WideFieldsIndex, - UsedInputItemType, KeyAndStateType, - Nodes.KeyNodes.size(), - TMyValueHasher(KeyTypes), - TMyValueEqual(KeyTypes) - ); } void RegisterDependencies() const final { @@ -1396,8 +1032,7 @@ private: IComputationWideFlowNode *const Flow; const TCombinerNodes Nodes; const TKeyTypes KeyTypes; - const TMultiType* const UsedInputItemType; - const TMultiType* const KeyAndStateType; + const ui32 WideFieldsIndex; #ifndef MKQL_DISABLE_CODEGEN @@ -1435,8 +1070,8 @@ private: template<bool Last> IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments."); - const auto inputType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()); - const auto inputWidth = GetWideComponentsCount(inputType); + + const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); @@ -1447,12 +1082,10 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF const auto stateSize = AS_VALUE(TDataLiteral, callable.GetInput(++index))->AsValue().Get<ui32>(); ++index += inputWidth; - std::vector<TType*> keyAndStateItemTypes; - keyAndStateItemTypes.reserve(keysSize + stateSize); + TKeyTypes keyTypes; keyTypes.reserve(keysSize); for (ui32 i = index; i < index + keysSize; ++i) { - keyAndStateItemTypes.push_back(callable.GetInput(i).GetStaticType()); bool optional; keyTypes.emplace_back(*UnpackOptionalData(callable.GetInput(i).GetStaticType(), optional)->GetDataSlot(), optional); } @@ -1463,10 +1096,7 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF index += keysSize; nodes.InitResultNodes.reserve(stateSize); - for (size_t i = 0; i != stateSize; ++i) { - keyAndStateItemTypes.push_back(callable.GetInput(index).GetStaticType()); - nodes.InitResultNodes.push_back(LocateNode(ctx.NodeLocator, callable, index++)); - } + std::generate_n(std::back_inserter(nodes.InitResultNodes), stateSize, [&](){ return LocateNode(ctx.NodeLocator, callable, index++); } ); index += stateSize; nodes.UpdateResultNodes.reserve(stateSize); @@ -1495,21 +1125,9 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF nodes.BuildMaps(); if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { - if constexpr (Last) { - const auto inputItemTypes = GetWideComponents(inputType); - std::vector<TType*> usedInputItemTypes; - usedInputItemTypes.reserve(inputItemTypes.size()); - for(size_t i = 0; i != inputItemTypes.size(); ++i) { - if (nodes.IsInputItemNodeUsed(i)) { - usedInputItemTypes.push_back(inputItemTypes[i]); - } - } - return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), - TMultiType::Create(usedInputItemTypes.size(), usedInputItemTypes.data(), ctx.Env), - std::move(keyTypes), - TMultiType::Create(keyAndStateItemTypes.size(), keyAndStateItemTypes.data(), ctx.Env) - ); - } else { + if constexpr (Last) + return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes)); + else { const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>(); if (EGraphPerProcess::Single == ctx.GraphPerProcess) return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 78dbb12569..8e49947c0d 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -923,18 +923,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - THashSet<TString> expectedResults = { - "key one", - "very long value 2 / key two", - "very long key one", - "very long value 8 / very long value 7 / very long value 6" - }; - while(!expectedResults.empty()) { - UNIT_ASSERT(iterator.Next(item)); - TString val{item.AsStringRef()}; - UNIT_ASSERT(expectedResults.contains(val)); - expectedResults.erase(val); - } + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "key one"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long key one"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6"); UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } @@ -1002,18 +998,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); NUdf::TUnboxedValue item; - THashSet<TString> expectedResults = { - "very long value 1 / key one / very long value 1 / key one", - "very long value 3 / key two / very long value 2 / key two", - "very long value 4 / very long key one / very long value 4 / very long key one", - "very long value 9 / very long key two / very long value 5 / very long key two" - }; - while (!expectedResults.empty()) { - UNIT_ASSERT(iterator.Next(item)); - TString val{item.AsStringRef()}; - UNIT_ASSERT(expectedResults.contains(val)); - expectedResults.erase(val); - } + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two"); UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } @@ -1078,17 +1070,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) { const auto graph = setup.BuildGraph(pgmReturn); const auto iterator = graph->GetValue().GetListIterator(); - THashSet<TString> expectedResults = { - "key one / value 2 / value 1 / value 5 / value 4", - "key two / value 4 / value 3 / value 3 / value 2", - }; NUdf::TUnboxedValue item; - while (!expectedResults.empty()) { - UNIT_ASSERT(iterator.Next(item)); - TString val{item.AsStringRef()}; - UNIT_ASSERT(expectedResults.contains(val)); - expectedResults.erase(val); - } + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4"); + UNIT_ASSERT(iterator.Next(item)); + UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2"); UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt index 344f00fd5f..364f602ce5 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-arm64.txt @@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt index 344f00fd5f..364f602ce5 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt @@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt index f0a0459bfc..2c5ea976b0 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt @@ -49,7 +49,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt index f0a0459bfc..2c5ea976b0 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt @@ -49,7 +49,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt index 344f00fd5f..364f602ce5 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt @@ -48,7 +48,6 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index d8dee524b2..f1948212bd 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -85,9 +85,6 @@ public: size_t PackedSizeEstimate() const { return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); } - bool IsEmpty() const { - return !ItemCount_; - } void Clear(); TRope Finish(); diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.cpp b/ydb/library/yql/minikql/computation/mkql_spiller.cpp deleted file mode 100644 index 803dac9c56..0000000000 --- a/ydb/library/yql/minikql/computation/mkql_spiller.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include "mkql_spiller.h" -#include <library/cpp/threading/future/core/future.h> -#include <util/system/thread.h> -#include <unordered_map> - -namespace NKikimr::NMiniKQL { - -//Dummy synchronous in-memory spiller -class TDummySpiller: public ISpiller{ -public: - TDummySpiller() - : NextKey(0) - {} - - NThreading::TFuture<TKey> Put(TRope&& blob) override { - auto promise = NThreading::NewPromise<ISpiller::TKey>(); -// TThread t([this, blob = std::move(blob), p = std::move(p)]() { -// WriteAsync(blob, p); -// }); -// t.Detach(); -// return f; - auto key = NextKey; - Storage[key] = blob; - NextKey++; - promise.SetValue(key); - return promise.GetFuture();; - } - std::optional<NThreading::TFuture<TRope>> Get(TKey key) override { - auto promise = NThreading::NewPromise<TRope>(); - if (auto it = Storage.find(key); it != Storage.end()) { - promise.SetValue(it->second); - return promise.GetFuture();; - } else { - return std::nullopt; - } - } - std::optional<NThreading::TFuture<TRope>> Extract(TKey key) override { - auto promise = NThreading::NewPromise<TRope>(); - if (auto it = Storage.find(key); it != Storage.end()) { - promise.SetValue(std::move(it->second)); - Storage.erase(it); - return promise.GetFuture();; - } else { - return std::nullopt; - } - } - NThreading::TFuture<void> Delete(TKey key) override { - auto promise = NThreading::NewPromise<void>(); - promise.SetValue(); - Storage.erase(key); - return promise.GetFuture(); - } -private: - ISpiller::TKey NextKey; - std::unordered_map<ISpiller::TKey, TRope> Storage; -}; -ISpiller::TPtr MakeSpiller() { - return std::make_shared<TDummySpiller>(); -} - -} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h deleted file mode 100644 index d9663d5985..0000000000 --- a/ydb/library/yql/minikql/computation/mkql_spiller.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once -#include <library/cpp/threading/future/core/future.h> -#include <library/cpp/actors/util/rope.h> - - -namespace NKikimr::NMiniKQL { - -struct ISpiller { - using TPtr = std::shared_ptr<ISpiller>; - virtual ~ISpiller(){} - using TKey = ui64; - virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0; - - ///\return - /// nullopt for absent keys - /// TFuture - virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0; - virtual NThreading::TFuture<void> Delete(TKey) = 0; - ///Get + Delete - ///Stored value may be moved to feature - virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0; - -}; - -ISpiller::TPtr MakeSpiller(); - -}//namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h deleted file mode 100644 index b3bba486bd..0000000000 --- a/ydb/library/yql/minikql/computation/mkql_spiller_adapter.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once -#include "mkql_spiller.h" -#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> - - -namespace NKikimr::NMiniKQL { - -///Stores and loads very long sequences of TMultiType UVs -///Can split sequences into chunks -///Sends chunks to ISplitter and keeps assigned keys -///When all data is written switches to read mode. Switching back to writing mode is not supported -///Provides an interface for sequential read (like forward iterator) -///When interaction with ISpiller is required, Write and Read operations return a Future -class TWideUnboxedValuesSpillerAdapter { -public: - TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit) - : Spiller(spiller) - , ItemType(type) - , SizeLimit(sizeLimit) - , Packer(type) - { - } - - /// Write wide UV item - /// \returns - /// - nullopt, if thee values are accumulated - /// - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends - /// In this case a caller must wait operation completion and call StoreCompleted. - /// Design note: not using Subscribe on a Future here to avoid possible race condition - std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) { - Packer.AddWideItem(wideItem.data(), wideItem.size()); - if(Packer.PackedSizeEstimate() > SizeLimit) { - return Spiller->Put(std::move(Packer.Finish())); - } else { - return std::nullopt; - } - } - - std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() { - if (Packer.IsEmpty()) - return std::nullopt; - return Spiller->Put(std::move(Packer.Finish())); - } - - void AsyncWriteCompleted(ISpiller::TKey key) { - StoredChunks.push_back(key); - } - - //Extracting interface - bool Empty() const { - return StoredChunks.empty() && !CurrentBatch; - } - std::optional<NThreading::TFuture<TRope>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) { - MKQL_ENSURE(!Empty(), "Internal logic error"); - if (CurrentBatch) { - auto row = CurrentBatch->Head(); - for (size_t i = 0; i != wideItem.size(); ++i) { - wideItem[i] = row[i]; - } - CurrentBatch->Pop(); - if (CurrentBatch->empty()) { - CurrentBatch = std::nullopt; - } - return std::nullopt; - } else { - auto r = Spiller->Get(StoredChunks.front()); - StoredChunks.pop_front(); - return r; - } - } - - void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) { - //Implementation detail: deserialization is performed in a processing thread - TUnboxedValueBatch batch(ItemType); - Packer.UnpackBatch(std::move(rope), holderFactory, batch); - CurrentBatch = std::move(batch); - } - -private: - ISpiller::TPtr Spiller; - const TMultiType* const ItemType; - const size_t SizeLimit; - TValuePackerTransport<false> Packer; - std::deque<ISpiller::TKey> StoredChunks; - std::optional<TUnboxedValueBatch> CurrentBatch; -}; - -}//namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp b/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp deleted file mode 100644 index 4f3953623c..0000000000 --- a/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include <ydb/library/yql/minikql/mkql_node.h> -#include <ydb/library/yql/minikql/mkql_node_cast.h> -#include <ydb/library/yql/minikql/mkql_program_builder.h> -#include <ydb/library/yql/minikql/mkql_function_registry.h> -#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> -#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> -#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> -#include <ydb/library/yql/minikql/computation/mkql_spiller_adapter.h> -#include <ydb/library/yql/minikql/computation/mkql_spiller.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <vector> -#include <utility> -#include <algorithm> - -namespace NKikimr::NMiniKQL { - -Y_UNIT_TEST_SUITE(TestWideSpillerAdapter) { - constexpr size_t itemWidth = 3; - constexpr size_t chunkSize = 100; - Y_UNIT_TEST(TestWriteExtractZeroItems) { - TScopedAlloc alloc(__LOCATION__); - TTypeEnvironment env(alloc); - const auto spiller = MakeSpiller(); - std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env)); - TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize); - auto r = wideUVSpiller.FinishWriting(); - UNIT_ASSERT(!r.has_value()); - UNIT_ASSERT(wideUVSpiller.Empty()); - } - - Y_UNIT_TEST(TestWriteExtract) { - TScopedAlloc alloc(__LOCATION__); - TMemoryUsageInfo memInfo("test"); - THolderFactory holderFactory(alloc.Ref(), memInfo); - TTypeEnvironment env(alloc); - const auto spiller = MakeSpiller(); - std::vector<TType*> itemTypes(itemWidth, TDataType::Create(NUdf::TDataType<char*>::Id, env)); - TWideUnboxedValuesSpillerAdapter wideUVSpiller(spiller, TMultiType::Create(itemWidth, itemTypes.data(), env), chunkSize); - std::vector<NUdf::TUnboxedValue> wideValue(itemWidth); - constexpr size_t rowCount = chunkSize*10+3; - for (size_t row = 0; row != rowCount; ++row) { - for(size_t i = 0; i != itemWidth; ++i) { - wideValue[i] = NUdf::TUnboxedValuePod(NUdf::TStringValue(TStringBuilder() << "Long enough string: " << row * 10 + i)); - } - if (auto r = wideUVSpiller.WriteWideItem(wideValue)) { - wideUVSpiller.AsyncWriteCompleted(r->GetValue()); - } - } - auto r = wideUVSpiller.FinishWriting(); - if (r) { - wideUVSpiller.AsyncWriteCompleted(r->GetValue()); - } - - wideUVSpiller.AsyncWriteCompleted(r->GetValue()); - for (size_t row = 0; row != rowCount; ++row) { - UNIT_ASSERT(!wideUVSpiller.Empty()); - if (auto r = wideUVSpiller.ExtractWideItem(wideValue)) { - wideUVSpiller.AsyncReadCompleted(r->ExtractValue(), holderFactory); - r = wideUVSpiller.ExtractWideItem(wideValue); - UNIT_ASSERT(!r.has_value()); - } - for (size_t i = 0; i != itemWidth; ++i) { - UNIT_ASSERT_VALUES_EQUAL(TStringBuf(wideValue[i].AsStringRef()) , TStringBuilder() << "Long enough string: " << row * 10 + i); - } - } - UNIT_ASSERT(!wideUVSpiller.Empty()); - } -} - -} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt index 185dd5f550..2f9609b5a2 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-arm64.txt @@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt index 185dd5f550..2f9609b5a2 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.darwin-x86_64.txt @@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt index 38fd1bb839..e0b561b565 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-aarch64.txt @@ -45,7 +45,6 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt index 38fd1bb839..e0b561b565 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.linux-x86_64.txt @@ -45,7 +45,6 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt index 185dd5f550..2f9609b5a2 100644 --- a/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/no_llvm/CMakeLists.windows-x86_64.txt @@ -44,7 +44,6 @@ target_sources(minikql-computation-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_custom_list.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_llvm_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate.cpp diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt index 8da2d8478b..61f2fe7012 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-arm64.txt @@ -41,7 +41,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt index 440182defa..01dba3f99d 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.darwin-x86_64.txt @@ -42,7 +42,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt index 653994d7c9..67eac9242a 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-aarch64.txt @@ -45,7 +45,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt index fc6f85827f..436ad6ceac 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.linux-x86_64.txt @@ -46,7 +46,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt index bfc123d31d..4244aa874b 100644 --- a/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/ut/CMakeLists.windows-x86_64.txt @@ -35,7 +35,6 @@ target_sources(ydb-library-yql-minikql-computation-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_validate_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_spiller_adapter_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/presort_ut.cpp ) set_property( diff --git a/ydb/library/yql/minikql/computation/ut/ya.make b/ydb/library/yql/minikql/computation/ut/ya.make index 819a216fd6..b4c2ff659f 100644 --- a/ydb/library/yql/minikql/computation/ut/ya.make +++ b/ydb/library/yql/minikql/computation/ut/ya.make @@ -21,7 +21,6 @@ SRCS( mkql_computation_pattern_cache_ut.cpp mkql_validate_ut.cpp mkql_value_builder_ut.cpp - mkql_spiller_adapter_ut.cpp presort_ut.cpp ) diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc index 11e1687462..424a78ebdd 100644 --- a/ydb/library/yql/minikql/computation/ya.make.inc +++ b/ydb/library/yql/minikql/computation/ya.make.inc @@ -13,7 +13,6 @@ SRCS( mkql_computation_node_impl.cpp mkql_computation_node_pack.cpp mkql_computation_node_pack_impl.cpp - mkql_spiller.cpp mkql_custom_list.cpp mkql_llvm_base.cpp mkql_validate.cpp |